Author: fhanik Date: Fri Aug 17 11:33:08 2007 New Revision: 567104 URL: http://svn.apache.org/viewvc?view=rev&rev=567104 Log: Added a recovery service to the membership layer. If the network card gets disabled or something similar happens, it tries to recover the service.
Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?view=diff&rev=567104&r1=567103&r2=567104 ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Aug 17 11:33:08 2007 @@ -121,6 +121,11 @@ protected InetAddress mcastBindAddress = null; /** + * nr of times the system has to fail before a recovery is initiated + */ + protected int recoveryCounter = 10; + + /** * Create a new mcast service impl * @param member - the local member * @param sendFrequency - the time (ms) in between pings sent out @@ -151,6 +156,10 @@ this.timeToExpiration = expireTime; this.service = service; this.sendFrequency = sendFrequency; + init(); + } + + public void init() throws IOException { setupSocket(); sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE); sendPacket.setAddress(address); @@ -158,6 +167,8 @@ receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE); receivePacket.setAddress(address); receivePacket.setPort(port); + member.setCommand(new byte[0]); + member.getData(true, true); membership = new Membership(member); } @@ -356,23 +367,29 @@ public class ReceiverThread extends Thread { + int errorCounter = 0; public ReceiverThread() { super(); - setName("Cluster-MembershipReceiver"); + setName("Tribes-MembershipReceiver"); } public void run() { while ( doRunReceiver ) { try { receive(); + errorCounter=0; } catch ( ArrayIndexOutOfBoundsException ax ) { //we can ignore this, as it means we have an invalid package //but we will log it to debug if ( log.isDebugEnabled() ) log.debug("Invalid member mcast 
package.",ax); } catch ( Exception x ) { - log.warn("Error receiving mcast package. Sleeping 500ms",x); + if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); + else log.debug("Error receiving mcast package. Sleeping 500ms",x); try { Thread.sleep(500); } catch ( Exception ignore ){} - + if ( (++errorCounter)>=recoveryCounter ) { + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } } } @@ -380,20 +397,87 @@ public class SenderThread extends Thread { long time; + int errorCounter=0; public SenderThread(long time) { this.time = time; - setName("Cluster-MembershipSender"); + setName("Tribes-MembershipSender"); } public void run() { while ( doRunSender ) { try { send(true); + errorCounter = 0; } catch ( Exception x ) { - log.warn("Unable to send mcast message.",x); + if (errorCounter==0) log.warn("Unable to send mcast message.",x); + else log.debug("Unable to send mcast message.",x); + if ( (++errorCounter)>=recoveryCounter ) { + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } try { Thread.sleep(time); } catch ( Exception ignore ) {} } } }//class SenderThread + + protected static class RecoveryThread extends Thread { + static boolean running = false; + McastServiceImpl parent = null; + public RecoveryThread(McastServiceImpl parent) { + this.parent = parent; + if (!init(this)) parent = null; + } + + public static synchronized boolean init(RecoveryThread t) { + if ( running ) return false; + running = true; + t.setName("Tribes-MembershipRecovery"); + t.setDaemon(true); + t.start(); + return true; + } + + public boolean stopService() { + try { + parent.stop(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ); + return true; + } catch (Exception x) { + log.warn("Recovery thread failed to stop membership service.", x); + return false; + } + } + public boolean startService() { + try { + parent.init(); + parent.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ); + return true; + } catch (Exception x) { + log.warn("Recovery 
thread failed to start membership service.", x); + return false; + } + } + public void run() { + boolean success = false; + int attempt = 0; + try { + while (!success) { + log.info("Tribes membership, running recovery thread, multicasting is not functional."); + if (stopService() & startService()) { + success = true; + log.info("Membership recovery was successful."); + } + try { + if (!success) { + log.info("Recovery attempt "+(++attempt)+" failed, trying again in 5 seconds"); + Thread.sleep(5000); + } + }catch (InterruptedException ignore) { + } + }//while + }finally { + running = false; + } + }//run + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]