Author: fhanik Date: Mon Feb 27 10:22:16 2006 New Revision: 381403 URL: http://svn.apache.org/viewcvs?rev=381403&view=rev Log: Initial cleanup, getting ready to create a NIO data sender for faster throughput.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381403&r1=381402&r2=381403&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Feb 27 10:22:16 2006 @@ -34,25 +34,23 @@ * @author Peter Rossbach * @author Filip Hanik * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $ - * @since 5.5.7 + * @since 5.5.16 */ public class DataSender implements IDataSender { - private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory - .getLog(DataSender.class); + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(DataSender.class); /** * The string manager for this package. */ - protected static StringManager sm = StringManager - .getManager(Constants.Package); + protected static StringManager sm = StringManager.getManager(Constants.Package); // ----------------------------------------------------- Instance Variables /** * The descriptive information about this implementation. */ - private static final String info = "DataSender/2.1"; + private static final String info = "DataSender/3.0"; /** * receiver address @@ -678,20 +676,17 @@ return ; try { createSocket(); - if (isWaitForAck()) - socket.setSoTimeout((int) ackTimeout); + if (isWaitForAck()) socket.setSoTimeout((int) ackTimeout); isSocketConnected = true; socketOpenCounter++; this.keepAliveCount = 0; this.keepAliveConnectTime = System.currentTimeMillis(); if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.openSocket", address - .getHostAddress(), new Integer(port),new Long(socketOpenCounter))); + log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port),new Long(socketOpenCounter))); } catch (IOException ex1) { socketOpenFailureCounter++ ; if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.openSocket.failure", - address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1); + log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1); throw ex1; } @@ -725,8 +720,7 @@ isSocketConnected = false; socketCloseCounter++; if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.closeSocket", - address.getHostAddress(), new Integer(port),new Long(socketCloseCounter))); + log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(socketCloseCounter))); } } @@ -791,62 +785,48 @@ * @throws java.io.IOException * @since 5.5.10 */ - protected void pushMessage( ChannelMessage data) - throws java.io.IOException { - long time = 0 ; - if(doProcessingStats) { - time = System.currentTimeMillis(); - } - boolean messageTransfered = false ; + + protected void pushMessage(ChannelMessage data, boolean reconnect) throws java.io.IOException { synchronized(this) { checkKeepAlive(); - if (!isConnected()) - openSocket(); - else if(keepAliveTimeout > -1) - this.keepAliveConnectTime = System.currentTimeMillis(); + if ( reconnect ) closeSocket(); + if (!isConnected()) openSocket(); + else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis(); } + writeData(data); + + } + + protected void pushMessage( ChannelMessage data) throws java.io.IOException { + long time = 0 ; + if(doProcessingStats) time = System.currentTimeMillis(); + boolean messageTransfered = false ; IOException exception = null; try { - writeData(data); + // first try with existing connection + pushMessage(data,false); messageTransfered = true ; } catch (java.io.IOException x) { exception = x; - if( true ) { //allow resend + //resend + dataResendCounter++; + if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x); + try { // second try with fresh connection - dataResendCounter++; - if (log.isTraceEnabled()) - log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(), - new Integer(port)),x); - synchronized(this) { - closeSocket(); - openSocket(); - } - try { - writeData(data); - messageTransfered = true; - exception = null; - } catch (IOException xx) { - xx.fillInStackTrace(); - exception = xx; - } - } else - { - synchronized(this) { - closeSocket(); - } + pushMessage(data,true); + messageTransfered = true; + exception = null; + } catch (IOException xx) { + exception = xx; + closeSocket(); } } finally { this.keepAliveCount++; checkKeepAlive(); - if(doProcessingStats) { - addProcessingStats(time); - } + if(doProcessingStats) addProcessingStats(time); if(messageTransfered) { addStats(data.getMessage().length); - if (log.isTraceEnabled()) { - log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(), - new Integer(port), data.getUniqueId(), new Long(data.getMessage().length))); - } + if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().length))); } else { dataFailureCounter++; if ( exception != null ) throw exception; @@ -869,8 +849,7 @@ OutputStream out = socket.getOutputStream(); out.write(XByteBuffer.createDataPackage((ClusterData)data)); out.flush(); - if (isWaitForAck()) - waitForAck(ackTimeout); + if (isWaitForAck()) waitForAck(ackTimeout); } finally { synchronized(this) { isMessageTransferStarted = false ; @@ -892,31 +871,20 @@ } try { int bytesRead = 0; - if ( log.isTraceEnabled() ) - log.trace(sm.getString("IDataSender.ack.start",getAddress(), new Integer(socket.getLocalPort()))); + if ( log.isTraceEnabled() ) log.trace(sm.getString("IDataSender.ack.start",getAddress(), new Integer(socket.getLocalPort()))); int i = socket.getInputStream().read(); while ((i != -1) && (i != 3) && bytesRead < 10) { - if ( log.isTraceEnabled() ) - log.trace(sm.getString("IDataSender.ack.read",getAddress(), new Integer(socket.getLocalPort()),new Character((char) i))); + if ( log.isTraceEnabled() ) log.trace(sm.getString("IDataSender.ack.read",getAddress(), new Integer(socket.getLocalPort()),new Character((char) i))); bytesRead++; i = socket.getInputStream().read(); } if (i != 3) { - if (i == -1) { - throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort()))); - } else { - throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort()))); - } - } else { - if (log.isTraceEnabled()) { - log.trace(sm.getString("IDataSender.ack.receive", getAddress(),new Integer(socket.getLocalPort()))); - } - } + if (i == -1) throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort()))); + else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort()))); + } else if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.ack.receive", getAddress(),new Integer(socket.getLocalPort()))); } catch (IOException x) { missingAckCounter++; - String errmsg = sm.getString("IDataSender.ack.missing", getAddress(), - new Integer(socket.getLocalPort()), - new Long(this.ackTimeout)); + String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout)); if ( !this.isSuspect() ) { this.setSuspect(true); if ( log.isWarnEnabled() ) log.warn(errmsg, x); @@ -925,9 +893,7 @@ } throw x; } finally { - if(doWaitAckStats) { - addWaitAckStats(time); - } + if(doWaitAckStats) addWaitAckStats(time); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]