Author: fhanik
Date: Tue Jun 13 23:13:24 2006
New Revision: 414100

URL: http://svn.apache.org/viewvc?rev=414100&view=rev
Log:
More modifications; there seems to be a synchronized block that is held
throughout the election campaign

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
 Tue Jun 13 23:13:24 2006
@@ -143,7 +143,7 @@
     /**
      * Time to wait for coordination timeout
      */
-    protected long waitForCoordMsgTimeout = 5000;
+    protected long waitForCoordMsgTimeout = 15000;
     /**
      * Our current view
      */
@@ -191,7 +191,14 @@
                 
this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
                 return; //the only member, no need for an election
             }
-            if ( suggestedviewId != null ) return;//election already running, 
I'm not allowed to have two of them
+            if ( suggestedviewId != null ) {
+                fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election 
abandoned, election running"));
+                return; //election already running, I'm not allowed to have 
two of them
+            }
+            if ( view != null && Arrays.diff(view,membership,local).length == 
0 &&  Arrays.diff(membership,view,local).length == 0) {
+                fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election 
abandoned, view matches membership"));
+                return; //already have this view installed
+            }            
             int prio = AbsoluteOrder.comp.compare(local,others[0]);
             MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader 
in my view?
             if ( local.equals(leader) || force ) {
@@ -300,13 +307,13 @@
     }
     
     protected void processCoordMessage(CoordinationMessage msg, Member sender) 
throws ChannelException {
-        synchronized (electionMutex) {
+//        synchronized (electionMutex) {
             coordMsgReceived.set(true);
             msg.timestamp = System.currentTimeMillis();
             Membership merged = mergeOnArrive(msg,sender);
             if ( isViewConf(msg) ) handleViewConf(msg, sender,merged);
             else handleToken(msg, sender, merged);
-        }
+//        }
     }
     
     protected void handleToken(CoordinationMessage msg, Member 
sender,Membership merged) throws ChannelException {
@@ -433,24 +440,33 @@
 //              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
 
//============================================================================================================
     public void start(int svc) throws ChannelException {
-        if ( membership == null ) setupMembership();
-        if (started) return;
-        fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_START,this,"Before start"));
-        super.start(startsvc);
-        started = true;
-        view = new 
Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,true);
-        fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_START,this,"After start"));
-        startElection(false);
+//        synchronized (electionMutex) {
+            if (membership == null) setupMembership();
+            if (started)return;
+            fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
+            super.start(startsvc);
+            started = true;
+            if (view == null) view = new Membership( 
(MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
+            fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
+            startElection(false);
+//        }
     }
 
     public void stop(int svc) throws ChannelException {
         try {
             halt();
-            if ( !started ) return;
-            started = false;
-            fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_STOP,this,"Before stop"));
-            super.stop(startsvc);
-            fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_STOP,this,"After stop"));
+            synchronized (electionMutex) {
+                if (!started)return;
+                started = false;
+                fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
+                super.stop(startsvc);
+                this.view = null;
+                this.viewId = null;
+                this.suggestedView = null;
+                this.suggestedviewId = null;
+                this.membership.reset();
+                fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
+            }
         }finally {
             release();
         }
@@ -486,6 +502,7 @@
     }
 
     public void memberAdded(Member member) {
+        System.out.println("MBR ADD THREAD:"+Thread.currentThread().getName());
         memberAdded(member,true);
     }
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
 Tue Jun 13 23:13:24 2006
@@ -170,7 +170,7 @@
             if (membership == null) setupMembership();
             //update all alive times
             Member[] members = super.getMembers();
