Author: fhanik Date: Tue Jun 13 15:07:01 2006 New Revision: 413988 URL: http://svn.apache.org/viewvc?rev=413988&view=rev Log: Completed almost all of the coordinator; still need test cases for all the use cases that exist
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java Tue Jun 13 15:07:01 2006 @@ -165,9 +165,14 @@ * @throws ChannelException if a startup error occurs or the service is already started. 
* @see Channel */ - public void stop(int svc) throws ChannelException; - + public void stop(int svc) throws ChannelException; + public void fireInterceptorEvent(InterceptorEvent event); + + interface InterceptorEvent { + int getEventType(); + ChannelInterceptor getInterceptor(); + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java Tue Jun 13 15:07:01 2006 @@ -60,5 +60,11 @@ public byte[] getBytes() { return id; } + + public String toString() { + StringBuffer buf = new StringBuffer("UniqueId"); + buf.append(org.apache.catalina.tribes.util.Arrays.toString(id)); + return buf.toString(); + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java Tue Jun 13 15:07:01 2006 @@ -162,6 +162,10 @@ public void stop(int svc) throws ChannelException { if (getNext() != null) getNext().stop(svc); } + + public void fireInterceptorEvent(InterceptorEvent event) { + //empty operation + } } Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue Jun 13 15:07:01 2006 @@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.catalina.tribes.membership.*; +import org.apache.catalina.tribes.test.interceptors.TestNonBlockingCoordinator; +import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; +import org.apache.catalina.tribes.ChannelInterceptor; /** * <p>Title: Auto merging leader election algorithm</p> @@ -187,6 +190,7 @@ synchronized (electionMutex) { MemberImpl local = (MemberImpl)getLocalMember(false); MemberImpl[] others = (MemberImpl[])membership.getMembers(); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated")); if ( others.length == 0 ) { this.viewId = new UniqueId(UUIDGenerator.randomUUID(false)); this.view = new Membership(local,AbsoluteOrder.comp, true); @@ -201,19 +205,23 @@ suggestedviewId = msg.getId(); suggestedView = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(suggestedView,msg.getMembers()); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request")); sendElectionMsg(local,others[0],msg); } else { try { coordMsgReceived.set(false); + fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request")); electionMutex.wait(waitForCoordMsgTimeout); }catch ( InterruptedException x ) { Thread.currentThread().interrupted(); } if ( suggestedviewId == null && (!coordMsgReceived.get())) { //no message arrived, send the coord msg + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out.")); startElection(true); } }//end if + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election in progress")); } } @@ -227,6 +235,7 @@ } protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")")); super.sendMessage(new Member[] {next}, createData(msg, local), null); } @@ -269,6 +278,7 @@ } protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge")); MemberImpl local = (MemberImpl)getLocalMember(false); Membership merged = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(merged,msg.getMembers()); @@ -278,6 +288,7 @@ if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); else memberAdded(diff[i],false); } + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge")); return merged; } @@ -352,6 +363,7 @@ } viewChange(viewId,view.getMembers()); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View id("+this.viewId+")")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); @@ -406,11 +418,14 @@ // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE //============================================================================================================ 
public void start(int svc) throws ChannelException { - if (started)return; - super.start(startsvc); if ( membership == null ) setupMembership(); - startElection(false); + if (started) return; + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START,this,"Before start")); + super.start(startsvc); started = true; + view = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,true); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START,this,"After start")); + startElection(false); } public void stop(int svc) throws ChannelException { @@ -418,7 +433,9 @@ halt(); if ( !started ) return; started = false; + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP,this,"Before stop")); super.stop(startsvc); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP,this,"After stop")); }finally { release(); } @@ -433,9 +450,14 @@ public void messageReceived(ChannelMessage msg) { if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { //ignore message, its an alive message + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message")); + } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { try { - processCoordMessage(new CoordinationMessage(msg.getMessage()), msg.getAddress()); + CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); + Member[] cmbr = cmsg.getMembers(); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")")); + processCoordMessage(cmsg, msg.getAddress()); }catch ( ChannelException x ) { log.error("Error processing coordination message. 
Could be fatal.",x); } @@ -457,6 +479,7 @@ if ( membership == null ) setupMembership(); if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); try { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add")); if (started && elect) startElection(false); }catch ( ChannelException x ) { log.error("Unable to start election when member was added.",x); @@ -472,13 +495,20 @@ membership.removeMember((MemberImpl)member); super.memberDisappeared(member); try { - if (started) startElection(false); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove")); + if ( started && (isCoordinator() || member.equals(getCoordinator())) ) + startElection(false); + //to do, if a member disappears, only the coordinator can start }catch ( ChannelException x ) { log.error("Unable to start election when member was removed.",x); } }finally { } - + } + + public boolean isCoordinator() { + Member coord = getCoordinator(); + return coord != null && getLocalMember(false).equals(coord); } public void heartbeat() { @@ -524,8 +554,7 @@ } protected synchronized void setupMembership() { - if ( view == null || membership == null ) { - view = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,true); + if ( membership == null ) { membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false); } } @@ -664,11 +693,65 @@ buf.append(id.getBytes(),0,id.getBytes().length); buf.append(type,0,type.length); } + } + + public void fireInterceptorEvent(InterceptorEvent event) { + System.out.println(event); + } + + public static class CoordinationEvent implements InterceptorEvent { + static final int EVT_START = 1; + static final int EVT_MBR_ADD = 2; + static final int EVT_MBR_DEL = 3; + static final int EVT_START_ELECT = 4; + static final int EVT_PROCESS_ELECT = 5; + static final int EVT_MSG_ARRIVE = 6; + static final int EVT_PRE_MERGE = 7; + static final int 
EVT_POST_MERGE = 8; + static final int EVT_WAIT_FOR_MSG = 9; + static final int EVT_SEND_MSG = 10; + static final int EVT_STOP = 11; + static final int EVT_CONF_RX = 12; + + int type; + ChannelInterceptor interceptor; + Member coord; + Member[] mbrs; + String info; + Membership view; + Membership suggestedView; + public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) { + this.type = type; + this.interceptor = interceptor; + this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator(); + this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers(); + this.info = info; + this.view = ((NonBlockingCoordinator)interceptor).view; + this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView; + } + public int getEventType() { + return type; + } + public ChannelInterceptor getInterceptor() { + return interceptor; + } - + public String toString() { + StringBuffer buf = new StringBuffer("CoordinationEvent[type="); + buf.append(type).append("\n\tLocal:"); + Member local = interceptor.getLocalMember(false); + buf.append(local!=null?local.getName():"").append("\n\tCoord:"); + buf.append(coord!=null?coord.getName():"").append("\n\tView:"); + buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:"); + buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:"); + buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:"); + buf.append(info).append("]"); + return buf.toString(); + } } + Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Tue Jun 13 15:07:01 2006 @@ -131,10 +131,11 @@ public synchronized void memberDisappeared(Member member) { if ( membership == null ) setupMembership(); - log.info("Received memberDisappeared["+member+"] message. Will verify."); + boolean shutdown = Arrays.equals(member.getPayload(),Member.SHUTDOWN_PAYLOAD); + if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify."); //check to see if the member really is gone //if the payload is not a shutdown message - if ( !memberAlive(member) ) { + if ( shutdown || !memberAlive(member) ) { //not correct, we need to maintain the map membership.removeMember((MemberImpl)member); super.memberDisappeared(member); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Tue Jun 13 15:07:01 2006 @@ -134,7 +134,7 @@ * Return the local member */ public Member getLocalMember(boolean alive) { - if ( alive ) localMember.setMemberAliveTime(System.currentTimeMillis()-impl.getServiceStartTime()); + if ( alive && localMember != null ) localMember.setMemberAliveTime(System.currentTimeMillis()-impl.getServiceStartTime()); return localMember; } Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Tue Jun 13 15:07:01 2006 @@ -30,7 +30,7 @@ * @version 1.0 */ public class Arrays { - + public static boolean contains(byte[] source, int srcoffset, byte[] key, int keyoffset, int length) { if ( srcoffset < 0 || srcoffset >= source.length) throw new ArrayIndexOutOfBoundsException("srcoffset is out of bounds."); if ( keyoffset < 0 || keyoffset >= key.length) throw new ArrayIndexOutOfBoundsException("keyoffset is out of bounds."); @@ -51,15 +51,48 @@ public static String toString(byte[] data, int offset, int length) { StringBuffer buf = new StringBuffer("{"); - buf.append(data[offset++]); - length--; - for ( int i=offset; i<length; i++ ) { - buf.append(", ").append(data[i]); + if ( data != null && length > 0 ) { + buf.append(data[offset++]); + for (int i = offset; i < length; i++) { + buf.append(", ").append(data[i]); + } + } + buf.append("}"); + return buf.toString(); + } + + public static String toString(Object[] data) { + return toString(data,0,data!=null?data.length:0); + } + + public static String toString(Object[] data, int offset, int length) { + StringBuffer buf = new StringBuffer("{"); + if ( data != null && length > 0 ) { + buf.append(data[offset++]); + for (int i = offset; i < length; i++) { + buf.append(", ").append(data[i]); + } } buf.append("}"); return buf.toString(); } + public static String toNameString(Member[] data) { + return toNameString(data,0,data!=null?data.length:0); + } + + public static String 
toNameString(Member[] data, int offset, int length) { + StringBuffer buf = new StringBuffer("{"); + if ( data != null && length > 0 ) { + buf.append(data[offset++].getName()); + for (int i = offset; i < length; i++) { + buf.append(", ").append(data[i].getName()); + } + } + buf.append("}"); + return buf.toString(); + } + public static int add(int[] data) { int result = 0; for (int i=0;i<data.length; i++ ) result += data[i]; Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java?rev=413988&r1=413987&r2=413988&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Tue Jun 13 15:07:01 2006 @@ -8,14 +8,17 @@ import org.apache.catalina.tribes.Member; import junit.framework.TestSuite; import junit.framework.TestResult; +import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; +import org.apache.catalina.tribes.ChannelInterceptor; public class TestNonBlockingCoordinator extends TestCase { GroupChannel[] channels = null; NonBlockingCoordinator[] coordinators = null; - int channelCount = 6; + int channelCount = 3; Thread[] threads = null; protected void setUp() throws Exception { + System.out.println("Setup"); super.setUp(); channels = new GroupChannel[channelCount]; coordinators = new NonBlockingCoordinator[channelCount]; @@ -39,32 +42,34 @@ } for ( int i=0; i<channelCount; i++ ) threads[i].start(); for ( int i=0; i<channelCount; i++ ) threads[i].join(); + Thread.sleep(10000); } public void testCoord1() throws 
Exception { for (int i=1; i<channelCount; i++ ) assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length); - Member member = coordinators[0].getCoordinator(); int cnt = 0; while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){} for (int i=0; i<channelCount; i++ ) super.assertEquals(member,coordinators[i].getCoordinator()); System.out.println("Coordinator[1] is:"+member); + } - public void testCoord2() throws Exception { + public void stestCoord2() throws Exception { + Member member = coordinators[1].getCoordinator(); + System.out.println("Coordinator[2a] is:" + member); System.out.println("Shutting down:"+channels[0].getLocalMember(true).toString()); channels[0].stop(Channel.DEFAULT); Thread.sleep(1000); System.out.println("Member count:"+channels[1].getMembers().length); - Member member = coordinators[1].getCoordinator(); + member = coordinators[1].getCoordinator(); for (int i = 1; i < channelCount; i++)super.assertEquals(member, coordinators[i].getCoordinator()); - Thread.sleep(3000); - System.out.println("Coordinator[2] is:" + member); - + System.out.println("Coordinator[2b] is:" + member); } protected void tearDown() throws Exception { + System.out.println("tearDown"); super.tearDown(); for ( int i=0; i<channelCount; i++ ) { channels[i].stop(Channel.DEFAULT); @@ -76,6 +81,8 @@ suite.addTestSuite(TestNonBlockingCoordinator.class); suite.run(new TestResult()); } + + --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]