Author: fhanik
Date: Fri Aug 17 11:33:08 2007
New Revision: 567104

URL: http://svn.apache.org/viewvc?view=rev&rev=567104
Log:
Added a recovery service to the membership layer. If the network card gets
disabled or something similar happens, it tries to recover the service.

Modified:
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?view=diff&rev=567104&r1=567103&r2=567104
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Aug 17 11:33:08 2007
@@ -121,6 +121,11 @@
     protected InetAddress mcastBindAddress = null;
     
     /**
+     * Number of consecutive failures after which a membership recovery is
+     * initiated. The sender thread and the receiver thread each keep their
+     * own failure counter and compare it against this value. Defaults to 10.
+     */
+    protected int recoveryCounter = 10;
+    
+    /**
      * Create a new mcast service impl
      * @param member - the local member
      * @param sendFrequency - the time (ms) in between pings sent out
@@ -151,6 +156,10 @@
         this.timeToExpiration = expireTime;
         this.service = service;
         this.sendFrequency = sendFrequency;
+        init();
+    }
+
+    /**
+     * (Re)initializes the multicast state of this service. It calls
+     * setupSocket(), allocates fresh send and receive packets of
+     * MAX_PACKET_SIZE bytes, clears the local member's command and builds a
+     * new membership table.
+     * <p>
+     * Invoked from the constructor and again by RecoveryThread before the
+     * service is restarted after repeated send/receive failures.
+     * NOTE(review): this method is public and non-final but is called from the
+     * constructor, so a subclass override would run before the subclass is
+     * fully constructed.
+     *
+     * @throws IOException if setup fails (presumably raised by setupSocket();
+     *         confirm)
+     */
+    public void init() throws IOException {
         setupSocket();
         sendPacket = new DatagramPacket(new 
byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
         sendPacket.setAddress(address);
@@ -158,6 +167,8 @@
         receivePacket = new DatagramPacket(new 
byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
         receivePacket.setAddress(address);
         receivePacket.setPort(port);
+        // Start from an empty command on every (re)init.
+        member.setCommand(new byte[0]);
+        // NOTE(review): getData(true, true) presumably regenerates the member's
+        // cached serialized data after the command change; confirm.
+        member.getData(true, true);
+        // A new Membership replaces any previous table on each (re)init.
         membership = new Membership(member);
     }
     
@@ -356,23 +367,29 @@
 
 
     public class ReceiverThread extends Thread {
+        /** Consecutive receive failures; reset to 0 after a successful receive. */
+        int errorCounter = 0;
+        /** Creates the receiver thread, named "Tribes-MembershipReceiver". */
         public ReceiverThread() {
             super();
-            setName("Cluster-MembershipReceiver");
+            setName("Tribes-MembershipReceiver");
         }
+        /**
+         * Calls receive() in a loop while doRunReceiver is set.
+         * An ArrayIndexOutOfBoundsException (invalid packet) is only logged at
+         * DEBUG and neither resets nor increments the failure counter. Any
+         * other exception sleeps 500 ms. After recoveryCounter consecutive
+         * failures, it requests a RecoveryThread.
+         */
         public void run() {
             while ( doRunReceiver ) {
                 try {
                     receive();
+                    // A successful receive ends the current failure streak.
+                    errorCounter=0;
                 } catch ( ArrayIndexOutOfBoundsException ax ) {
                     //we can ignore this, as it means we have an invalid 
package
                     //but we will log it to debug
                     if ( log.isDebugEnabled() )
                         log.debug("Invalid member mcast package.",ax);
                 } catch ( Exception x ) {
-                    log.warn("Error receiving mcast package. Sleeping 
500ms",x);
+                    // Only the first failure of a streak is logged at WARN;
+                    // repeats go to DEBUG so a dead interface does not flood the log.
+                    if (errorCounter==0) log.warn("Error receiving mcast 
package. Sleeping 500ms",x);
+                    else log.debug("Error receiving mcast package. Sleeping 
500ms",x);
                     try { Thread.sleep(500); } catch ( Exception ignore ){}
-                    
+                    // After recoveryCounter consecutive failures, restart the count
+                    // and request a recovery. RecoveryThread is only started when
+                    // no other recovery is currently running.
+                    if ( (++errorCounter)>=recoveryCounter ) {
+                        errorCounter=0;
+                        new RecoveryThread(McastServiceImpl.this);
+                    }
                 }
             }
         }
@@ -380,20 +397,87 @@
 
+    /**
+     * Calls send(true) every {@code time} milliseconds while doRunSender is set.
+     * After recoveryCounter consecutive send failures, it requests a
+     * RecoveryThread.
+     */
     public class SenderThread extends Thread {
         long time;
+        /** Consecutive send failures; reset to 0 after a successful send. */
+        int errorCounter=0;
+        /**
+         * @param time delay in milliseconds between send attempts
+         */
         public SenderThread(long time) {
             this.time = time;
-            setName("Cluster-MembershipSender");
+            setName("Tribes-MembershipSender");
 
         }
         public void run() {
             while ( doRunSender ) {
                 try {
                     send(true);
+                    // A successful send ends the current failure streak.
+                    errorCounter = 0;
                 } catch ( Exception x ) {
-                    log.warn("Unable to send mcast message.",x);
+                    // Only the first failure of a streak is logged at WARN; repeats go to DEBUG.
+                    if (errorCounter==0) log.warn("Unable to send mcast 
message.",x);
+                    else log.debug("Unable to send mcast message.",x);
+                    // After recoveryCounter consecutive failures, restart the count
+                    // and request a recovery (started only if none is running).
+                    if ( (++errorCounter)>=recoveryCounter ) {
+                        errorCounter=0;
+                        new RecoveryThread(McastServiceImpl.this);
+                    }
                 }
                 try { Thread.sleep(time); } catch ( Exception ignore ) {}
             }
         }
     }//class SenderThread
+
+    /**
+     * Background thread that tries to restore the multicast membership service
+     * after the sender or receiver thread has seen recoveryCounter consecutive
+     * failures (for example because the network card was disabled). Each
+     * attempt stops the service, re-runs {@link McastServiceImpl#init()} and
+     * starts it again. Failed attempts are retried every 5 seconds until both
+     * steps succeed.
+     * <p>
+     * The {@code running} guard is static, so at most one recovery runs at a
+     * time across all McastServiceImpl instances. An instance constructed
+     * while a recovery is in progress is never started.
+     */
+    protected static class RecoveryThread extends Thread {
+        /**
+         * True while a recovery thread is active. It is set under the class
+         * lock in {@link #init(RecoveryThread)} but cleared at the end of
+         * {@link #run()} without that lock. It must be volatile so the cleared
+         * value is visible to later callers of init().
+         */
+        static volatile boolean running = false;
+        /** Service to recover; null when this instance was not started. */
+        McastServiceImpl parent = null;
+
+        /**
+         * Creates the thread and starts it immediately, unless another
+         * recovery is already running.
+         * NOTE(review): the thread is started from inside the constructor.
+         *
+         * @param parent the membership service to recover
+         */
+        public RecoveryThread(McastServiceImpl parent) {
+            this.parent = parent;
+            // Clear the field (not the parameter) when this instance is not
+            // started because another recovery is already in progress.
+            if (!init(this)) this.parent = null;
+        }
+
+        /**
+         * Atomically claims the single recovery slot and starts {@code t} as
+         * a daemon thread named "Tribes-MembershipRecovery".
+         *
+         * @param t the recovery thread to start
+         * @return true if {@code t} was started, false if a recovery is
+         *         already running
+         */
+        public static synchronized boolean init(RecoveryThread t) {
+            if ( running ) return false;
+            running = true;
+            t.setName("Tribes-MembershipRecovery");
+            t.setDaemon(true);
+            t.start();
+            return true;
+        }
+
+        /**
+         * Stops the parent's membership receive and send sequences
+         * (MBR_RX_SEQ | MBR_TX_SEQ).
+         *
+         * @return true on success, false if stopping threw (the error is logged)
+         */
+        public boolean stopService() {
+            try {
+                parent.stop(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ);
+                return true;
+            } catch (Exception x) {
+                log.warn("Recovery thread failed to stop membership service.", x);
+                return false;
+            }
+        }
+
+        /**
+         * Re-initializes the parent via init() and starts its membership
+         * receive and send sequences again.
+         *
+         * @return true on success, false if either step threw (the error is logged)
+         */
+        public boolean startService() {
+            try {
+                parent.init();
+                parent.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ);
+                return true;
+            } catch (Exception x) {
+                log.warn("Recovery thread failed to start membership service.", x);
+                return false;
+            }
+        }
+
+        /**
+         * Repeats stop/start until both succeed, waiting 5 seconds between
+         * failed attempts. It always releases the {@code running} guard on exit.
+         */
+        public void run() {
+            boolean success = false;
+            int attempt = 0;
+            try {
+                while (!success) {
+                    log.info("Tribes membership, running recovery thread, multicasting is not functional.");
+                    // Non-short-circuit '&' on purpose: the restart is attempted
+                    // even when stopping failed.
+                    if (stopService() & startService()) {
+                        success = true;
+                        log.info("Membership recovery was successful.");
+                    }
+                    try {
+                        if (!success) {
+                            log.info("Recovery attempt "+(++attempt)+" failed, trying again in 5 seconds");
+                            Thread.sleep(5000);
+                        }
+                    } catch (InterruptedException ignored) {
+                        // Deliberately ignored: an interrupt only shortens the
+                        // back-off; recovery keeps retrying until the service is
+                        // back. Re-interrupting here would make every later
+                        // sleep fail at once and spin the loop.
+                    }
+                }//while
+            } finally {
+                running = false;
+            }
+        }//run
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to