Author: fhanik
Date: Fri Jan  9 15:21:08 2009
New Revision: 733187

URL: http://svn.apache.org/viewvc?rev=733187&view=rev
Log:
Defer the deserialization of the message to an async thread to be able to
handle more incoming messages. Even so, I can still send more than I can receive.


Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java
    tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java?rev=733187&r1=733186&r2=733187&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java Fri Jan  9 15:21:08 2009
@@ -503,7 +503,6 @@
     public boolean accept(ChannelMessage msg) {
         return true;
     }
-    
     public void broadcast(ChannelMessage message) throws ChannelException {
         if (impl==null || (impl.startLevel & 
Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ )
             throw new ChannelException("Multicast send is not started or 
enabled.");

Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=733187&r1=733186&r2=733187&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Jan  9 15:21:08 2009
@@ -346,19 +346,7 @@
                 if 
(XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) {
                     memberDataReceived(data);
                 } else {
-                    XByteBuffer buffer = new XByteBuffer(data,true);
-                    if (buffer.countPackages(true)>0) {
-                        int count = buffer.countPackages();
-                        ChannelData[] pkgs = new ChannelData[count];
-                        for (int i=0; i<count; i++) {
-                            try {
-                                pkgs[i] = buffer.extractPackage(true);
-                            }catch (IllegalStateException ise) {
-                                log.debug("Unable to decode message.",ise);
-                            }
-                        }
-                        memberBroadcastsReceived(pkgs);
-                    }
+                    memberBroadcastsReceived(data);
                 }
                 
             }
@@ -407,28 +395,42 @@
         }
     }
     
-    private void memberBroadcastsReceived(final ChannelData[] data) {
+    private void memberBroadcastsReceived(final byte[] b) {
         if (log.isTraceEnabled()) log.trace("Mcast received broadcasts.");
-        Runnable t = new Runnable() {
-            public void run() {
-                String name = Thread.currentThread().getName();
+        XByteBuffer buffer = new XByteBuffer(b,true);
+        if (buffer.countPackages(true)>0) {
+            int count = buffer.countPackages();
+            final ChannelData[] data = new ChannelData[count];
+            for (int i=0; i<count; i++) {
                 try {
-                    Thread.currentThread().setName("Membership-MemberAdded.");
-                    for (int i=0; i<data.length; i++ ) {
-                        try {
-                            if (data[i]!=null) {
-                                msgservice.messageReceived(data[i]);
+                    data[i] = buffer.extractPackage(true);
+                }catch (IllegalStateException ise) {
+                    log.debug("Unable to decode message.",ise);
+                }catch (IOException x) {
+                    log.debug("Unable to decode message.",x);
+                }
+            }
+            Runnable t = new Runnable() {
+                public void run() {
+                    String name = Thread.currentThread().getName();
+                    try {
+                        
Thread.currentThread().setName("Membership-MemberAdded.");
+                        for (int i=0; i<data.length; i++ ) {
+                            try {
+                                if (data[i]!=null && 
!member.equals(data[i].getAddress())) {
+                                    msgservice.messageReceived(data[i]);
+                                }
+                            }catch (Throwable t) {
+                                log.error("Unable to receive broadcast 
message.",t);
                             }
-                        }catch (Throwable t) {
-                            log.error("Unable to receive broadcast 
message.",t);
                         }
+                    }finally {
+                        Thread.currentThread().setName(name);
                     }
-                }finally {
-                    Thread.currentThread().setName(name);
                 }
-            }
-        };
-        executor.execute(t);
+            };
+            executor.execute(t);
+        }
     }
 
     protected final Object expiredMutex = new Object();
@@ -469,6 +471,7 @@
     }
     
     private final Object sendLock = new Object();
+
     public void send(boolean checkexpired, DatagramPacket packet) throws 
IOException{
         checkexpired = (checkexpired && (packet==null));
         //ignore if we haven't started the sender



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to