pero 2005/07/08 13:51:48
Modified: modules/cluster/src/share/org/apache/catalina/cluster/io
ListenCallback.java SocketObjectReader.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
ClusterReceiverBase.java DataSender.java
LocalStrings.properties PooledSocketSender.java
ReplicationListener.java
ReplicationTransmitter.java SendMessageData.java
SimpleTcpCluster.java
SocketReplicationListener.java
SocketReplicationThread.java
TcpReplicationThread.java mbeans-descriptors.xml
Log:
send ack before message is handled
default is now that no wait ack is default.
Revision Changes Path
1.4 +11 -4
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ListenCallback.java
Index: ListenCallback.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ListenCallback.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ListenCallback.java 26 Jun 2005 21:21:49 -0000 1.3
+++ ListenCallback.java 8 Jul 2005 20:50:30 -0000 1.4
@@ -26,8 +26,6 @@
* @author Peter Rossbach
* @version $Revision$, $Date$
*/
-
-
public interface ListenCallback
{
/**
@@ -35,6 +33,15 @@
* been received from one of the cluster nodes.
* @param data - the message bytes received from the cluster/replication
system
*/
- // public void messageDataReceived(byte[] data);
public void messageDataReceived(ClusterData data);
+
+ /** receiver must be send ack
+ */
+ public boolean isSendAck() ;
+
+ /** send ack
+ *
+ */
+ public void sendAck() throws java.io.IOException ;
+
}
\ No newline at end of file
1.4 +3 -1
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java
Index: SocketObjectReader.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SocketObjectReader.java 26 Jun 2005 21:21:49 -0000 1.3
+++ SocketObjectReader.java 8 Jul 2005 20:50:30 -0000 1.4
@@ -74,6 +74,8 @@
int pkgCnt = 0;
while ( pkgExists ) {
ClusterData cdata = buffer.extractPackage(true);
+ if(callback.isSendAck())
+ callback.sendAck() ;
callback.messageDataReceived(cdata);
pkgCnt++;
pkgExists = buffer.doesPackageExist();
1.9 +9 -1
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
Index: ClusterReceiverBase.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ClusterReceiverBase.java 1 Jul 2005 18:35:54 -0000 1.8
+++ ClusterReceiverBase.java 8 Jul 2005 20:50:30 -0000 1.9
@@ -505,4 +505,12 @@
}
}
}
+
+ /* (non-Javadoc)
+ * @see org.apache.catalina.cluster.io.ListenCallback#sendAck()
+ */
+ public void sendAck() throws IOException {
+ // do nothing
+ }
+
}
1.14 +36 -3
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
Index: DataSender.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- DataSender.java 26 Jun 2005 21:21:50 -0000 1.13
+++ DataSender.java 8 Jul 2005 20:50:30 -0000 1.14
@@ -120,6 +120,11 @@
protected long dataResendCounter = 0;
/**
+ * number of data failure sends
+ */
+ protected long dataFailureCounter = 0;
+
+ /**
* doProcessingStats
*/
protected boolean doProcessingStats = false;
@@ -182,7 +187,7 @@
/**
* wait for receiver Ack
*/
- private boolean waitForAck = true;
+ private boolean waitForAck = false;
/**
* number of socket close
@@ -199,6 +204,12 @@
*/
private int socketOpenFailureCounter = 0 ;
+ /**
+ * After failure make a resend
+ */
+ private boolean resend = false ;
+
+
// -------------------------------------------------------------
Constructor
public DataSender(String domain,InetAddress host, int port) {
@@ -378,6 +389,13 @@
return dataResendCounter;
}
+ /**
+ * @return Returns the dataFailureCounter.
+ */
+ public long getDataFailureCounter() {
+ return dataFailureCounter;
+ }
+
public InetAddress getAddress() {
return address;
}
@@ -478,6 +496,18 @@
}
/**
+ * @return Returns the resend.
+ */
+ public boolean isResend() {
+ return resend;
+ }
+ /**
+ * @param resend The resend to set.
+ */
+ public void setResend(boolean resend) {
+ this.resend = resend;
+ }
+ /**
* @return Returns the socket.
*/
public Socket getSocket() {
@@ -564,6 +594,7 @@
connectCounter = isConnected() ? 1 : 0;
missingAckCounter = 0;
dataResendCounter = 0;
+ dataFailureCounter = 0 ;
socketOpenCounter =isConnected() ? 1 : 0;
socketOpenFailureCounter = 0 ;
socketCloseCounter = 0;
@@ -727,7 +758,8 @@
writeData(data);
messageTransfered = true ;
} catch (java.io.IOException x) {
- if(data.getResend() != ClusterMessage.FLAG_FORBIDDEN) {
+ if(data.getResend() == ClusterMessage.FLAG_ALLOWED ||
+ (data.getResend() == ClusterMessage.FLAG_DEFAULT &&
isResend() )) {
// second try with fresh connection
dataResendCounter++;
if (log.isTraceEnabled())
@@ -761,6 +793,7 @@
new Integer(port), data.getUniqueId(), new
Long(data.getMessage().length)));
}
} else {
+ dataFailureCounter++;
if (log.isWarnEnabled())
log.warn(sm.getString("IDataSender.send.lost",
address.getHostAddress(),
new Integer(port), data.getType(),
data.getUniqueId()),exception);
1.14 +2 -0
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
Index: LocalStrings.properties
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- LocalStrings.properties 1 Jul 2005 18:35:54 -0000 1.13
+++ LocalStrings.properties 8 Jul 2005 20:50:30 -0000 1.14
@@ -27,6 +27,8 @@
PoolSocketSender.noMoreSender=No socket sender available for client
[{0}:{1,number,integer}] did it disappear?
ReplicationTransmitter.getProperty=get property {0}
ReplicationTransmitter.setProperty=set property {0}: {1} old value {2}
+ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name
{1}
+ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with
name {1}
ReplicationValve.filter.loading=Loading request filters={0}
ReplicationValve.filter.token=Request filter={0}
ReplicationValve.filter.token.failure=Unable to compile filter={0}
1.16 +1 -0
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
Index: PooledSocketSender.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- PooledSocketSender.java 26 Jun 2005 21:21:50 -0000 1.15
+++ PooledSocketSender.java 8 Jul 2005 20:50:30 -0000 1.16
@@ -234,6 +234,7 @@
sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
sender.setAckTimeout(parent.getAckTimeout());
sender.setWaitForAck(parent.isWaitForAck());
+ sender.setResend(parent.isResend());
return sender;
}
1.23 +2 -1
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
Index: ReplicationListener.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- ReplicationListener.java 15 Apr 2005 20:14:14 -0000 1.22
+++ ReplicationListener.java 8 Jul 2005 20:50:30 -0000 1.23
@@ -244,4 +244,5 @@
}
}
+
}
1.36 +10 -3
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Index: ReplicationTransmitter.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -r1.35 -r1.36
--- ReplicationTransmitter.java 30 Jun 2005 13:03:34 -0000 1.35
+++ ReplicationTransmitter.java 8 Jul 2005 20:50:30 -0000 1.36
@@ -35,7 +35,6 @@
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.tomcat.util.digester.SetTopRule;
/**
* Transmit message to ohter cluster members create sender from
replicationMode
@@ -517,6 +516,7 @@
public void start() throws java.io.IOException {
if (cluster != null) {
ObjectName clusterName = cluster.getObjectName();
+ ObjectName transmitterName = null ;
try {
MBeanServer mserver = cluster.getMBeanServer();
Container container = cluster.getContainer();
@@ -524,7 +524,7 @@
if (container instanceof StandardHost) {
name += ",host=" + clusterName.getKeyProperty("host");
}
- ObjectName transmitterName = new ObjectName(name);
+ transmitterName = new ObjectName(name);
if (mserver.isRegistered(transmitterName)) {
if (log.isWarnEnabled())
log.warn(sm.getString(
@@ -535,6 +535,10 @@
setObjectName(transmitterName);
mserver.registerMBean(cluster.getManagedBean(this),
getObjectName());
+ if(log.isInfoEnabled())
+ log.info(sm.getString("ReplicationTransmitter.started",
+ clusterName, transmitterName));
+
} catch (Exception e) {
log.warn(e);
}
@@ -565,6 +569,9 @@
} catch (Exception e) {
log.error(e);
}
+ if(log.isInfoEnabled())
+ log.info(sm.getString("ReplicationTransmitter.stopped",
+ cluster.getObjectName(), getObjectName()));
}
}
1.3 +1 -2
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java
Index: SendMessageData.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SendMessageData.java 10 Apr 2005 16:20:46 -0000 1.2
+++ SendMessageData.java 8 Jul 2005 20:50:30 -0000 1.3
@@ -16,7 +16,6 @@
package org.apache.catalina.cluster.tcp;
-import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.Member;
/**
1.70 +7 -7
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
Index: SimpleTcpCluster.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
retrieving revision 1.69
retrieving revision 1.70
diff -u -r1.69 -r1.70
--- SimpleTcpCluster.java 1 Jul 2005 16:51:14 -0000 1.69
+++ SimpleTcpCluster.java 8 Jul 2005 20:50:30 -0000 1.70
@@ -181,7 +181,7 @@
private org.apache.catalina.cluster.ClusterDeployer clusterDeployer;
- private boolean defaultMode = false ;
+ private boolean defaultMode = true ;
/**
* Listeners of messages
@@ -701,7 +701,7 @@
clusterReceiver.setCompress(clusterSender.isCompress());
clusterReceiver.setCatalinaCluster(this);
clusterReceiver.start();
- } else
+ }
// start the sender.
if(clusterSender != null && clusterReceiver != null) {
@@ -746,8 +746,8 @@
* className="org.apache.catalina.cluster.mcast.McastService"
* mcastAddr="228.0.0.4"
* mcastPort="8012"
- * mcastFrequency="500"
- * mcastDropTime="3000"/>
+ * mcastFrequency="1000"
+ * mcastDropTime="30000"/>
* </pre>
*/
protected void createDefaultMembershipService() {
@@ -760,8 +760,8 @@
McastService mService= new McastService();
mService.setMcastAddr("228.0.0.4");
mService.setMcastPort(8012);
- mService.setMcastFrequency(500);
- mService.setMcastDropTime(3000);
+ mService.setMcastFrequency(1000);
+ mService.setMcastDropTime(30000);
transferProperty("service",mService);
setMembershipService(mService);
}
1.6 +5 -7
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationListener.java
Index: SocketReplicationListener.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationListener.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SocketReplicationListener.java 1 Jul 2005 18:35:54 -0000 1.5
+++ SocketReplicationListener.java 8 Jul 2005 20:50:30 -0000 1.6
@@ -21,7 +21,6 @@
import java.net.ServerSocket;
import java.net.Socket;
-import org.apache.catalina.cluster.io.SocketObjectReader;
import org.apache.catalina.util.StringManager;
/**
@@ -32,7 +31,7 @@
// ---------------------------------------------------- Statics
- public static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory
+ private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory
.getLog(SocketReplicationListener.class);
/**
@@ -151,8 +150,7 @@
Socket socket = serverSocket.accept();
if (doListen) {
SocketReplicationThread t = new
SocketReplicationThread(
- this, socket, new
SocketObjectReader(socket,
- this), isSendAck());
+ this, socket);
t.setDaemon(true);
t.start();
}
@@ -254,5 +252,5 @@
unLockSocket();
doListen = false;
}
-
-}
+
+ }
1.2 +18 -10
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationThread.java
Index: SocketReplicationThread.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationThread.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SocketReplicationThread.java 10 Apr 2005 16:20:46 -0000 1.1
+++ SocketReplicationThread.java 8 Jul 2005 20:50:30 -0000 1.2
@@ -19,6 +19,7 @@
import java.io.InputStream;
import java.net.Socket;
+import org.apache.catalina.cluster.io.ListenCallback;
import org.apache.catalina.cluster.io.SocketObjectReader;
/**
@@ -27,7 +28,7 @@
* FIXME Socket timeout
* @version $Revision$, $Date$
*/
-public class SocketReplicationThread extends Thread {
+public class SocketReplicationThread extends Thread implements
ListenCallback {
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory
.getLog(SocketReplicationThread.class);
@@ -43,8 +44,6 @@
private boolean keepRunning = true;
- private boolean sendAck;
-
/**
* Fork Listen Worker Thread!
*
@@ -52,13 +51,12 @@
* @param reader
* @param sendAck
*/
- SocketReplicationThread(SocketReplicationListener master, Socket socket,
- SocketObjectReader reader, boolean sendAck) {
+ SocketReplicationThread(SocketReplicationListener master, Socket socket
+ ) {
super("ClusterListenThread-" + count++);
this.master = master;
this.socket = socket;
- this.reader = reader;
- this.sendAck = sendAck;
+ this.reader = new SocketObjectReader(socket,this);
}
/**
@@ -83,6 +81,7 @@
if (log.isTraceEnabled()) {
log.trace("sending " + ack + " ack packages to " +
socket.getLocalPort() );
}
+ /**
if (sendAck) {
// ack only when message is complete receive
while (ack > 0) {
@@ -90,6 +89,7 @@
ack--;
}
}
+ **/
keepRunning = master.isDoListen();
} else
// EOF
@@ -110,13 +110,21 @@
socket = null;
}
}
-
+
+ public void messageDataReceived(ClusterData data) {
+ master.messageDataReceived(data);
+ }
+
+ public boolean isSendAck() {
+ return master.isSendAck();
+ }
+
/**
* send a reply-acknowledgement
*
* @throws java.io.IOException
*/
- private void sendAck() throws java.io.IOException {
+ public void sendAck() throws java.io.IOException {
socket.getOutputStream().write(ACK_COMMAND);
if (log.isTraceEnabled()) {
log.trace("ACK sent to " + socket.getPort());
1.18 +2 -1
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
Index: TcpReplicationThread.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- TcpReplicationThread.java 12 Apr 2005 18:56:07 -0000 1.17
+++ TcpReplicationThread.java 8 Jul 2005 20:50:30 -0000 1.18
@@ -132,6 +132,7 @@
if (log.isTraceEnabled()) {
log.trace("sending " + pkgcnt + " ack packages to " +
channel.socket().getLocalPort() );
}
+
if (sendAck) {
while ( pkgcnt > 0 ) {
sendAck(key,channel);
1.15 +28 -0
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
Index: mbeans-descriptors.xml
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- mbeans-descriptors.xml 1 Jul 2005 18:35:54 -0000 1.14
+++ mbeans-descriptors.xml 8 Jul 2005 20:50:30 -0000 1.15
@@ -408,6 +408,10 @@
description="Connect time for keep alive"
type="long"
writeable="false"/>
+ <attribute name="resend"
+ description="after send failure make a resend"
+ is="true"
+ type="boolean" />
<attribute name="connected"
is="true"
description="socket connected"
@@ -489,6 +493,10 @@
description="counts data resends"
type="long"
writeable="false"/>
+ <attribute name="dataFailureCounter"
+ description="counts data send failures"
+ type="long"
+ writeable="false"/>
<attribute name="inQueueCounter"
description="counts all queued messages"
type="long"
@@ -598,6 +606,10 @@
description="Connect time for keep alive"
type="long"
writeable="false"/>
+ <attribute name="resend"
+ description="after send failure make a resend"
+ is="true"
+ type="boolean" />
<attribute name="connected"
is="true"
description="socket connected"
@@ -679,6 +691,10 @@
description="counts data resends"
type="long"
writeable="false"/>
+ <attribute name="dataFailureCounter"
+ description="counts data send failures"
+ type="long"
+ writeable="false"/>
<attribute name="inQueueCounter"
description="counts all queued messages"
type="long"
@@ -751,6 +767,10 @@
<attribute name="keepAliveMaxRequestCount"
description="max request over this socket"
type="int"/>
+ <attribute name="resend"
+ description="after send failure make a resend"
+ is="true"
+ type="boolean" />
<attribute name="connected"
is="true"
description="socket connected"
@@ -832,6 +852,10 @@
description="Connect time for keep alive"
type="long"
writeable="false"/>
+ <attribute name="resend"
+ description="after send failure make a resend"
+ is="true"
+ type="boolean" />
<attribute name="connected"
is="true"
description="socket connected"
@@ -917,6 +941,10 @@
description="counts data resends"
type="long"
writeable="false"/>
+ <attribute name="dataFailureCounter"
+ description="counts data send failures"
+ type="long"
+ writeable="false"/>
<operation name="connect"
description="connect to other replication node"
impact="ACTION"
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]