Author: fhanik
Date: Fri Jan  9 14:38:52 2009
New Revision: 733180

URL: http://svn.apache.org/viewvc?rev=733180&view=rev
Log:
Implement the ability to broadcast a message using multicast, bypassing TCP 
entirely. This is simple fire-and-forget behavior, with no change in how 
messages are sent and received by the consumer.

Added:
    
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java
Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
    tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java
    tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java
    tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/Channel.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Channel.java?rev=733180&r1=733179&r2=733180&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Channel.java Fri Jan  9 
14:38:52 2009
@@ -191,6 +191,14 @@
     public static final int SEND_OPTIONS_UDP =  0x0020;
 
     /**
+     * Send options. When a message is sent with this flag on
+     * the system sends a UDP message on the Multicast address instead of UDP 
or TCP to individual addresses
+     * @see #send(Member[], Serializable , int)
+     * @see #send(Member[], Serializable, int, ErrorHandler)
+     */
+    public static final int SEND_OPTIONS_MULTICAST =  0x0040;
+
+    /**
      * Send options, when a message is sent, it can have an option flag
      * to trigger certain behavior. Most flags are used to trigger channel 
interceptors
      * as the message passes through the channel stack. <br>

Modified: tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java?rev=733180&r1=733179&r2=733180&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/MembershipService.java Fri Jan 
 9 14:38:52 2009
@@ -131,5 +131,12 @@
     public void setPayload(byte[] payload);
     
     public void setDomain(byte[] domain);
+    
+    /**
+     * Broadcasts a message to all members
+     * @param message
+     * @throws ChannelException
+     */
+    public void broadcast(ChannelMessage message) throws ChannelException;
 
 }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=733180&r1=733179&r2=733180&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/ChannelCoordinator.java 
Fri Jan  9 14:38:52 2009
@@ -75,7 +75,11 @@
      */
     public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
         if ( destination == null ) destination = 
membershipService.getMembers();
-        clusterSender.sendMessage(msg,destination);
+        if ((msg.getOptions()&Channel.SEND_OPTIONS_MULTICAST) == 
Channel.SEND_OPTIONS_MULTICAST) {
+            membershipService.broadcast(msg);
+        } else {
+            clusterSender.sendMessage(msg,destination);
+        }
         if ( Logs.MESSAGES.isTraceEnabled() ) {
             Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new 
UniqueId(msg.getUniqueId()) + " at " +new 
java.sql.Timestamp(System.currentTimeMillis())+ " to 
"+Arrays.toNameString(destination));
         }
@@ -154,6 +158,9 @@
             
             if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
                 membershipService.setMembershipListener(this);
+                if (membershipService instanceof McastService) {
+                    ((McastService)membershipService).setMessageListener(this);
+                }
                 membershipService.start(MembershipService.MBR_RX);
                 valid = true;
             }
@@ -244,7 +251,6 @@
         super.messageReceived(msg);
     }
 
-
     public ChannelReceiver getClusterReceiver() {
         return clusterReceiver;
     }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java?rev=733180&r1=733179&r2=733180&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java 
Fri Jan  9 14:38:52 2009
@@ -17,14 +17,21 @@
 
 package org.apache.catalina.tribes.membership;
 
+import java.io.IOException;
+import java.net.DatagramPacket;
 import java.util.Properties;
 
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
 import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.util.StringManager;
 import org.apache.catalina.tribes.util.UUIDGenerator;
-import java.io.IOException;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -37,7 +44,7 @@
  */
 
 
