Author: fhanik Date: Mon Jul 3 06:24:13 2006 New Revision: 418764 URL: http://svn.apache.org/viewvc?rev=418764&view=rev Log: Fixes to test cases mostly, some minor changes in the code base
Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java (original) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java Mon Jul 3 06:24:13 2006 @@ -4,6 +4,8 @@ import org.apache.catalina.tribes.Member; import java.util.ArrayList; import org.apache.catalina.tribes.group.AbsoluteOrder; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.Channel; public class StaticMembershipInterceptor extends ChannelInterceptorBase { @@ -73,6 +75,27 @@ public Member getLocalMember(boolean incAlive) { if (this.localMember != null ) return localMember; else return super.getLocalMember(incAlive); + } + + /** + * Send notifications upwards + * @param svc int + * @throws ChannelException + */ + public void start(int svc) throws ChannelException { + if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) super.start(Channel.SND_RX_SEQ); + if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) super.start(Channel.SND_TX_SEQ); + final Member[] mbrs = (Member[])members.toArray(new Member[members.size()]); + final ChannelInterceptorBase base = this; + Thread t = new Thread() { + public void run() { + for (int i=0; i<mbrs.length; i++ ) { + base.memberAdded(mbrs[i]); + } + } + }; + t.start(); + super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ)); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Mon Jul 3 06:24:13 
2006 @@ -42,6 +42,7 @@ double mbAppTx = 0; double mbRx = 0; double timeTx = 0; + double lastCnt = 0; AtomicLong msgTxCnt = new AtomicLong(1); AtomicLong msgRxCnt = new AtomicLong(0); AtomicLong msgTxErr = new AtomicLong(0); @@ -67,16 +68,10 @@ if ( access.addAndGet(-1) == 0 ) { long stop = System.currentTimeMillis(); timeTx += ( (double) (stop - txStart)) / 1000d; - } - - if (msgTxCnt.get() % interval == 0) { - double time = timeTx; - - if ( access.get() != 0 ) { - long now = System.currentTimeMillis(); - time = (double)(now - txStart + timeTx)/1000d; + if ((msgTxCnt.get() / interval) >= lastCnt) { + lastCnt++; + report(timeTx); } - report(time); } msgTxCnt.addAndGet(1); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java Mon Jul 3 06:24:13 2006 @@ -78,8 +78,6 @@ public WorkerThread getWorker() { WorkerThread worker = null; - - synchronized (mutex) { while ( worker == null && running ) { if (idle.size() > 0) { @@ -113,7 +111,8 @@ if ( running ) { synchronized (mutex) { used.remove(worker); - if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); + //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); + if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit else { worker.setDoRun(false); synchronized (worker){worker.notify();} Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Mon Jul 3 06:24:13 2006 @@ -363,8 +363,8 @@ if (worker == null) { // No threads available, do nothing, the selection // loop will keep calling this method until a - // thread becomes available. - // FIXME: This design could be improved. + // thread becomes available, the thread pool itself has a waiting mechanism + // so we will not wait here. if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available"); } else { Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Mon Jul 3 06:24:13 2006 @@ -33,6 +33,10 @@ import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor; +import java.util.ArrayList; +import 
org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; +import org.apache.catalina.tribes.Member; /** * <p>Title: </p> @@ -65,6 +69,7 @@ .append("\n\t\t[-mfreq multicastfrequency]") .append("\n\t\t[-mdrop multicastdroptime]") .append("\n\t\t[-gzip]") + .append("\n\t\t[-static hostname:port (-static localhost:9999 -static 127.0.0.1:8888 can be repeated)]") .append("\n\t\t[-order]") .append("\n\t\t[-ordersize maxorderqueuesize]") .append("\n\t\t[-frag]") @@ -94,6 +99,7 @@ boolean frag = false; int fragsize = 1024; int autoBind = 10; + ArrayList staticMembers = new ArrayList(); Properties transportProperties = new Properties(); String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender"; String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver"; @@ -122,6 +128,12 @@ } else if ("-asyncsize".equals(args[i])) { asyncsize = Integer.parseInt(args[++i]); System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize); + } else if ("-static".equals(args[i])) { + String d = args[++i]; + String h = d.substring(0,d.indexOf(":")); + String p = d.substring(h.length()+1); + MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000); + staticMembers.add(m); } else if ("-throughput".equals(args[i])) { throughput = true; } else if ("-order".equals(args[i])) { @@ -196,7 +208,7 @@ channel.setChannelReceiver(rx); channel.setChannelSender(ps); channel.setMembershipService(service); - + if ( throughput ) channel.addInterceptor(new ThroughputInterceptor()); if (gzip) channel.addInterceptor(new GzipInterceptor()); if ( frag ) { @@ -221,6 +233,15 @@ TcpFailureDetector tcpfi = new TcpFailureDetector(); channel.addInterceptor(tcpfi); } + if ( staticMembers.size() > 0 ) { + StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); + for (int x=0; x<staticMembers.size(); x++ ) { + smi.addStaticMember((Member)staticMembers.get(x)); + } + 
channel.addInterceptor(smi); + } + + byte[] domain = new byte[] {1,2,3,4,5,6,7,8,9,0}; ((McastService)channel.getMembershipService()).setDomain(domain); DomainFilterInterceptor filter = new DomainFilterInterceptor(); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java Mon Jul 3 06:24:13 2006 @@ -293,6 +293,7 @@ "java LoadTest [options]\n\t"+ "Options:\n\t\t"+ "[-mode receive|send|both] \n\t\t"+ + "[-startoptions startflags (default is Channel.DEFAULT) ] \n\t\t"+ "[-debug] \n\t\t"+ "[-count messagecount] \n\t\t"+ "[-stats statinterval] \n\t\t"+ @@ -319,6 +320,7 @@ boolean breakOnEx = false; int threads = 1; boolean shutdown = false; + int startoptions = Channel.DEFAULT; int channelOptions = Channel.SEND_OPTIONS_DEFAULT; if ( args.length == 0 ) { args = new String[] {"-help"}; @@ -341,6 +343,9 @@ } else if ("-sendoptions".equals(args[i])) { channelOptions = Integer.parseInt(args[++i]); System.out.println("Setting send options to "+channelOptions); + } else if ("-startoptions".equals(args[i])) { + startoptions = Integer.parseInt(args[++i]); + System.out.println("Setting start options to "+startoptions); } else if ("-size".equals(args[i])) { size = Integer.parseInt(args[++i])-4; System.out.println("Message size will be:"+(size+4)+" bytes"); @@ -355,7 +360,6 @@ } } - ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args); LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx); @@ -365,7 +369,7 @@ messageSize = 
LoadMessage.getMessageSize(msg); channel.addChannelListener(test); channel.addMembershipListener(test); - channel.start(channel.DEFAULT); + channel.start(startoptions); Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); while ( threads > 1 ) { Thread t = new Thread(test); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java Mon Jul 3 06:24:13 2006 @@ -19,6 +19,7 @@ Selector selector = Selector.open(); Member mbr = new MemberImpl("localhost", 9999, 0); ChannelData data = new ChannelData(); + data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE); data.setAddress(mbr); byte[] buf = new byte[8192 * 4]; data.setMessage(new XByteBuffer(buf,false)); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java Mon Jul 3 06:24:13 2006 @@ -22,6 +22,7 @@ while ( true ) { if ( first ) { first = false; start = System.currentTimeMillis();} int len = in.read(buf); + if ( len == -1 ) System.exit(1); mb += ( 
(double) len) / 1024 / 1024; if ( ((count++) % 10000) == 0 ) { long time = System.currentTimeMillis(); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java Mon Jul 3 06:24:13 2006 @@ -18,6 +18,7 @@ suite.addTestSuite(org.apache.catalina.tribes.test.membership.MemberSerialization.class); suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestMemberArrival.class); suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestTcpFailureDetector.class); + suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestDataIntegrity.class); return suite; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Mon Jul 3 06:24:13 2006 @@ -22,7 +22,7 @@ * @version 1.0 */ public class TestDataIntegrity extends TestCase { - int msgCount = 1000; + int msgCount = 10000; GroupChannel channel1; GroupChannel channel2; Listener listener1; Added: 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java?rev=418764&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java Mon Jul 3 06:24:13 2006 @@ -0,0 +1,116 @@ +package org.apache.catalina.tribes.test.channel; + +import junit.framework.TestCase; +import java.io.Serializable; +import java.util.Random; +import java.util.Arrays; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class TestRemoteProcessException extends TestCase { + int msgCount = 10000; + GroupChannel channel1; + GroupChannel channel2; + Listener listener1; + protected void setUp() throws Exception { + super.setUp(); + channel1 = new GroupChannel(); + channel2 = new GroupChannel(); + listener1 = new Listener(); + channel2.addChannelListener(listener1); + channel1.start(GroupChannel.DEFAULT); + channel2.start(GroupChannel.DEFAULT); + } + + protected void tearDown() throws Exception { + super.tearDown(); + channel1.stop(GroupChannel.DEFAULT); + channel2.stop(GroupChannel.DEFAULT); + } + + public void testDataSendSYNCACK() throws Exception { + System.err.println("Starting SYNC_ACK"); + int errC=0, nerrC=0; + for (int i=0; i<msgCount; i++) { + boolean error = Data.r.nextBoolean(); + 
channel1.send(channel1.getMembers(),Data.createRandomData(error),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK); + if ( error ) errC++; else nerrC++; + } + System.err.println("Finished SYNC_ACK"); + assertEquals("Checking failure messages.",errC,listener1.errCnt); + assertEquals("Checking success messages.",nerrC,listener1.noErrCnt); + assertEquals("Checking all messages.",msgCount,listener1.noErrCnt+listener1.errCnt); + } + + public static class Listener implements ChannelListener { + long noErrCnt = 0; + long errCnt = 0; + public boolean accept(Serializable s, Member m) { + return (s instanceof Data); + } + + public void messageReceived(Serializable s, Member m) { + Data d = (Data)s; + if ( !Data.verify(d) ) { + System.err.println("ERROR"); + } else { + if (d.error) { + errCnt++; + if ( (errCnt % 100) == 0) { + System.err.println("NORMAL:" + noErrCnt); + System.err.println("FAILURES:" + errCnt); + System.err.println("TOTAL:" + errCnt+noErrCnt); + } + } else { + noErrCnt++; + if ( (noErrCnt % 100) == 0) { + System.err.println("NORMAL:" + noErrCnt); + System.err.println("FAILURES:" + errCnt); + System.err.println("TOTAL:" + errCnt+noErrCnt); + } + } + } + } + } + + public static class Data implements Serializable { + public int length; + public byte[] data; + public byte key; + public boolean error = false; + public static Random r = new Random(System.currentTimeMillis()); + public static Data createRandomData(boolean error) { + int i = r.nextInt(); + i = ( i % 127 ); + int length = Math.abs(r.nextInt() % 65555); + Data d = new Data(); + d.length = length; + d.key = (byte)i; + d.data = new byte[length]; + Arrays.fill(d.data,d.key); + return d; + } + + public static boolean verify(Data d) { + boolean result = (d.length == d.data.length); + for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; + return result; + } + } + + + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=418764&r1=418763&r2=418764&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Mon Jul 3 06:24:13 2006 @@ -41,6 +41,9 @@ Code Tasks: =========================================== +51. NioSender.setData should not expand the byte buffer if it's too large + instead just refill it from the XByteBuffer + 50. On top of versioning, implement version syncs from primary to backup Or when a backup receives an update that is out of sync --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]