Author: fhanik
Date: Tue Jul 18 14:04:00 2006
New Revision: 423244
URL: http://svn.apache.org/viewvc?rev=423244&view=rev
Log:
Message dispatch interceptor uses a thread pool
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=423244&r1=423243&r2=423244&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
Tue Jul 18 14:04:00 2006
@@ -15,17 +15,20 @@
package org.apache.catalina.tribes.group.interceptors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+import java.util.concurrent.TimeUnit;
/**
*
* Same implementation as the MessageDispatchInterceptor
* except is ues an atomic long for the currentSize calculation
+ * and uses a thread pool for message sending.
*
* @author Filip Hanik
* @version 1.0
@@ -34,7 +37,11 @@
public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
protected AtomicLong currentSize = new AtomicLong(0);
- protected LinkedBlockingQueue queue = new LinkedBlockingQueue();
+ protected ThreadPoolExecutor executor = null;
+ protected int maxThreads = 10;
+ protected int maxSpareThreads = 2;
+ protected long keepAliveTime = 5000;
+ protected LinkedBlockingQueue<Runnable> runnablequeue = new
LinkedBlockingQueue<Runnable>();
public long getCurrentSize() {
return currentSize.get();
@@ -50,32 +57,55 @@
}
public boolean addToQueue(ChannelMessage msg, Member[] destination,
InterceptorPayload payload) {
- LinkObject obj = new LinkObject(msg,destination,payload);
- return queue.offer(obj);
+ final LinkObject obj = new LinkObject(msg,destination,payload);
+ Runnable r = new Runnable() {
+ public void run() {
+ sendAsyncData(obj);
+ }
+ };
+ executor.execute(r);
+ return true;
}
public LinkObject removeFromQueue() {
- LinkObject head = null;
- try {
- head = (LinkObject)queue.take();
- }catch ( InterruptedException x ) {}
- return head;
+ return null; //not used, thread pool contains its own queue.
}
public void startQueue() {
- msgDispatchThread = new Thread(this);
-
msgDispatchThread.setName("MessageDispatch15Interceptor.MessageDispatchThread");
- msgDispatchThread.setDaemon(true);
- msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+ if ( run ) return;
+ executor = new
ThreadPoolExecutor(maxSpareThreads,maxThreads,keepAliveTime,TimeUnit.MILLISECONDS,runnablequeue);
run = true;
- msgDispatchThread.start();
}
public void stopQueue() {
run = false;
- msgDispatchThread.interrupt();
+ executor.shutdownNow();
setAndGetCurrentSize(0);
+ runnablequeue.clear();
}
+ public long getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
+ public int getMaxSpareThreads() {
+ return maxSpareThreads;
+ }
+
+ public int getMaxThreads() {
+ return maxThreads;
+ }
+
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public void setMaxSpareThreads(int maxSpareThreads) {
+ this.maxSpareThreads = maxSpareThreads;
+ }
+
+ public void setMaxThreads(int maxThreads) {
+ this.maxThreads = maxThreads;
+ }
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=423244&r1=423243&r2=423244&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
Tue Jul 18 14:04:00 2006
@@ -165,32 +165,37 @@
LinkObject link = removeFromQueue();
if ( link == null ) continue; //should not happen unless we exceed
wait time
while ( link != null && run ) {
- ChannelMessage msg = link.data();
- Member[] destination = link.getDestination();
- try {
- super.sendMessage(destination,msg,null);
- try {
- if ( link.getHandler() != null )
link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId()));
- } catch ( Exception ex ) {
- log.error("Unable to report back completed
message.",ex);
- }
- } catch ( Exception x ) {
- ChannelException cx = null;
- if ( x instanceof ChannelException ) cx =
(ChannelException)x;
- else cx = new ChannelException(x);
- if ( log.isDebugEnabled() ) log.debug("Error while
processing async message.",x);
- try {
- if (link.getHandler() != null)
link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId()));
- } catch ( Exception ex ) {
- log.error("Unable to report back error message.",ex);
- }
- } finally {
- addAndGetCurrentSize(-msg.getMessage().getLength());
- link = link.next();
- }//try
+ link = sendAsyncData(link);
}//while
}//while
}//run
+
+ protected LinkObject sendAsyncData(LinkObject link) {
+ ChannelMessage msg = link.data();
+ Member[] destination = link.getDestination();
+ try {
+ super.sendMessage(destination,msg,null);
+ try {
+ if ( link.getHandler() != null )
link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId()));
+ } catch ( Exception ex ) {
+ log.error("Unable to report back completed message.",ex);
+ }
+ } catch ( Exception x ) {
+ ChannelException cx = null;
+ if ( x instanceof ChannelException ) cx = (ChannelException)x;
+ else cx = new ChannelException(x);
+ if ( log.isDebugEnabled() ) log.debug("Error while processing
async message.",x);
+ try {
+ if (link.getHandler() != null)
link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId()));
+ } catch ( Exception ex ) {
+ log.error("Unable to report back error message.",ex);
+ }
+ } finally {
+ addAndGetCurrentSize(-msg.getMessage().getLength());
+ link = link.next();
+ }//try
+ return link;
+ }
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=423244&r1=423243&r2=423244&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
Tue Jul 18 14:04:00 2006
@@ -9,6 +9,7 @@
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
import
org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
/**
* <p>Title: </p>
@@ -24,7 +25,7 @@
*/
public class TestDataIntegrity extends TestCase {
int msgCount = 1000;
- int threadCount = 8;
+ int threadCount = 20;
GroupChannel channel1;
GroupChannel channel2;
Listener listener1;
@@ -32,9 +33,9 @@
protected void setUp() throws Exception {
super.setUp();
channel1 = new GroupChannel();
- channel1.addInterceptor(new MessageDispatchInterceptor());
+ channel1.addInterceptor(new MessageDispatch15Interceptor());
channel2 = new GroupChannel();
- channel2.addInterceptor(new MessageDispatchInterceptor());
+ channel2.addInterceptor(new MessageDispatch15Interceptor());
listener1 = new Listener();
channel2.addChannelListener(listener1);
channel1.start(GroupChannel.DEFAULT);
@@ -54,7 +55,7 @@
threads[x] = new Thread() {
public void run() {
try {
- for (int i = 0; i < msgCount; i++)
channel1.send(channel1.getMembers(), Data.createRandomData(),0);
+ for (int i = 0; i < msgCount; i++) channel1.send(new
Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0);
}catch ( Exception x ) {
x.printStackTrace();
return;
@@ -68,11 +69,46 @@
for (int x=0; x<threads.length; x++ ) { threads[x].join();}
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
- while ( (System.currentTimeMillis()-start)<50000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ while ( (System.currentTimeMillis()-start)<15000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
System.err.println("Finished NO_ACK");
assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
}
+ public void testDataSendASYNCM() throws Exception {
+ System.err.println("Starting ASYNC MULTI THREAD");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; x<threads.length; x++ ) {
+ threads[x] = new Thread() {
+ public void run() {
+ try {
+ for (int i = 0; i < msgCount; i++)
channel1.send(new Member[] {channel2.getLocalMember(false)},
Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ return;
+ } finally {
+ threadCounter++;
+ }
+ }
+ };
+ }
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<15000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ System.err.println("Finished ASYNC MULTI THREAD");
+ assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
+ }
+ public void testDataSendASYNC() throws Exception {
+ System.err.println("Starting ASYNC");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<5000 &&
msgCount!=listener1.count) Thread.sleep(500);
+ System.err.println("Finished ASYNC");
+ assertEquals("Checking success messages.",msgCount,listener1.count);
+ }
+
public void testDataSendACK() throws Exception {
System.err.println("Starting ACK");
for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK);
@@ -83,7 +119,7 @@
public void testDataSendSYNCACK() throws Exception {
System.err.println("Starting SYNC_ACK");
- for (int i=0; i<msgCount; i++)
channel1.send(channel1.getMembers(),Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
Thread.sleep(250);
System.err.println("Finished SYNC_ACK");
assertEquals("Checking success messages.",msgCount,listener1.count);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]