-public class McastService implements MembershipService,MembershipListener {
+public class McastService implements 
MembershipService,MembershipListener,MessageListener {
 
     private static org.apache.juli.logging.Log log =
         org.apache.juli.logging.LogFactory.getLog( McastService.class );
@@ -65,6 +72,10 @@
      */
     protected MembershipListener listener;
     /**
+     * A message listener delegate for broadcasts
+     */
+    protected MessageListener msglistener;
+    /**
      * The local member
      */
     protected MemberImpl localMember ;
@@ -371,6 +382,7 @@
                                     
java.net.InetAddress.getByName(properties.getProperty("mcastAddress")),
                                     ttl,
                                     soTimeout,
+                                    this,
                                     this);
         String value = properties.getProperty("recoveryEnabled","true");
         boolean recEnabled = Boolean.valueOf(value).booleanValue() ;
@@ -456,6 +468,14 @@
     public void setMembershipListener(MembershipListener listener) {
         this.listener = listener;
     }
+    
+    public void setMessageListener(MessageListener listener) {
+        this.msglistener = listener;
+    }
+    
+    public void removeMessageListener() {
+        this.msglistener = null;
+    }
     /**
      * Remove the membership listener
      */
@@ -475,6 +495,27 @@
     {
         if ( listener!=null ) listener.memberDisappeared(member);
     }
+    
+    public void messageReceived(ChannelMessage msg) {
+        if (msglistener!=null && msglistener.accept(msg)) 
msglistener.messageReceived(msg); 
+    }
+    
+    public boolean accept(ChannelMessage msg) {
+        return true;
+    }
+    
+    public void broadcast(ChannelMessage message) throws ChannelException {
+        if (impl==null || (impl.startLevel & 
Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ )
+            throw new ChannelException("Multicast send is not started or 
enabled.");
+        
+        byte[] data = XByteBuffer.createDataPackage((ChannelData)message);
+        DatagramPacket packet = new DatagramPacket(data,0,data.length);
+        try {
+            impl.send(false, packet);
+        } catch (Exception x) {
+            throw new ChannelException(x);
+        }
+    }
 
     /**
      * @deprecated use getSoTimeout

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=733180&r1=733179&r2=733180&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java 
(original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java 
Fri Jan  9 14:38:52 2009
@@ -19,17 +19,24 @@
 
 
 import java.io.IOException;
+import java.net.BindException;
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.MulticastSocket;
 import java.net.SocketTimeoutException;
 import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
-import java.net.BindException;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -91,10 +98,14 @@
      */
     protected Membership membership;
     /**
-     * The actual listener, for callback when shits goes down
+     * The actual listener, for callback when stuff goes down
      */
     protected MembershipListener service;
     /**
+     * The actual listener for broadcast callbacks
+     */
+    protected MessageListener msgservice;
+    /**
      * Thread to listen for pings
      */
     protected ReceiverThread receiver;
@@ -135,6 +146,12 @@
      * Add the ability to turn on/off recovery
      */
     protected boolean recoveryEnabled = true;
+    
+    /**
+     * Dont interrupt the sender/receiver thread, but pass off to an executor
+     */
+    protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    
     /**
      * Create a new mcast service impl
      * @param member - the local member
@@ -155,7 +172,8 @@
         InetAddress mcastAddress,
         int ttl,
         int soTimeout,
-        MembershipListener service)
+        MembershipListener service,
+        MessageListener msgservice)
     throws IOException {
         this.member = member;
         this.address = mcastAddress;
@@ -165,6 +183,7 @@
         this.mcastBindAddress = bind;
         this.timeToExpiration = expireTime;
         this.service = service;
+        this.msgservice = msgservice;
         this.sendFrequency = sendFrequency;
         init();
     }
@@ -315,46 +334,104 @@
      * @throws IOException
      */
     public void receive() throws IOException {
+        boolean checkexpired = true;
         try {
+            
             socket.receive(receivePacket);
             if(receivePacket.getLength() > MAX_PACKET_SIZE) {
                 log.error("Multicast packet received was too long, dropping 
package:"+receivePacket.getLength());
             } else {
                 byte[] data = new byte[receivePacket.getLength()];
                 System.arraycopy(receivePacket.getData(), 
receivePacket.getOffset(), data, 0, data.length);
-                final MemberImpl m = MemberImpl.getMember(data);
-                if (log.isTraceEnabled()) log.trace("Mcast receive ping from 
member " + m);
-                Thread t = null;
-                if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) {
-                    if (log.isDebugEnabled()) log.debug("Member has shutdown:" 
+ m);
-                    membership.removeMember(m);
-                    t = new Thread() {
-                        public void run() {
-                            service.memberDisappeared(m);
-                        }
-                    };
-                } else if (membership.memberAlive(m)) {
-                    if (log.isDebugEnabled()) log.debug("Mcast add member " + 
m);
-                    t = new Thread() {
-                        public void run() {
-                            service.memberAdded(m);
+                if 
(XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) {
+                    memberDataReceived(data);
+                } else {
+                    XByteBuffer buffer = new XByteBuffer(data,true);
+                    if (buffer.countPackages(true)>0) {
+                        int count = buffer.countPackages();
+                        ChannelData[] pkgs = new ChannelData[count];
+                        for (int i=0; i<count; i++) {
+                            try {
+                                pkgs[i] = buffer.extractPackage(true);
+                            }catch (IllegalStateException ise) {
+                                log.debug("Unable to decode message.",ise);
+                            }
                         }
-                    };
-                } //end if
-                if ( t != null ) {
-                    t.setDaemon(true);
-                    t.start();
+                        memberBroadcastsReceived(pkgs);
+                    }
                 }
+                
             }
         } catch (SocketTimeoutException x ) { 
             //do nothing, this is normal, we don't want to block forever
             //since the receive thread is the same thread
             //that does membership expiration
         }
-        checkExpired();
+        if (checkexpired) checkExpired();
+    }
+
+    private void memberDataReceived(byte[] data) {
+        final MemberImpl m = MemberImpl.getMember(data);
+        if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " 
+ m);
+        Runnable t = null;
+        if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) {
+            if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m);
+            membership.removeMember(m);
+            t = new Runnable() {
+                public void run() {
+                    String name = Thread.currentThread().getName();
+                    try {
+                        
Thread.currentThread().setName("Membership-MemberDisappeared.");
+                        service.memberDisappeared(m);
+                    }finally {
+                        Thread.currentThread().setName(name);
+                    }
+                }
+            };
+        } else if (membership.memberAlive(m)) {
+            if (log.isDebugEnabled()) log.debug("Mcast add member " + m);
+            t = new Runnable() {
+                public void run() {
+                    String name = Thread.currentThread().getName();
+                    try {
+                        
Thread.currentThread().setName("Membership-MemberAdded.");
+                        service.memberAdded(m);
+                    }finally {
+                        Thread.currentThread().setName(name);
+                    }
+                }
+            };
+        } //end if
+        if ( t != null ) {
+            executor.execute(t);
+        }
     }
     
-    protected Object expiredMutex = new Object();
+    private void memberBroadcastsReceived(final ChannelData[] data) {
+        if (log.isTraceEnabled()) log.trace("Mcast received broadcasts.");
+        Runnable t = new Runnable() {
+            public void run() {
+                String name = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName("Membership-MemberAdded.");
+                    for (int i=0; i<data.length; i++ ) {
+                        try {
+                            if (data[i]!=null) {
+                                msgservice.messageReceived(data[i]);
+                            }
+                        }catch (Throwable t) {
+                            log.error("Unable to receive broadcast 
message.",t);
+                        }
+                    }
+                }finally {
+                    Thread.currentThread().setName(name);
+                }
+            }
+        };
+        executor.execute(t);
+    }
+
+    protected final Object expiredMutex = new Object();
     protected void checkExpired() {
         synchronized (expiredMutex) {
             MemberImpl[] expired = membership.expire(timeToExpiration);
@@ -363,12 +440,19 @@
                 if (log.isDebugEnabled())
                     log.debug("Mcast exipre  member " + expired[i]);
                 try {
-                    Thread t = new Thread() {
+                    Runnable t = new Runnable() {
                         public void run() {
-                            service.memberDisappeared(member);
+                            String name = Thread.currentThread().getName();
+                            try {
+                                
Thread.currentThread().setName("Membership-MemberExpired.");
+                                service.memberDisappeared(member);
+                            }finally {
+                                Thread.currentThread().setName(name);
+                            }
+                            
                         }
                     };
-                    t.start();
+                    executor.execute(t);
                 } catch (Exception x) {
                     log.error("Unable to process member disappeared message.", 
x);
                 }
@@ -381,16 +465,30 @@
      * @throws Exception
      */ 
     public void send(boolean checkexpired) throws IOException{
+        send(checkexpired,null);
+    }
+    
+    private final Object sendLock = new Object();
+    public void send(boolean checkexpired, DatagramPacket packet) throws 
IOException{
+        checkexpired = (checkexpired && (packet==null));
         //ignore if we haven't started the sender
         //if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
-        member.inc();
-        if(log.isTraceEnabled())
-            log.trace("Mcast send ping from member " + member);
-        byte[] data = member.getData();
-        DatagramPacket p = new DatagramPacket(data,data.length);
-        p.setAddress(address);
-        p.setPort(port);
-        socket.send(p);
+        if (packet==null) {
+            member.inc();
+            if(log.isTraceEnabled()) {
+                log.trace("Mcast send ping from member " + member);
+            }
+            byte[] data = member.getData();
+            packet = new DatagramPacket(data,data.length);
+        } else if (log.isTraceEnabled()) {
+            log.trace("Sending message broadcast "+packet.getLength()+ " bytes 
from "+ member);
+        }
+        packet.setAddress(address);
+        packet.setPort(port);
+        //TODO this operation is not thread safe
+        synchronized (sendLock) {
+            socket.send(packet);
+        }
         if ( checkexpired ) checkExpired();
     }
 

Added: 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java?rev=733180&view=auto
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java
 (added)
+++ 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestMulticastPackages.java
 Fri Jan  9 14:38:52 2009
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+/**
+ */
+public class TestMulticastPackages extends TestCase {
+    int msgCount = 500;
+    int threadCount = 20;
+    GroupChannel channel1;
+    GroupChannel channel2;
+    Listener listener1;
+    int threadCounter = 0;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel1 = new GroupChannel();
+        channel1.addInterceptor(new MessageDispatch15Interceptor());
+        channel2 = new GroupChannel();
+        channel2.addInterceptor(new MessageDispatch15Interceptor());
+        ThroughputInterceptor tint = new ThroughputInterceptor();
+        tint.setInterval(500);
+        ThroughputInterceptor tint2 = new ThroughputInterceptor();
+        tint2.setInterval(500);
+        //channel1.addInterceptor(tint);
+        channel2.addInterceptor(tint2);
+        listener1 = new Listener();
+        ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+        ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+        rb1.setUdpPort(50000);
+        rb2.setUdpPort(50000);
+        channel2.addChannelListener(listener1);
+        channel1.start(GroupChannel.DEFAULT);
+        channel2.start(GroupChannel.DEFAULT);
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        channel1.stop(GroupChannel.DEFAULT);
+        channel2.stop(GroupChannel.DEFAULT);
+    }
+
+    public void testSingleDataSendNO_ACK() throws Exception {
+        AbstractSender s1 =(AbstractSender) 
((ReplicationTransmitter)channel1.getChannelSender()).getTransport();
+        AbstractSender s2 =(AbstractSender) 
((ReplicationTransmitter)channel2.getChannelSender()).getTransport();
+        s1.setTimeout(Long.MAX_VALUE); //for debugging
+        s2.setTimeout(Long.MAX_VALUE); //for debugging
+        
+        System.err.println("Starting Single package NO_ACK");
+        channel1.send(new Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024),Channel.SEND_OPTIONS_MULTICAST);
+        Thread.sleep(500);
+        System.err.println("Finished Single package NO_ACK 
["+listener1.count+"]");
+        assertEquals("Checking success messages.",1,listener1.count.get());
+    }
+
+    
+    public static void printMissingMsgs(int[] msgs, int maxIdx) {
+        for (int i=0; i<maxIdx && i<msgs.length; i++) {
+            if (msgs[i]==0) System.out.print(i+", ");
+        }
+        System.out.println();
+    }
+
+    public void testDataSendASYNCM() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+        ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+        rb1.setUdpRxBufSize(1024*1024*10);
+        rb2.setUdpRxBufSize(1024*1024*10);
+        rb1.setUdpTxBufSize(1024*1024*10);
+        rb2.setUdpTxBufSize(1024*1024*10);
+        System.err.println("Starting NO_ACK");
+        Thread[] threads = new Thread[threadCount];
+        for (int x=0; x<threads.length; x++ ) {
+            threads[x] = new Thread() {
+                public void run() {
+                    try {
+                        long start = System.currentTimeMillis();
+                        for (int i = 0; i < msgCount; i++) {
+                            int cnt = counter.getAndAdd(1);
+                            channel1.send(new Member[] 
{channel2.getLocalMember(false)}, 
Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_MULTICAST|Channel.SEND_OPTIONS_ASYNCHRONOUS);
+                            //Thread.currentThread().sleep(10);
+                        }
+                        System.out.println("Thread["+this.getName()+"] sent 
"+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+                    }catch ( Exception x ) {
+                        x.printStackTrace();
+                        return;
+                    } finally {
+                        threadCounter++;
+                    }
+                }
+            };
+        } 
+        for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+        for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
+        System.err.println("Finished NO_ACK ["+listener1.count+"]");
+        System.out.println("Sent "+counter.get()+ " messages. Received 
"+listener1.count+" Highest msg received:"+listener1.maxIdx);
+        System.out.print("Missing messages:");
+        printMissingMsgs(listener1.nrs,counter.get());
+        assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count.get());
+    }
+    public void testDataSendASYNC() throws Exception {
+        System.err.println("Starting ASYNC");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_MULTICAST);
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<5000 && 
msgCount!=listener1.count.get()) Thread.sleep(500);
+        System.err.println("Finished ASYNC");
+        assertEquals("Checking success 
messages.",msgCount,listener1.count.get());
+    }
+
+    public void testDataSendACK() throws Exception {
+        System.err.println("Starting ACK");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_MULTICAST);
+        Thread.sleep(250);
+        System.err.println("Finished ACK");
+        assertEquals("Checking success 
messages.",msgCount,listener1.count.get());
+    }
+
+    public void testDataSendSYNCACK() throws Exception {
+        System.err.println("Starting SYNC_ACK");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_MULTICAST);
+        Thread.sleep(250);
+        System.err.println("Finished SYNC_ACK");
+        assertEquals("Checking success 
messages.",msgCount,listener1.count.get());
+    }
+
+    public static class Listener implements ChannelListener {
+        AtomicLong count = new AtomicLong(0);
+        int maxIdx = -1;
+        int[] nrs = new int[1000000];
+        public Listener() {
+            Arrays.fill(nrs, 0);
+        }
+        public boolean accept(Serializable s, Member m) {
+            return (s instanceof Data);
+        }
+
+        public void messageReceived(Serializable s, Member m) {
+            try {
+                Data d = (Data)s;
+                if ( !Data.verify(d) ) {
+                    System.err.println("ERROR - Unable to verify data 
package");
+                } else {
+                    long c = count.addAndGet(1);
+                    if ((c%1000) ==0 ) {
+                        System.err.println("SUCCESS:"+c);
+                    }
+                    int nr = d.getNumber();
+                    if (nr>=0 && nr<nrs.length) {
+                        maxIdx = Math.max(maxIdx, nr);
+                        nrs[nr] = 1;
+                    }
+                }
+            }catch (Exception x ) {
+                x.printStackTrace();
+            }
+        }
+    }
+
+    public static class Data implements Serializable {
+        public int length;
+        public byte[] data;
+        public byte key;
+        public boolean hasNr = false;
+        public static Random r = new Random(System.currentTimeMillis());
+        public static Data createRandomData() {
+            return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
+        }
+        public static Data createRandomData(int size) {
+            return createRandomData(size,-1);
+        }
+        
+        public static Data createRandomData(int size, int number) {
+            int i = r.nextInt();
+            i = ( i % 127 );
+            int length = Math.abs(r.nextInt() % size);
+            if (length<100) length += 100;
+            Data d = new Data();
+            d.length = length;
+            d.key = (byte)i;
+            d.data = new byte[length];
+            Arrays.fill(d.data,d.key);
+            if (number>0 && d.data.length>=4) {
+                //populate number
+                d.hasNr = true;
+                XByteBuffer.toBytes(number,d.data, 0);
+            }
+            return d;
+        }
+        
+        public int getNumber() {
+            if (!hasNr) return -1;
+            return XByteBuffer.toInt(this.data, 0);
+        }
+
+        public static boolean verify(Data d) {
+            boolean result = (d.length == d.data.length);
+            for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) 
result = result && d.data[i] == d.key;
+            return result;
+        }
+    }
+
+
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to