Java Socket通訊技術在很長的`時間裡都在使用,在不少的程式設計師眼中都有很多高的評價。那麼下面我們就看看如何才能掌握這門複雜的程式語言,希望大家在今後的Java Socket通訊技術使用中有所收穫。
下面就是Java Socket通訊技術在解決收發執行緒互斥的程式碼介紹。
age ;
rt ception;
rt tStream;
rt utStream;
rt SocketAddress;
rt et;
rt etException;
rt etTimeoutException;
rt leDateFormat;
rt ;
rt erties;
rt r;
rt rTask;
rt urrentHashMap;
rt Unit;
rt ition;
rt trantLock;
rt er;
19./**
20.*
title: socket通訊包裝類
21.*
Description:
22.*
CopyRight: CopyRight (c) 2009
23.*
Company:
24.*
Create date: 2009-10-14
25.*author sunnylocus
26. * v0.10 2009-10-14 初類
27.* v0.11 2009-11-12 對命令收發邏輯及收發執行緒互斥機制進行了優化,
處理命令速度由原來8~16個/秒提高到25~32個/秒
28.*/ public class SocketConnection {
ate volatile Socket socket;
ate int timeout = 1000*10; //超時時間,初始值10秒
ate boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測
ate boolean isNetworkConnect = false; //網路是否已連線
ate static String host = "";
ate static int port;
ic InputStream inStream = null;
ic OutputStream outStream = null;
ate static Logger log =ogger
(s);
ate static SocketConnection socketConnection = null;
ate static r heartTimer=null;
40.//private final Map
synchronizedMap(new HashMap
ate final ConcurrentHashMap
= new ConcurrentHashMap
ate static Thread receiveThread = null;
ate final ReentrantLock lock = new ReentrantLock();
ate SocketConnection(){
erties conf = new Properties();
{
(esourceAsStream
(""));
out = eOf(roperty("timeout"));
(roperty("ip"),eOf
(roperty("port")));
50.} catch(IOException e) {
l("socket初始化異常!",e);
w new RuntimeException("socket初始化異常,請檢查配置引數");
53.}
54.}
55./**
56.* 單態模式
57.*/
ic static SocketConnection getInstance() {
(socketConnection==null) {
hronized(s) {
(socketConnection==null) {
etConnection = new SocketConnection();
rn socketConnection;
64.}
65.}
66.}
rn socketConnection;
68.}
ate void init(String host,int port) throws IOException {
SocketAddress addr = new InetSocketAddress(host,port);
et = new Socket();
hronized (this) {
("【準備與"+addr+"建立連線】");
ect(addr, timeout);
("【與"+addr+"連線已建立】");
ream = nputStream();
tream = utputStream();
cpNoDelay(true);//資料不作緩衝,立即傳送
oLinger(true, 0);//socket關閉時,立即釋放資源
eepAlive(true);
rafficClass(0x04|0x10);//高可靠性和最小延遲傳輸
tworkConnect=true;
iveThread = new Thread(new ReceiveWorker());
t();
=host;
=port;
(!isLaunchHeartcheck)
chHeartcheck();
89.}
90.}
91./**
92.* 心跳包檢測
93.*/
ate void launchHeartcheck() {
(socket == null)
w new IllegalStateException("socket is not
established!");
tTimer = new Timer();
unchHeartcheck = true;
dule(new TimerTask() {
ic void run() {
ng msgStreamNo = treamNo("kq");
mstType =9999;//999-心跳包請求
leDateFormat dateformate = new SimpleDateFormat
("yyyyMMddHHmmss");
ng msgDateTime = at(new Date());
msgLength =38;//訊息頭長度
ng commandstr = "00" +msgLength + mstType + msgStreamNo;
("心跳檢測包 -> IVR "+commandstr);
reconnCounter = 1;
e(true) {
ng responseMsg =null;
{
onseMsg = readReqMsg(commandstr);
113.} catch (IOException e) {
r("IO流異常",e);
nnCounter ++;
116.}
(responseMsg!=null) {
("心跳響應包 <- IVR "+responseMsg);
nnCounter = 1;
k;
121.} else {
nnCounter ++;
123.}
(reconnCounter >3) {//重連次數已達三次,判定網路連線中斷,
重新建立連線。連線未被建立時不釋放鎖
nnectToCTCC(); break;
126.}
127.}
128.}
129.},1000 * 60*1,1000*60*2);
130.}
131./**
132.* 重連與目標IP建立重連
133.*/
ate void reConnectToCTCC() {
Thread(new Runnable(){
ic void run(){
("重新建立與"+host+":"+port+"的連線");
138.//清理工作,中斷計時器,中斷接收執行緒,恢復初始變數
el();
unchHeartcheck=false;
tworkConnect = false;
rrupt();
{
e();
145.} catch (IOException e1) {r("重連時,關閉socket連
接發生IO流異常",e1);}
146.//----------------
hronized(this){
(; ;){
{
entThread();
p(1000 * 1);
(host,port);
fyAll();
k ;
155.} catch (IOException e) {
r("重新建立連線未成功",e);
157.} catch (InterruptedException e){
r("重連執行緒中斷",e);
159.}
160.}
161.}
162.}
163.})t();
164.}
165./**
166.* 傳送命令並接受響應
167.* @param requestMsg
168.* @return
169.* @throws SocketTimeoutException
170.* @throws IOException
171.*/
ic String readReqMsg(String requestMsg) throws IOException {
(requestMsg ==null) {
rn null;
175.}
(!isNetworkConnect) {
hronized(this){
{
(1000*5); //等待5秒,如果網路還沒有恢復,丟擲IO流異常
(!isNetworkConnect) {
w new IOException("網路連線中斷!");
182.}
183.} catch (InterruptedException e) {
r("傳送執行緒中斷",e);
185.}
186.}
187.}
ng msgNo = tring(8, 8 + 24);//讀取流水號
tream = utputStream();
e(ytes());
h();
ition msglock = ondition(); //訊息鎖
193.//註冊等待接收訊息
(msgNo, msglock);
{
();
t(timeout,ISECONDS);
198.} catch (InterruptedException e) {
r("傳送執行緒中斷",e);
200.} finally {
ck();
202.}
ct respMsg = ve(msgNo); //響應資訊
(respMsg!=null &&(respMsg != msglock)) {
205.//已經接收到訊息,登出等待,成功返回訊息
rn (String) respMsg;
207.} else {
r(msgNo+" 超時,未收到響應訊息");
w new SocketTimeoutException(msgNo+" 超時,未收到響應訊息");
210.}
211.}
ic void finalize() {
(socket != null) {
{
e();
216.} catch (IOException e) {
tStackTrace();
218.}
219.}
220.}
221.//訊息接收執行緒
ate class ReceiveWorker implements Runnable {
ng intStr= null;
ic void run() {
e(!rrupted()){
{
[] headBytes = new byte[4];
((headBytes)==-1){
("讀到流未尾,對方已關閉流!");
nnectToCTCC();//讀到流未尾,對方已關閉流
rn;
232.}
[] tmp =new byte[4];
= headBytes;
ng tempStr = new String(tmp)();
(tempStr==null || ls("")) {
r("received message is null");
inue;
239.}
tr = new String(tmp);
totalLength =eInt(intStr);
242.//----------------
[] msgBytes = new byte[totalLength-4];
(msgBytes);
ng resultMsg = new String(headBytes)+ new
String(msgBytes);
246.//抽出訊息ID
ng msgNo = tring(8, 8 + 24);
ition msglock =(Condition) (msgNo);
(msglock ==null) {
(msgNo+"序號可能已被登出!響應訊息丟棄");
ve(msgNo);
inue;
253.}
(msgNo, resultMsg);
{
();
alAll();
258.}finally {
ck();
260.}
261.}catch(SocketException e){
r("服務端關閉socket",e);
nnectToCTCC();
264.} catch(IOException e) {
r("接收執行緒讀取響應資料時發生IO流異常",e);
266.} catch(NumberFormatException e){
r("收到沒良心包,String轉int異常,異常字元:"+intStr);
268.}
269.}
270.}
271.}
272.}