Author: fhanik Date: Tue Jun 13 23:13:24 2006 New Revision: 414100 URL: http://svn.apache.org/viewvc?rev=414100&view=rev Log: More modifications; there seems to be a synchronous block that runs through the election campaign
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414100&r1=414099&r2=414100&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue Jun 13 23:13:24 2006 @@ -143,7 +143,7 @@ /** * Time to wait for coordination timeout */ - protected long waitForCoordMsgTimeout = 5000; + protected long waitForCoordMsgTimeout = 15000; /** * Our current view */ @@ -191,7 +191,14 @@ this.handleViewConf(this.createElectionMsg(local,others,local),local,view); return; //the only member, no need for an election } - if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them + if ( suggestedviewId != null ) { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running")); + return; //election already running, I'm not allowed to have two of them + } + if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 
0) { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership")); + return; //already have this view installed + } int prio = AbsoluteOrder.comp.compare(local,others[0]); MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view? if ( local.equals(leader) || force ) { @@ -300,13 +307,13 @@ } protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException { - synchronized (electionMutex) { +// synchronized (electionMutex) { coordMsgReceived.set(true); msg.timestamp = System.currentTimeMillis(); Membership merged = mergeOnArrive(msg,sender); if ( isViewConf(msg) ) handleViewConf(msg, sender,merged); else handleToken(msg, sender, merged); - } +// } } protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { @@ -433,24 +440,33 @@ // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE //============================================================================================================ public void start(int svc) throws ChannelException { - if ( membership == null ) setupMembership(); - if (started) return; - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START,this,"Before start")); - super.start(startsvc); - started = true; - view = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,true); - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START,this,"After start")); - startElection(false); +// synchronized (electionMutex) { + if (membership == null) setupMembership(); + if (started)return; + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start")); + super.start(startsvc); + started = true; + if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After 
start")); + startElection(false); +// } } public void stop(int svc) throws ChannelException { try { halt(); - if ( !started ) return; - started = false; - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP,this,"Before stop")); - super.stop(startsvc); - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP,this,"After stop")); + synchronized (electionMutex) { + if (!started)return; + started = false; + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop")); + super.stop(startsvc); + this.view = null; + this.viewId = null; + this.suggestedView = null; + this.suggestedviewId = null; + this.membership.reset(); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop")); + } }finally { release(); } @@ -486,6 +502,7 @@ } public void memberAdded(Member member) { + System.out.println("MBR ADD THREAD:"+Thread.currentThread().getName()); memberAdded(member,true); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=414100&r1=414099&r2=414100&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Tue Jun 13 23:13:24 2006 @@ -170,7 +170,7 @@ if (membership == null) setupMembership(); //update all alive times Member[] members = super.getMembers(); - for (int i = 0; i < members.length; i++) { + for (int i = 0; members != null && i < members.length; i++) { if (membership.memberAlive( (MemberImpl) members[i])) { //we don't have this one in our 
membership, check to see if he/she is alive if (memberAlive(members[i])) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java?rev=414100&r1=414099&r2=414100&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReplicationTransmitter.java Tue Jun 13 23:13:24 2006 @@ -67,7 +67,7 @@ public void setTransport(MultiPointSender transport) { this.transport = transport; } - + // ------------------------------------------------------------- public /** Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414100&r1=414099&r2=414100&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Tue Jun 13 23:13:24 2006 @@ -1,23 +1,26 @@ package org.apache.catalina.tribes.demos; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.StringTokenizer; + +import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; +import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.GroupChannel; import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; -import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.catalina.tribes.util.Arrays; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.IOException; -import java.util.StringTokenizer; public class CoordinationDemo { static int CHANNEL_COUNT = 5; static int SCREEN_WIDTH = 120; + static long SLEEP_TIME = 10; static boolean MULTI_THREAD = false; StringBuffer statusLine = new StringBuffer(); Status[] status = null; @@ -48,9 +51,9 @@ public synchronized void printScreen() { clearScreen(); - System.out.println("XXX. "+getHeader()); + System.out.println(" ###."+getHeader()); for ( int i=0; i<status.length; i++ ) { - System.out.print(fill(String.valueOf(i+1)+".",5," ")); + System.out.print(leftfill(String.valueOf(i+1)+".",5," ")); if ( status[i] != null ) System.out.print(status[i].getStatusLine()); } System.out.println("\n\n"); @@ -61,19 +64,20 @@ public String getHeader() { //member - 30 - //running- 8 + //running- 10 //coord - 30 //view-id - 24 //view count - 8 StringBuffer buf = new StringBuffer(); - buf.append(fill("Member",30," ")); - buf.append(fill("Running",8," ")); - buf.append(fill("Coord",30," ")); - buf.append(fill("View-id(short)",24," ")); - buf.append(fill("Count",8," ")); + buf.append(leftfill("Member",30," ")); + buf.append(leftfill("Running",10," ")); + buf.append(leftfill("Coord",30," ")); + buf.append(leftfill("View-id(short)",24," ")); + buf.append(leftfill("Count",8," ")); buf.append("\n"); - buf.append(fill("",SCREEN_WIDTH,"=")); + + buf.append(rightfill("==="+new 
java.sql.Timestamp(System.currentTimeMillis()).toString(),SCREEN_WIDTH,"=")); buf.append("\n"); return buf.toString(); } @@ -179,10 +183,19 @@ demo.waitForInput(); } - public static String fill(String value, int length, String ch) { + public static String leftfill(String value, int length, String ch) { + return fill(value,length,ch,true); + } + + public static String rightfill(String value, int length, String ch) { + return fill(value,length,ch,false); + } + + public static String fill(String value, int length, String ch, boolean left) { StringBuffer buf = new StringBuffer(); + if ( !left ) buf.append(value.trim()); for (int i=value.trim().length(); i<length; i++ ) buf.append(ch); - buf.append(value.trim()); + if ( left ) buf.append(value.trim()); return buf.toString(); } @@ -193,7 +206,7 @@ NonBlockingCoordinator interceptor = null; public String status; public Exception error; - public boolean started = false; + public String startstatus = "new"; public Status(CoordinationDemo parent) { this.parent = parent; @@ -201,7 +214,7 @@ public String getStatusLine() { //member - 30 - //running- 8 + //running- 10 //coord - 30 //view-id - 24 //view count - 8 @@ -217,11 +230,11 @@ viewId = getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new byte[0]); count = String.valueOf(interceptor.getView().length); } - buf.append(fill(local,30," ")); - buf.append(fill(String.valueOf(started), 8, " ")); - buf.append(fill(coord, 30, " ")); - buf.append(fill(viewId, 24, " ")); - buf.append(fill(count, 8, " ")); + buf.append(leftfill(local,30," ")); + buf.append(leftfill(startstatus, 10, " ")); + buf.append(leftfill(coord, 30, " ")); + buf.append(leftfill(viewId, 24, " ")); + buf.append(leftfill(count, 8, " ")); buf.append("\n"); buf.append("Status:"+status); buf.append("\n"); @@ -237,15 +250,21 @@ try { if ( channel == null ) { channel = createChannel(); + startstatus = "starting"; channel.start(channel.DEFAULT); - started = true; + startstatus = "running"; } 
else { status = "Channel already started."; } } catch ( Exception x ) { + synchronized (System.err) { + System.err.println("Start failed:"); + StackTraceElement[] els = x.getStackTrace(); + for (int i = 0; i < els.length; i++) System.err.println(els[i].toString()); + } status = "Start failed:"+x.getMessage(); error = x; - started = false; + startstatus = "failed"; } } @@ -258,10 +277,16 @@ status = "Channel Already Stopped"; } }catch ( Exception x ) { + synchronized (System.err) { + System.err.println("Stop failed:"); + StackTraceElement[] els = x.getStackTrace(); + for (int i = 0; i < els.length; i++) System.err.println(els[i].toString()); + } + status = "Stop failed:"+x.getMessage(); error = x; }finally { - started = false; + startstatus = "stopped"; channel = null; interceptor = null; } @@ -269,11 +294,12 @@ public GroupChannel createChannel() { channel = new GroupChannel(); + ((ReceiverBase)channel.getChannelReceiver()).setAutoBind(100); interceptor = new NonBlockingCoordinator() { public void fireInterceptorEvent(InterceptorEvent event) { status = event.getEventTypeDesc(); parent.printScreen(); - try { Thread.sleep(100); }catch ( Exception x){} + try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){} } }; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]