Author: fhanik Date: Fri Jan 9 14:38:52 2009 New Revision: 733180 URL: http://svn.apache.org/viewvc?rev=733180&view=rev Log: Implement the ability to broadcast a message over multicast, bypassing TCP entirely. Broadcasts are simple fire-and-forget, and nothing changes in how the consumer sends or receives messages.
Added: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=733180&r1=733179&r2=733180&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Fri Jan 9 14:38:52 2009 @@ -191,6 +191,14 @@ public static final int SEND_OPTIONS_UDP = 0x0020; /** + * Send options. When a message is sent with this flag on + * the system sends a UDP message on the Multicast address instead of UDP or TCP to individual addresses + * @see #send(Member[], Serializable , int) + * @see #send(Member[], Serializable, int, ErrorHandler) + */ + public static final int SEND_OPTIONS_MULTICAST = 0x0040; + + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack. 
<br> Modified: tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java?rev=733180&r1=733179&r2=733180&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java Fri Jan 9 14:38:52 2009 @@ -131,5 +131,12 @@ public void setPayload(byte[] payload); public void setDomain(byte[] domain); + + /** + * Broadcasts a message to all members + * @param message + * @throws ChannelException + */ + public void broadcast(ChannelMessage message) throws ChannelException; } Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=733180&r1=733179&r2=733180&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java Fri Jan 9 14:38:52 2009 @@ -75,7 +75,11 @@ */ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( destination == null ) destination = membershipService.getMembers(); - clusterSender.sendMessage(msg,destination); + if ((msg.getOptions()&Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST) { + membershipService.broadcast(msg); + } else { + clusterSender.sendMessage(msg,destination); + } if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination)); } @@ -154,6 +158,9 @@ if ( Channel.MBR_RX_SEQ==(svc 
& Channel.MBR_RX_SEQ) ) { membershipService.setMembershipListener(this); + if (membershipService instanceof McastService) { + ((McastService)membershipService).setMessageListener(this); + } membershipService.start(MembershipService.MBR_RX); valid = true; } @@ -244,7 +251,6 @@ super.messageReceived(msg); } - public ChannelReceiver getClusterReceiver() { return clusterReceiver; } Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java?rev=733180&r1=733179&r2=733180&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java Fri Jan 9 14:38:52 2009 @@ -17,14 +17,21 @@ package org.apache.catalina.tribes.membership; +import java.io.IOException; +import java.net.DatagramPacket; import java.util.Properties; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MembershipService; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.util.StringManager; import org.apache.catalina.tribes.util.UUIDGenerator; -import java.io.IOException; /** * A <b>membership</b> implementation using simple multicast. 
@@ -37,7 +44,7 @@ */ -public class McastService implements MembershipService,MembershipListener { +public class McastService implements MembershipService,MembershipListener,MessageListener { private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( McastService.class ); @@ -65,6 +72,10 @@ */ protected MembershipListener listener; /** + * A message listener delegate for broadcasts + */ + protected MessageListener msglistener; + /** * The local member */ protected MemberImpl localMember ; @@ -371,6 +382,7 @@ java.net.InetAddress.getByName(properties.getProperty("mcastAddress")), ttl, soTimeout, + this, this); String value = properties.getProperty("recoveryEnabled","true"); boolean recEnabled = Boolean.valueOf(value).booleanValue() ; @@ -456,6 +468,14 @@ public void setMembershipListener(MembershipListener listener) { this.listener = listener; } + + public void setMessageListener(MessageListener listener) { + this.msglistener = listener; + } + + public void removeMessageListener() { + this.msglistener = null; + } /** * Remove the membership listener */ @@ -475,6 +495,27 @@ { if ( listener!=null ) listener.memberDisappeared(member); } + + public void messageReceived(ChannelMessage msg) { + if (msglistener!=null && msglistener.accept(msg)) msglistener.messageReceived(msg); + } + + public boolean accept(ChannelMessage msg) { + return true; + } + + public void broadcast(ChannelMessage message) throws ChannelException { + if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ ) + throw new ChannelException("Multicast send is not started or enabled."); + + byte[] data = XByteBuffer.createDataPackage((ChannelData)message); + DatagramPacket packet = new DatagramPacket(data,0,data.length); + try { + impl.send(false, packet); + } catch (Exception x) { + throw new ChannelException(x); + } + } /** * @deprecated use getSoTimeout Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=733180&r1=733179&r2=733180&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Jan 9 14:38:52 2009 @@ -19,17 +19,24 @@ import java.io.IOException; +import java.net.BindException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; import java.net.SocketTimeoutException; import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; -import java.net.BindException; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.XByteBuffer; /** * A <b>membership</b> implementation using simple multicast. 
@@ -91,10 +98,14 @@ */ protected Membership membership; /** - * The actual listener, for callback when shits goes down + * The actual listener, for callback when stuff goes down */ protected MembershipListener service; /** + * The actual listener for broadcast callbacks + */ + protected MessageListener msgservice; + /** * Thread to listen for pings */ protected ReceiverThread receiver; @@ -135,6 +146,12 @@ * Add the ability to turn on/off recovery */ protected boolean recoveryEnabled = true; + + /** + * Dont interrupt the sender/receiver thread, but pass off to an executor + */ + protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + /** * Create a new mcast service impl * @param member - the local member @@ -155,7 +172,8 @@ InetAddress mcastAddress, int ttl, int soTimeout, - MembershipListener service) + MembershipListener service, + MessageListener msgservice) throws IOException { this.member = member; this.address = mcastAddress; @@ -165,6 +183,7 @@ this.mcastBindAddress = bind; this.timeToExpiration = expireTime; this.service = service; + this.msgservice = msgservice; this.sendFrequency = sendFrequency; init(); } @@ -315,46 +334,104 @@ * @throws IOException */ public void receive() throws IOException { + boolean checkexpired = true; try { + socket.receive(receivePacket); if(receivePacket.getLength() > MAX_PACKET_SIZE) { log.error("Multicast packet received was too long, dropping package:"+receivePacket.getLength()); } else { byte[] data = new byte[receivePacket.getLength()]; System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length); - final MemberImpl m = MemberImpl.getMember(data); - if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m); - Thread t = null; - if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) { - if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); - membership.removeMember(m); - t = new 
Thread() { - public void run() { - service.memberDisappeared(m); - } - }; - } else if (membership.memberAlive(m)) { - if (log.isDebugEnabled()) log.debug("Mcast add member " + m); - t = new Thread() { - public void run() { - service.memberAdded(m); + if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) { + memberDataReceived(data); + } else { + XByteBuffer buffer = new XByteBuffer(data,true); + if (buffer.countPackages(true)>0) { + int count = buffer.countPackages(); + ChannelData[] pkgs = new ChannelData[count]; + for (int i=0; i<count; i++) { + try { + pkgs[i] = buffer.extractPackage(true); + }catch (IllegalStateException ise) { + log.debug("Unable to decode message.",ise); + } } - }; - } //end if - if ( t != null ) { - t.setDaemon(true); - t.start(); + memberBroadcastsReceived(pkgs); + } } + } } catch (SocketTimeoutException x ) { //do nothing, this is normal, we don't want to block forever //since the receive thread is the same thread //that does membership expiration } - checkExpired(); + if (checkexpired) checkExpired(); + } + + private void memberDataReceived(byte[] data) { + final MemberImpl m = MemberImpl.getMember(data); + if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m); + Runnable t = null; + if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) { + if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); + membership.removeMember(m); + t = new Runnable() { + public void run() { + String name = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("Membership-MemberDisappeared."); + service.memberDisappeared(m); + }finally { + Thread.currentThread().setName(name); + } + } + }; + } else if (membership.memberAlive(m)) { + if (log.isDebugEnabled()) log.debug("Mcast add member " + m); + t = new Runnable() { + public void run() { + String name = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("Membership-MemberAdded."); + service.memberAdded(m); + }finally 
{ + Thread.currentThread().setName(name); + } + } + }; + } //end if + if ( t != null ) { + executor.execute(t); + } } - protected Object expiredMutex = new Object(); + private void memberBroadcastsReceived(final ChannelData[] data) { + if (log.isTraceEnabled()) log.trace("Mcast received broadcasts."); + Runnable t = new Runnable() { + public void run() { + String name = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("Membership-MemberAdded."); + for (int i=0; i<data.length; i++ ) { + try { + if (data[i]!=null) { + msgservice.messageReceived(data[i]); + } + }catch (Throwable t) { + log.error("Unable to receive broadcast message.",t); + } + } + }finally { + Thread.currentThread().setName(name); + } + } + }; + executor.execute(t); + } + + protected final Object expiredMutex = new Object(); protected void checkExpired() { synchronized (expiredMutex) { MemberImpl[] expired = membership.expire(timeToExpiration); @@ -363,12 +440,19 @@ if (log.isDebugEnabled()) log.debug("Mcast exipre member " + expired[i]); try { - Thread t = new Thread() { + Runnable t = new Runnable() { public void run() { - service.memberDisappeared(member); + String name = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("Membership-MemberExpired."); + service.memberDisappeared(member); + }finally { + Thread.currentThread().setName(name); + } + } }; - t.start(); + executor.execute(t); } catch (Exception x) { log.error("Unable to process member disappeared message.", x); } @@ -381,16 +465,30 @@ * @throws Exception */ public void send(boolean checkexpired) throws IOException{ + send(checkexpired,null); + } + + private final Object sendLock = new Object(); + public void send(boolean checkexpired, DatagramPacket packet) throws IOException{ + checkexpired = (checkexpired && (packet==null)); //ignore if we haven't started the sender //if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return; - member.inc(); - if(log.isTraceEnabled()) - 
log.trace("Mcast send ping from member " + member); - byte[] data = member.getData(); - DatagramPacket p = new DatagramPacket(data,data.length); - p.setAddress(address); - p.setPort(port); - socket.send(p); + if (packet==null) { + member.inc(); + if(log.isTraceEnabled()) { + log.trace("Mcast send ping from member " + member); + } + byte[] data = member.getData(); + packet = new DatagramPacket(data,data.length); + } else if (log.isTraceEnabled()) { + log.trace("Sending message broadcast "+packet.getLength()+ " bytes from "+ member); + } + packet.setAddress(address); + packet.setPort(port); + //TODO this operation is not thread safe + synchronized (sendLock) { + socket.send(packet); + } if ( checkexpired ) checkExpired(); } Added: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java?rev=733180&view=auto ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java (added) +++ tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java Fri Jan 9 14:38:52 2009 @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.test.channel; + +import junit.framework.TestCase; +import java.io.Serializable; +import java.util.Random; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.transport.AbstractSender; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.catalina.tribes.transport.ReplicationTransmitter; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; +import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; +import org.apache.catalina.tribes.io.XByteBuffer; + +/** + */ +public class TestMulticastPackages extends TestCase { + int msgCount = 500; + int threadCount = 20; + GroupChannel channel1; + GroupChannel channel2; + Listener listener1; + int threadCounter = 0; + protected void setUp() throws Exception { + super.setUp(); + channel1 = new GroupChannel(); + channel1.addInterceptor(new MessageDispatch15Interceptor()); + channel2 = new GroupChannel(); + channel2.addInterceptor(new MessageDispatch15Interceptor()); + ThroughputInterceptor tint = new ThroughputInterceptor(); + tint.setInterval(500); + ThroughputInterceptor tint2 = new ThroughputInterceptor(); + tint2.setInterval(500); + //channel1.addInterceptor(tint); + channel2.addInterceptor(tint2); + listener1 = new Listener(); + ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); + ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); + rb1.setUdpPort(50000); + rb2.setUdpPort(50000); + channel2.addChannelListener(listener1); + 
channel1.start(GroupChannel.DEFAULT); + channel2.start(GroupChannel.DEFAULT); + } + + protected void tearDown() throws Exception { + super.tearDown(); + channel1.stop(GroupChannel.DEFAULT); + channel2.stop(GroupChannel.DEFAULT); + } + + public void testSingleDataSendNO_ACK() throws Exception { + AbstractSender s1 =(AbstractSender) ((ReplicationTransmitter)channel1.getChannelSender()).getTransport(); + AbstractSender s2 =(AbstractSender) ((ReplicationTransmitter)channel2.getChannelSender()).getTransport(); + s1.setTimeout(Long.MAX_VALUE); //for debugging + s2.setTimeout(Long.MAX_VALUE); //for debugging + + System.err.println("Starting Single package NO_ACK"); + channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_MULTICAST); + Thread.sleep(500); + System.err.println("Finished Single package NO_ACK ["+listener1.count+"]"); + assertEquals("Checking success messages.",1,listener1.count.get()); + } + + + public static void printMissingMsgs(int[] msgs, int maxIdx) { + for (int i=0; i<maxIdx && i<msgs.length; i++) { + if (msgs[i]==0) System.out.print(i+", "); + } + System.out.println(); + } + + public void testDataSendASYNCM() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); + ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); + ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); + rb1.setUdpRxBufSize(1024*1024*10); + rb2.setUdpRxBufSize(1024*1024*10); + rb1.setUdpTxBufSize(1024*1024*10); + rb2.setUdpTxBufSize(1024*1024*10); + System.err.println("Starting NO_ACK"); + Thread[] threads = new Thread[threadCount]; + for (int x=0; x<threads.length; x++ ) { + threads[x] = new Thread() { + public void run() { + try { + long start = System.currentTimeMillis(); + for (int i = 0; i < msgCount; i++) { + int cnt = counter.getAndAdd(1); + channel1.send(new Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_MULTICAST|Channel.SEND_OPTIONS_ASYNCHRONOUS); + //Thread.currentThread().sleep(10); + } + System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms."); + }catch ( Exception x ) { + x.printStackTrace(); + return; + } finally { + threadCounter++; + } + } + }; + } + for (int x=0; x<threads.length; x++ ) { threads[x].start();} + for (int x=0; x<threads.length; x++ ) { threads[x].join();} + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500); + System.err.println("Finished NO_ACK ["+listener1.count+"]"); + System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx); + System.out.print("Missing messages:"); + printMissingMsgs(listener1.nrs,counter.get()); + assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get()); + } + public void testDataSendASYNC() throws Exception { + System.err.println("Starting ASYNC"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_MULTICAST); + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500); + System.err.println("Finished ASYNC"); + assertEquals("Checking success messages.",msgCount,listener1.count.get()); + } + + public void testDataSendACK() throws Exception { + System.err.println("Starting ACK"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_MULTICAST); + Thread.sleep(250); + 
System.err.println("Finished ACK"); + assertEquals("Checking success messages.",msgCount,listener1.count.get()); + } + + public void testDataSendSYNCACK() throws Exception { + System.err.println("Starting SYNC_ACK"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_MULTICAST); + Thread.sleep(250); + System.err.println("Finished SYNC_ACK"); + assertEquals("Checking success messages.",msgCount,listener1.count.get()); + } + + public static class Listener implements ChannelListener { + AtomicLong count = new AtomicLong(0); + int maxIdx = -1; + int[] nrs = new int[1000000]; + public Listener() { + Arrays.fill(nrs, 0); + } + public boolean accept(Serializable s, Member m) { + return (s instanceof Data); + } + + public void messageReceived(Serializable s, Member m) { + try { + Data d = (Data)s; + if ( !Data.verify(d) ) { + System.err.println("ERROR - Unable to verify data package"); + } else { + long c = count.addAndGet(1); + if ((c%1000) ==0 ) { + System.err.println("SUCCESS:"+c); + } + int nr = d.getNumber(); + if (nr>=0 && nr<nrs.length) { + maxIdx = Math.max(maxIdx, nr); + nrs[nr] = 1; + } + } + }catch (Exception x ) { + x.printStackTrace(); + } + } + } + + public static class Data implements Serializable { + public int length; + public byte[] data; + public byte key; + public boolean hasNr = false; + public static Random r = new Random(System.currentTimeMillis()); + public static Data createRandomData() { + return createRandomData(ChannelReceiver.MAX_UDP_SIZE); + } + public static Data createRandomData(int size) { + return createRandomData(size,-1); + } + + public static Data createRandomData(int size, int number) { + int i = r.nextInt(); + i = ( i % 127 ); + int length = Math.abs(r.nextInt() % size); + if (length<100) length += 100; + Data d = new Data(); + d.length = length; + d.key = (byte)i; + 
d.data = new byte[length]; + Arrays.fill(d.data,d.key); + if (number>0 && d.data.length>=4) { + //populate number + d.hasNr = true; + XByteBuffer.toBytes(number,d.data, 0); + } + return d; + } + + public int getNumber() { + if (!hasNr) return -1; + return XByteBuffer.toInt(this.data, 0); + } + + public static boolean verify(Data d) { + boolean result = (d.length == d.data.length); + for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; + return result; + } + } + + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org