Author: fhanik Date: Fri Jan 9 15:21:08 2009 New Revision: 733187 URL: http://svn.apache.org/viewvc?rev=733187&view=rev Log: defer the deserialization of the message to an async thread to be able to handle more incoming, still I can send more than I can receive
Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java?rev=733187&r1=733186&r2=733187&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java Fri Jan 9 15:21:08 2009 @@ -503,7 +503,6 @@ public boolean accept(ChannelMessage msg) { return true; } - public void broadcast(ChannelMessage message) throws ChannelException { if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ ) throw new ChannelException("Multicast send is not started or enabled."); Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=733187&r1=733186&r2=733187&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Jan 9 15:21:08 2009 @@ -346,19 +346,7 @@ if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) { memberDataReceived(data); } else { - XByteBuffer buffer = new XByteBuffer(data,true); - if (buffer.countPackages(true)>0) { - int count = buffer.countPackages(); - ChannelData[] pkgs = new ChannelData[count]; - for (int i=0; i<count; i++) { - try { - pkgs[i] = buffer.extractPackage(true); - }catch (IllegalStateException ise) { - log.debug("Unable to decode message.",ise); - } - } - memberBroadcastsReceived(pkgs); - } + memberBroadcastsReceived(data); } } @@ -407,28 +395,42 @@ } } - private void memberBroadcastsReceived(final ChannelData[] data) { + private void memberBroadcastsReceived(final byte[] b) { if (log.isTraceEnabled()) log.trace("Mcast received broadcasts."); - Runnable t = new Runnable() { - public void run() { - String name = Thread.currentThread().getName(); + XByteBuffer buffer = new XByteBuffer(b,true); + if (buffer.countPackages(true)>0) { + int count = buffer.countPackages(); + final ChannelData[] data = new ChannelData[count]; + for (int i=0; i<count; i++) { try { - Thread.currentThread().setName("Membership-MemberAdded."); - for (int i=0; i<data.length; i++ ) { - try { - if (data[i]!=null) { - msgservice.messageReceived(data[i]); + data[i] = buffer.extractPackage(true); + }catch (IllegalStateException ise) { + log.debug("Unable to decode message.",ise); + }catch (IOException x) { + log.debug("Unable to decode message.",x); + } + } + Runnable t = new Runnable() { + public void run() { + String name = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("Membership-MemberAdded."); + for (int i=0; i<data.length; i++ ) { + try { + if (data[i]!=null && !member.equals(data[i].getAddress())) { + msgservice.messageReceived(data[i]); + } + }catch (Throwable t) { + log.error("Unable to receive broadcast message.",t); } - }catch (Throwable t) { - log.error("Unable to receive broadcast message.",t); } + }finally { + Thread.currentThread().setName(name); } - }finally { - Thread.currentThread().setName(name); } - } - }; - executor.execute(t); + }; + executor.execute(t); + } } protected final Object expiredMutex = new Object(); @@ -469,6 +471,7 @@ } private final Object sendLock = new Object(); + public void send(boolean checkexpired, DatagramPacket packet) throws IOException{ checkexpired = (checkexpired && (packet==null)); //ignore if we haven't started the sender --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org