當前位置:才華齋>IT認證>SUN認證>

如何解決Java Socket通訊技術收發執行緒互斥

SUN認證 閱讀(7.52K)

Java Socket通訊技術在很長的`時間裡都在使用,在不少的程式設計師眼中都有很多高的評價。那麼下面我們就看看如何才能掌握這門複雜的程式語言,希望大家在今後的Java Socket通訊技術使用中有所收穫。

如何解決Java Socket通訊技術收發執行緒互斥

下面就是Java Socket通訊技術在解決收發執行緒互斥的程式碼介紹。

age ;

rt ception;

rt tStream;

rt utStream;

rt SocketAddress;

rt et;

rt etException;

rt etTimeoutException;

rt leDateFormat;

rt ;

rt erties;

rt r;

rt rTask;

rt urrentHashMap;

rt Unit;

rt ition;

rt trantLock;

rt er;

19./**

20.*

title: socket通訊包裝類

21.*

Description:

22.*

CopyRight: CopyRight (c) 2009

23.*

Company:

24.*

Create date: 2009-10-14

25.*author sunnylocus

26. * v0.10 2009-10-14 初類

27.* v0.11 2009-11-12 對命令收發邏輯及收發執行緒互斥機制進行了優化,

處理命令速度由原來8~16個/秒提高到25~32個/秒

28.*/ public class SocketConnection {

ate volatile Socket socket;

ate int timeout = 1000*10; //超時時間,初始值10秒

ate boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

ate boolean isNetworkConnect = false; //網路是否已連線

ate static String host = "";

ate static int port;

ic InputStream inStream = null;

ic OutputStream outStream = null;

ate static Logger log =ogger

(s);

ate static SocketConnection socketConnection = null;

ate static r heartTimer=null;

40.//private final Map recMsgMap= Collections.

synchronizedMap(new HashMap());

ate final ConcurrentHashMap recMsgMap

= new ConcurrentHashMap();

ate static Thread receiveThread = null;

ate final ReentrantLock lock = new ReentrantLock();

ate SocketConnection(){

erties conf = new Properties();

{

(esourceAsStream

(""));

out = eOf(roperty("timeout"));

(roperty("ip"),eOf

(roperty("port")));

50.} catch(IOException e) {

l("socket初始化異常!",e);

w new RuntimeException("socket初始化異常,請檢查配置引數");

53.}

54.}

55./**

56.* 單態模式

57.*/

ic static SocketConnection getInstance() {

(socketConnection==null) {

hronized(s) {

(socketConnection==null) {

etConnection = new SocketConnection();

rn socketConnection;

64.}

65.}

66.}

rn socketConnection;

68.}

ate void init(String host,int port) throws IOException {

SocketAddress addr = new InetSocketAddress(host,port);

et = new Socket();

hronized (this) {

("【準備與"+addr+"建立連線】");

ect(addr, timeout);

("【與"+addr+"連線已建立】");

ream = nputStream();

tream = utputStream();

cpNoDelay(true);//資料不作緩衝,立即傳送

oLinger(true, 0);//socket關閉時,立即釋放資源

eepAlive(true);

rafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

tworkConnect=true;

iveThread = new Thread(new ReceiveWorker());

t();

=host;

=port;

(!isLaunchHeartcheck)

chHeartcheck();

89.}

90.}

91./**

92.* 心跳包檢測

93.*/

ate void launchHeartcheck() {

(socket == null)

w new IllegalStateException("socket is not

established!");

tTimer = new Timer();

unchHeartcheck = true;

dule(new TimerTask() {

ic void run() {

ng msgStreamNo = treamNo("kq");

mstType =9999;//999-心跳包請求

leDateFormat dateformate = new SimpleDateFormat

("yyyyMMddHHmmss");

ng msgDateTime = at(new Date());

msgLength =38;//訊息頭長度

ng commandstr = "00" +msgLength + mstType + msgStreamNo;

("心跳檢測包 -> IVR "+commandstr);

reconnCounter = 1;

e(true) {

ng responseMsg =null;

{

onseMsg = readReqMsg(commandstr);

113.} catch (IOException e) {

r("IO流異常",e);

nnCounter ++;

116.}

(responseMsg!=null) {

("心跳響應包 <- IVR "+responseMsg);

nnCounter = 1;

k;

121.} else {

nnCounter ++;

123.}

(reconnCounter >3) {//重連次數已達三次,判定網路連線中斷,

重新建立連線。連線未被建立時不釋放鎖

nnectToCTCC(); break;

126.}

127.}

128.}

129.},1000 * 60*1,1000*60*2);

130.}

131./**

132.* 重連與目標IP建立重連

133.*/

ate void reConnectToCTCC() {

Thread(new Runnable(){

ic void run(){

("重新建立與"+host+":"+port+"的連線");

138.//清理工作,中斷計時器,中斷接收執行緒,恢復初始變數

el();

unchHeartcheck=false;

tworkConnect = false;

rrupt();

{

e();

145.} catch (IOException e1) {r("重連時,關閉socket連

接發生IO流異常",e1);}

146.//----------------

hronized(this){

(; ;){

{

entThread();

p(1000 * 1);

(host,port);

fyAll();

k ;

155.} catch (IOException e) {

r("重新建立連線未成功",e);

157.} catch (InterruptedException e){

r("重連執行緒中斷",e);

159.}

160.}

161.}

162.}

163.})t();

164.}

165./**

166.* 傳送命令並接受響應

167.* @param requestMsg

168.* @return

169.* @throws SocketTimeoutException

170.* @throws IOException

171.*/

ic String readReqMsg(String requestMsg) throws IOException {

(requestMsg ==null) {

rn null;

175.}

(!isNetworkConnect) {

hronized(this){

{

(1000*5); //等待5秒,如果網路還沒有恢復,丟擲IO流異常

(!isNetworkConnect) {

w new IOException("網路連線中斷!");

182.}

183.} catch (InterruptedException e) {

r("傳送執行緒中斷",e);

185.}

186.}

187.}

ng msgNo = tring(8, 8 + 24);//讀取流水號

tream = utputStream();

e(ytes());

h();

ition msglock = ondition(); //訊息鎖

193.//註冊等待接收訊息

(msgNo, msglock);

{

();

t(timeout,ISECONDS);

198.} catch (InterruptedException e) {

r("傳送執行緒中斷",e);

200.} finally {

ck();

202.}

ct respMsg = ve(msgNo); //響應資訊

(respMsg!=null &&(respMsg != msglock)) {

205.//已經接收到訊息,登出等待,成功返回訊息

rn (String) respMsg;

207.} else {

r(msgNo+" 超時,未收到響應訊息");

w new SocketTimeoutException(msgNo+" 超時,未收到響應訊息");

210.}

211.}

ic void finalize() {

(socket != null) {

{

e();

216.} catch (IOException e) {

tStackTrace();

218.}

219.}

220.}

221.//訊息接收執行緒

ate class ReceiveWorker implements Runnable {

ng intStr= null;

ic void run() {

e(!rrupted()){

{

[] headBytes = new byte[4];

((headBytes)==-1){

("讀到流未尾,對方已關閉流!");

nnectToCTCC();//讀到流未尾,對方已關閉流

rn;

232.}

[] tmp =new byte[4];

= headBytes;

ng tempStr = new String(tmp)();

(tempStr==null || ls("")) {

r("received message is null");

inue;

239.}

tr = new String(tmp);

totalLength =eInt(intStr);

242.//----------------

[] msgBytes = new byte[totalLength-4];

(msgBytes);

ng resultMsg = new String(headBytes)+ new

String(msgBytes);

246.//抽出訊息ID

ng msgNo = tring(8, 8 + 24);

ition msglock =(Condition) (msgNo);

(msglock ==null) {

(msgNo+"序號可能已被登出!響應訊息丟棄");

ve(msgNo);

inue;

253.}

(msgNo, resultMsg);

{

();

alAll();

258.}finally {

ck();

260.}

261.}catch(SocketException e){

r("服務端關閉socket",e);

nnectToCTCC();

264.} catch(IOException e) {

r("接收執行緒讀取響應資料時發生IO流異常",e);

266.} catch(NumberFormatException e){

r("收到沒良心包,String轉int異常,異常字元:"+intStr);

268.}

269.}

270.}

271.}

272.}