Author: pero
Date: Sun Aug 19 14:04:15 2007
New Revision: 567470
URL: http://svn.apache.org/viewvc?rev=567470&view=rev
Log:
Recover the membership heartbeat after a network interface goes down (fixes bug 40042).
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
tomcat/container/tc5.5.x/webapps/docs/changelog.xml
tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
Sun Aug 19 14:04:15 2007
@@ -59,7 +59,7 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "McastService/2.1";
+ private static final String info = "McastService/2.2";
/**
* The implementation specific properties
@@ -310,7 +310,16 @@
ttl,
soTimeout,
this);
-
+ String value = properties.getProperty("recoveryEnabled","true");
+ boolean recEnabled = Boolean.valueOf(value).booleanValue() ;
+ impl.setRecoveryEnabled(recEnabled);
+ int recCnt =
Integer.parseInt(properties.getProperty("recoveryCounter","10"));
+ impl.setRecoveryCounter(recCnt);
+ long recSlpTime =
Long.parseLong(properties.getProperty("recoverySleepTime","5000"));
+ impl.setRecoverySleepTime(recSlpTime);
+ if(log.isDebugEnabled())
+ log.debug("Recovery Options (enabled=" + recEnabled + ",counter="
+recCnt+ ",time=" +recSlpTime+").");
+
impl.start(level);
long memberwait =
(Long.parseLong(properties.getProperty("msgFrequency"))*4);
if(log.isInfoEnabled())
@@ -479,6 +488,36 @@
properties.setProperty("mcastTTL", String.valueOf(mcastTTL));
}
+ public int getRecoveryCounter() {
+ if(impl != null)
+ return impl.getRecoveryCounter() ;
+ else return
Integer.parseInt(properties.getProperty("recoveryCounter","10"));
+ }
+
+ public boolean isRecoveryEnabled() {
+ if(impl != null)
+ return impl.isRecoveryEnabled() ;
+ else return
Boolean.getBoolean(properties.getProperty("recoveryEnabled","true"));
+ }
+
+ public long getRecoverySleepTime() {
+ if(impl != null)
+ return impl.getRecoverySleepTime() ;
+ else return
Long.parseLong(properties.getProperty("recoverySleepTime","5000"));
+ }
+
+ public void setRecoveryCounter(int recoveryCounter) {
+ properties.setProperty("recoveryCounter",
String.valueOf(recoveryCounter));
+ }
+
+ public void setRecoveryEnabled(boolean recoveryEnabled) {
+ properties.setProperty("recoveryEnabled",
String.valueOf(recoveryEnabled));
+ }
+
+ public void setRecoverySleepTime(long recoverySleepTime) {
+ properties.setProperty("recoverySleepTime",
String.valueOf(recoverySleepTime));
+ }
+
/**
* Simple test program
* @param args Command-line arguments
@@ -501,4 +540,7 @@
service.start();
Thread.sleep(60*1000*60);
}
+
+
+
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
Sun Aug 19 14:04:15 2007
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.net.InetAddress ;
import java.net.DatagramPacket;
+import java.net.SocketTimeoutException;
+
import org.apache.catalina.cluster.MembershipListener;
/**
@@ -33,6 +35,7 @@
* Need to fix this, could use java.nio and only need one thread to send and
receive, or
* just use a timeout on the receive
* @author Filip Hanik
+ * @author Peter Rossbach
* @version $Revision$, $Date$
*/
public class McastServiceImpl
@@ -102,6 +105,21 @@
protected InetAddress mcastBindAddress = null;
/**
+ * Number of consecutive send or receive failures before a recovery is initiated
+ */
+ protected int recoveryCounter = 10;
+
+ /**
+ * The time the recovery thread sleeps between recovery attempts
+ */
+ protected long recoverySleepTime = 5000;
+
+ /**
+ * Whether automatic membership recovery is enabled
+ */
+ protected boolean recoveryEnabled = true;
+
+ /**
* Create a new mcast service impl
* @param member - the local member
* @param sendFrequency - the time (ms) in between pings sent out
@@ -129,6 +147,13 @@
this.mcastSoTimeout = soTimeout;
this.mcastTTL = ttl;
this.mcastBindAddress = bind;
+ timeToExpiration = expireTime;
+ this.service = service;
+ this.sendFrequency = sendFrequency;
+ init();
+ }
+
+ protected void init() throws IOException {
setupSocket();
sendPacket = new DatagramPacket(new byte[1000],1000);
sendPacket.setAddress(address);
@@ -136,27 +161,25 @@
receivePacket = new DatagramPacket(new byte[1000],1000);
receivePacket.setAddress(address);
receivePacket.setPort(port);
- membership = new McastMembership(member.getName());
- timeToExpiration = expireTime;
- this.service = service;
- this.sendFrequency = sendFrequency;
+ if(membership == null) membership = new
McastMembership(member.getName());
}
protected void setupSocket() throws IOException {
if (mcastBindAddress != null) socket = new MulticastSocket(new
java.net.
InetSocketAddress(mcastBindAddress, port));
else socket = new MulticastSocket(port);
+ socket.setLoopbackMode(false); //hint that we don't need loop back
messages
if (mcastBindAddress != null) {
if(log.isInfoEnabled())
log.info("Setting multihome multicast interface to:" +
mcastBindAddress);
socket.setInterface(mcastBindAddress);
} //end if
- if ( mcastSoTimeout >= 0 ) {
- if(log.isInfoEnabled())
- log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
- socket.setSoTimeout(mcastSoTimeout);
- }
+ //force a so timeout so that we don't block forever
+ if ( mcastSoTimeout <= 0 ) mcastSoTimeout = (int)sendFrequency;
+ if(log.isInfoEnabled())
+ log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
+ socket.setSoTimeout(mcastSoTimeout);
if ( mcastTTL >= 0 ) {
if(log.isInfoEnabled())
log.info("Setting cluster mcast TTL to " + mcastTTL);
@@ -193,11 +216,17 @@
* @throws IOException if the service fails to disconnect from the sockets
*/
public synchronized void stop() throws IOException {
- socket.leaveGroup(address);
- doRun = false;
- sender = null;
- receiver = null;
- serviceStartTime = Long.MAX_VALUE;
+ try {
+ socket.leaveGroup(address);
+ } catch (IOException ignore) {
+ } finally {
+ doRun = false;
+ if(sender!= null) sender.interrupt() ;
+ sender = null;
+ if(receiver!= null) receiver.interrupt() ;
+ receiver = null;
+ serviceStartTime = Long.MAX_VALUE;
+ }
}
/**
@@ -205,22 +234,37 @@
* @throws IOException
*/
public void receive() throws IOException {
- socket.receive(receivePacket);
- byte[] data = new byte[receivePacket.getLength()];
-
System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
- McastMember m = McastMember.getMember(data);
- if(log.isDebugEnabled())
- log.debug("Mcast receive ping from member " + m);
- if ( membership.memberAlive(m) ) {
+ try {
+ socket.receive(receivePacket);
+
+ byte[] data = new byte[receivePacket.getLength()];
+
System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
+ McastMember m = McastMember.getMember(data);
if(log.isDebugEnabled())
- log.debug("Mcast add member " + m);
- service.memberAdded(m);
+ log.debug("Mcast receive ping from member " + m);
+ if ( membership.memberAlive(m) ) {
+ if(log.isDebugEnabled())
+ log.debug("Mcast add member " + m);
+ service.memberAdded(m);
+ }
+ } finally {
+ checkExpire();
}
- McastMember[] expired = membership.expire(timeToExpiration);
- for ( int i=0; i<expired.length; i++) {
- if(log.isDebugEnabled())
- log.debug("Mcast exipre member " + m);
- service.memberDisappeared(expired[i]);
+ }
+
+ protected Object expiredMutex = new Object();
+
+ /**
+ * Check for expired members and notify the service of each one that disappeared
+ */
+ protected void checkExpire() {
+ synchronized (expiredMutex) {
+ McastMember[] expired = membership.expire(timeToExpiration);
+ for ( int i=0; i<expired.length; i++) {
+ if(log.isDebugEnabled())
+ log.debug("Mcast exipre member " + expired[i]);
+ service.memberDisappeared(expired[i]);
+ }
}
}
@@ -229,55 +273,198 @@
* @throws Exception
*/
public void send() throws Exception{
- member.inc();
- if(log.isDebugEnabled())
- log.debug("Mcast send ping from member " + member);
- byte[] data = member.getData(this.serviceStartTime);
- DatagramPacket p = new DatagramPacket(data,data.length);
- p.setAddress(address);
- p.setPort(port);
- socket.send(p);
+ try {
+ member.inc();
+
+ if(log.isDebugEnabled())
+ log.debug("Mcast send ping from member " + member);
+ byte[] data = member.getData(this.serviceStartTime);
+ DatagramPacket p = new DatagramPacket(data,data.length);
+ p.setAddress(address);
+ p.setPort(port);
+ socket.send(p);
+ } finally {
+ checkExpire() ;
+ }
}
public long getServiceStartTime() {
return this.serviceStartTime;
}
+ public int getRecoveryCounter() {
+ return recoveryCounter;
+ }
+
+ public boolean isRecoveryEnabled() {
+ return recoveryEnabled;
+ }
+
+ public long getRecoverySleepTime() {
+ return recoverySleepTime;
+ }
+ public void setRecoveryCounter(int recoveryCounter) {
+ this.recoveryCounter = recoveryCounter;
+ }
+
+ public void setRecoveryEnabled(boolean recoveryEnabled) {
+ this.recoveryEnabled = recoveryEnabled;
+ }
+
+ public void setRecoverySleepTime(long recoverySleepTime) {
+ this.recoverySleepTime = recoverySleepTime;
+ }
+
public class ReceiverThread extends Thread {
+
public ReceiverThread() {
super();
setName("Cluster-MembershipReceiver");
}
+
public void run() {
+ int errorCounter = 0 ;
while ( doRun ) {
try {
receive();
+ errorCounter = 0;
} catch ( Exception x ) {
- log.warn("Error receiving mcast package. Sleeping
500ms",x);
- try { Thread.sleep(500); } catch ( Exception ignore ){}
-
+ if (errorCounter==0) {
+ if(! (x instanceof SocketTimeoutException))
+ log.warn("Error receiving mcast package
(errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency + " ms",x);
+ } else {
+ if(! (x instanceof SocketTimeoutException)
+ && log.isDebugEnabled())
+ log.debug("Error receiving mcast package
(errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency+ " ms",x);
+ }
+ try { Thread.sleep(sendFrequency); } catch ( Exception
ignore ){}
+ if ( (++errorCounter)>=recoveryCounter ) {
+ log.warn("Error receiving mcast package
(errorCounter=" +errorCounter+ "). Try Recovery!",x);
+ errorCounter=0;
+ new RecoveryThread(McastServiceImpl.this);
+ }
}
}
+ log.warn("Receiver Thread ends with errorCounter=" +errorCounter+
".");
+
}
- }//class ReceiverThread
+ }
public class SenderThread extends Thread {
+
long time;
+
+ McastServiceImpl service ;
+
public SenderThread(long time) {
this.time = time;
setName("Cluster-MembershipSender");
}
+
public void run() {
+ int errorCounter = 0 ;
while ( doRun ) {
try {
send();
+ errorCounter = 0;
} catch ( Exception x ) {
- log.warn("Unable to send mcast message.",x);
+ if (errorCounter==0) {
+ log.warn("Unable to send mcast message.",x);
+ }
+ else {
+ if(log.isDebugEnabled())
+ log.debug("Unable to send mcast message.",x);
+ }
+ if ( (++errorCounter)>=recoveryCounter ) {
+ errorCounter=0;
+ new RecoveryThread(McastServiceImpl.this);
+ }
}
try { Thread.sleep(time); } catch ( Exception ignore ) {}
}
+ log.warn("Sender Thread ends with errorCounter=" +errorCounter+
".");
+ }
+ }
+
+ protected static class RecoveryThread extends Thread {
+
+ static boolean running = false;
+
+ McastServiceImpl parent = null;
+
+ public RecoveryThread(McastServiceImpl parent) {
+ this.parent = parent;
+ if (!init(this)) parent = null;
+ }
+
+ public static synchronized boolean init(RecoveryThread t) {
+ if ( running ) {
+ return false;
+ }
+ if ( !t.parent.isRecoveryEnabled()) {
+ return false;
+ }
+ running = true;
+ t.setName("Cluster-MembershipRecovery");
+ t.setDaemon(true);
+ t.start();
+ return true;
+ }
+
+ public boolean stopService() {
+ try {
+ parent.stop();
+ return true;
+ } catch (Exception x) {
+ log.warn("Recovery thread failed to stop membership service.",
x);
+ return false;
+ }
}
- }//class SenderThread
+
+ public boolean startService() {
+ try {
+ parent.init();
+ parent.start(1);
+ parent.start(2);
+ return true;
+ } catch (Exception x) {
+ log.warn("Recovery thread failed to start membership
service.", x);
+ return false;
+ }
+ }
+
+ public void run() {
+ boolean success = false;
+ int attempt = 0;
+ try {
+ while (!success) {
+ if(log.isInfoEnabled())
+ log.info("Cluster membership, running recovery thread,
multicasting is not functional.");
+ success = stopService();
+ if(success) {
+ try {
+ Thread.sleep(1000 + parent.mcastSoTimeout);
+ } catch (Exception ignore){}
+ success = startService();
+ if(success && log.isInfoEnabled())
+ log.info("Membership recovery was successful.");
+ }
+ try {
+ if (!success) {
+ if(log.isInfoEnabled())
+ log.info("Recovery attempt " + (++attempt) + "
failed, trying again in " +parent.recoverySleepTime + " milliseconds");
+ Thread.sleep(parent.recoverySleepTime);
+ // check member expire...
+ parent.checkExpire() ;
+ }
+ }catch (InterruptedException ignore) {
+ }
+ }
+ } finally {
+ running = false;
+ }
+ }
+ }
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
Sun Aug 19 14:04:15 2007
@@ -53,6 +53,16 @@
<attribute name="mcastTTL"
description=""
type="int"/>
+ <attribute name="recoveryCounter"
+ description="Number of consecutive membership failures before the socket is restarted
(default 10)"
+ type="int"/>
+ <attribute name="recoveryEnabled"
+ description="Membership recovery enabled (default true)"
+ is="true"
+ type="boolean"/>
+ <attribute name="recoverySleepTime"
+ description="Time in milliseconds between socket recovery attempts (default 5000)"
+ type="long"/>
<attribute name="localMemberName"
description="Complete local receiver information"
type="java.lang.String"
Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Sun Aug 19 14:04:15 2007
@@ -135,6 +135,9 @@
<subsection name="Cluster">
<changelog>
<fix>
+ <bug>40042</bug>: Recover membership heartbeat after a network interface goes down.
(pero)
+ </fix>
+ <fix>
<bug>42691</bug>: Don't set access time after session sync. Fix that
sessions
after node restart better expire. Requested by Casey Lucas (pero)
</fix>
Modified: tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml Sun Aug 19 14:04:15
2007
@@ -497,13 +497,16 @@
mcastClusterDomain="d10"
mcastPort="45564"
mcastFrequency="1000"
- mcastDropTime="30000"/>
+ mcastDropTime="30000"
+ recoveryCounter="10"
+ recoveryEnabled="true"
+ recoverySleepTime="5000"/>
<Receiver
className="org.apache.catalina.cluster.tcp.ReplicationListener"
tcpListenAddress="auto"
tcpListenPort="9015"
tcpSelectorTimeout="100"
- tcpThreadCount="6"
+ tcpThreadCount="6"/>
<Sender
className="org.apache.catalina.cluster.tcp.ReplicationTransmitter"
replicationMode="fastasyncqueue"
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]