-            for (int i = 0; i < members.length; i++) {
+            for (int i = 0; members != null && i < members.length; i++) {
                 if (membership.memberAlive( (MemberImpl) members[i])) {
                     //we don't have this one in our membership, check to see 
if he/she is alive
                     if (memberAlive(members[i])) {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java
 Tue Jun 13 23:13:24 2006
@@ -67,7 +67,7 @@
     public void setTransport(MultiPointSender transport) {
         this.transport = transport;
     }
-
+    
     // ------------------------------------------------------------- public
     
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414100&r1=414099&r2=414100&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
 Tue Jun 13 23:13:24 2006
@@ -1,23 +1,26 @@
 package org.apache.catalina.tribes.demos;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.GroupChannel;
 import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
 import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
-import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.catalina.tribes.util.Arrays;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.IOException;
-import java.util.StringTokenizer;
 
 
 
 public class CoordinationDemo {
     static int CHANNEL_COUNT = 5;
     static int SCREEN_WIDTH = 120;
+    static long SLEEP_TIME = 10;
     static boolean MULTI_THREAD = false;
     StringBuffer statusLine = new StringBuffer();
     Status[] status = null;
@@ -48,9 +51,9 @@
     
     public synchronized void printScreen() {
         clearScreen();
-        System.out.println("XXX. "+getHeader());
+        System.out.println(" ###."+getHeader());
         for ( int i=0; i<status.length; i++ ) {
-            System.out.print(fill(String.valueOf(i+1)+".",5," "));
+            System.out.print(leftfill(String.valueOf(i+1)+".",5," "));
             if ( status[i] != null ) 
System.out.print(status[i].getStatusLine());
         }
         System.out.println("\n\n");
@@ -61,19 +64,20 @@
     
     public String getHeader() {
         //member - 30
-        //running- 8
+        //running- 10
         //coord - 30
         //view-id - 24
         //view count - 8
 
         StringBuffer buf = new StringBuffer();
-        buf.append(fill("Member",30," "));
-        buf.append(fill("Running",8," "));
-        buf.append(fill("Coord",30," "));
-        buf.append(fill("View-id(short)",24," "));
-        buf.append(fill("Count",8," "));
+        buf.append(leftfill("Member",30," "));
+        buf.append(leftfill("Running",10," "));
+        buf.append(leftfill("Coord",30," "));
+        buf.append(leftfill("View-id(short)",24," "));
+        buf.append(leftfill("Count",8," "));
         buf.append("\n");
-        buf.append(fill("",SCREEN_WIDTH,"="));
+        
+        buf.append(rightfill("==="+new 
java.sql.Timestamp(System.currentTimeMillis()).toString(),SCREEN_WIDTH,"="));
         buf.append("\n");
         return buf.toString();
     }
@@ -179,10 +183,19 @@
         demo.waitForInput();
     }
     
-    public static String fill(String value, int length, String ch) {
+    public static String leftfill(String value, int length, String ch) {
+        return fill(value,length,ch,true);
+    }
+    
+    public static String rightfill(String value, int length, String ch) {
+        return fill(value,length,ch,false);
+    }    
+
+    public static String fill(String value, int length, String ch, boolean 
left) {
         StringBuffer buf = new StringBuffer();
+        if ( !left ) buf.append(value.trim());
         for (int i=value.trim().length(); i<length; i++ ) buf.append(ch);
-        buf.append(value.trim());
+        if ( left ) buf.append(value.trim());
         return buf.toString();
     }
     
@@ -193,7 +206,7 @@
         NonBlockingCoordinator interceptor = null;
         public String status;
         public Exception error;
-        public boolean started = false;
+        public String startstatus = "new";
         
         public Status(CoordinationDemo parent) {
             this.parent = parent;
@@ -201,7 +214,7 @@
         
         public String getStatusLine() {
             //member - 30
-            //running- 8
+            //running- 10
             //coord - 30
             //view-id - 24
             //view count - 8
@@ -217,11 +230,11 @@
                 viewId = 
getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new
 byte[0]);
                 count = String.valueOf(interceptor.getView().length);
             }
-            buf.append(fill(local,30," "));
-            buf.append(fill(String.valueOf(started), 8, " "));
-            buf.append(fill(coord, 30, " "));
-            buf.append(fill(viewId, 24, " "));
-            buf.append(fill(count, 8, " "));
+            buf.append(leftfill(local,30," "));
+            buf.append(leftfill(startstatus, 10, " "));
+            buf.append(leftfill(coord, 30, " "));
+            buf.append(leftfill(viewId, 24, " "));
+            buf.append(leftfill(count, 8, " "));
             buf.append("\n");
             buf.append("Status:"+status);
             buf.append("\n");
@@ -237,15 +250,21 @@
             try {
                 if ( channel == null ) {
                     channel = createChannel();
+                    startstatus = "starting";
                     channel.start(channel.DEFAULT);
-                    started = true;
+                    startstatus = "running";
                 } else {
                     status = "Channel already started.";
                 }
             } catch ( Exception x ) {
+                synchronized (System.err) {
+                    System.err.println("Start failed:");
+                    StackTraceElement[] els = x.getStackTrace();
+                    for (int i = 0; i < els.length; i++) 
System.err.println(els[i].toString());
+                }
                 status = "Start failed:"+x.getMessage();
                 error = x;
-                started = false;
+                startstatus = "failed";
             }
         }
         
@@ -258,10 +277,16 @@
                     status = "Channel Already Stopped";
                 }
             }catch ( Exception x )  {
+                synchronized (System.err) {
+                    System.err.println("Stop failed:");
+                    StackTraceElement[] els = x.getStackTrace();
+                    for (int i = 0; i < els.length; i++) 
System.err.println(els[i].toString());
+                }
+
                 status = "Stop failed:"+x.getMessage();
                 error = x;
             }finally {
-                started = false;
+                startstatus = "stopped";
                 channel = null;
                 interceptor = null;
             }
@@ -269,11 +294,12 @@
         
         public GroupChannel createChannel() {
             channel = new GroupChannel();
+            ((ReceiverBase)channel.getChannelReceiver()).setAutoBind(100);
             interceptor = new NonBlockingCoordinator() {
                 public void fireInterceptorEvent(InterceptorEvent event) {
                     status = event.getEventTypeDesc();
                     parent.printScreen();
-                    try { Thread.sleep(100); }catch ( Exception x){}
+                    try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){}
                     
                 }
             };



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to