Author: ningjiang
Date: Tue Dec 2 23:27:52 2008
New Revision: 722800
URL: http://svn.apache.org/viewvc?rev=722800&view=rev
Log:
CAMEL-1098 Applied patch with thanks to Tim
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
Tue Dec 2 23:27:52 2008
@@ -17,7 +17,7 @@
package org.apache.camel.model;
import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -49,7 +49,7 @@
@XmlTransient
private AggregationStrategy aggregationStrategy;
@XmlTransient
- private ThreadPoolExecutor threadPoolExecutor;
+ private Executor executor;
@Override
public String toString() {
@@ -74,9 +74,9 @@
aggregationStrategy = new UseLatestAggregationStrategy();
}
if (threadPoolRef != null) {
- threadPoolExecutor = routeContext.lookup(threadPoolRef,
ThreadPoolExecutor.class);
+ executor = routeContext.lookup(threadPoolRef, Executor.class);
}
- return new MulticastProcessor(list, aggregationStrategy,
isParallelProcessing(), threadPoolExecutor);
+ return new MulticastProcessor(list, aggregationStrategy,
isParallelProcessing(), executor);
}
public AggregationStrategy getAggregationStrategy() {
@@ -97,12 +97,12 @@
return this;
}
- public ThreadPoolExecutor getThreadPoolExecutor() {
- return threadPoolExecutor;
+ public Executor getExecutor() {
+ return executor;
}
- public MulticastType setThreadPoolExecutor(ThreadPoolExecutor executor) {
- this.threadPoolExecutor = executor;
+ public MulticastType setThreadPoolExecutor(Executor executor) {
+ this.executor = executor;
return this;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Tue Dec 2 23:27:52 2008
@@ -24,7 +24,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
+
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -298,7 +299,7 @@
* @return a ThreadType builder that can be used to further configure the
* the thread pool.
*/
- public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
+ public ProcessorType<Type> thread(Executor executor) {
ThreadType answer = new ThreadType(executor);
addOutput(answer);
return this;
@@ -593,15 +594,15 @@
*
* @param expression the expression on which to split
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to
call the endpoint producer
- * @param threadPoolExecutor override the default {@link
ThreadPoolExecutor}
+ * @param executor override the default {@link Executor}
* @return the builder
*/
public SplitterType split(Expression expression, boolean
parallelProcessing,
- ThreadPoolExecutor threadPoolExecutor) {
+ Executor executor) {
SplitterType answer = new SplitterType(expression);
addOutput(answer);
answer.setParallelProcessing(parallelProcessing);
- answer.setThreadPoolExecutor(threadPoolExecutor);
+ answer.setExecutor(executor);
return answer;
}
@@ -629,14 +630,14 @@
* The splitter responds with the answer produced by the given {@link
AggregationStrategy}.
*
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to
call the endpoint producer
- * @param threadPoolExecutor override the default {@link
ThreadPoolExecutor}
+ * @param executor override the default {@link Executor}
* @return the expression clause for the expression on which to split
*/
- public ExpressionClause<SplitterType> split(boolean parallelProcessing,
ThreadPoolExecutor threadPoolExecutor) {
+ public ExpressionClause<SplitterType> split(boolean parallelProcessing,
Executor executor) {
SplitterType answer = new SplitterType();
addOutput(answer);
answer.setParallelProcessing(parallelProcessing);
- answer.setThreadPoolExecutor(threadPoolExecutor);
+ answer.setExecutor(executor);
return ExpressionClause.createAndSetExpression(answer);
}
@@ -669,16 +670,16 @@
* @param expression the expression on which to split
* @param aggregationStrategy the strategy used to aggregate responses for
every part
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to
call the endpoint producer
- * @param threadPoolExecutor override the default {@link
ThreadPoolExecutor}
+ * @param executor override the default {@link Executor}
* @return the builder
*/
public SplitterType split(Expression expression, AggregationStrategy
aggregationStrategy,
- boolean parallelProcessing,
ThreadPoolExecutor threadPoolExecutor) {
+ boolean parallelProcessing, Executor
executor) {
SplitterType answer = new SplitterType(expression);
addOutput(answer);
answer.setAggregationStrategy(aggregationStrategy);
answer.setParallelProcessing(parallelProcessing);
- answer.setThreadPoolExecutor(threadPoolExecutor);
+ answer.setExecutor(executor);
return answer;
}
@@ -708,16 +709,16 @@
*
* @param aggregationStrategy the strategy used to aggregate responses for
every part
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to
call the endpoint producer
- * @param threadPoolExecutor override the default {@link
ThreadPoolExecutor}
+ * @param executor override the default {@link Executor}
* @return the expression clause for the expression on which to split
*/
public ExpressionClause<SplitterType> split(AggregationStrategy
aggregationStrategy, boolean parallelProcessing,
- ThreadPoolExecutor
threadPoolExecutor) {
+ Executor executor) {
SplitterType answer = new SplitterType();
addOutput(answer);
answer.setAggregationStrategy(aggregationStrategy);
answer.setParallelProcessing(parallelProcessing);
- answer.setThreadPoolExecutor(threadPoolExecutor);
+ answer.setExecutor(executor);
return ExpressionClause.createAndSetExpression(answer);
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
Tue Dec 2 23:27:52 2008
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@
@XmlAttribute(required = false)
private Boolean parallelProcessing;
@XmlTransient
- private ThreadPoolExecutor threadPoolExecutor;
+ private Executor executor;
@XmlAttribute(required = false)
private String threadPoolExecutorRef;
@XmlAttribute(required = false)
@@ -80,9 +81,9 @@
if (aggregationStrategy == null) {
aggregationStrategy = new UseLatestAggregationStrategy();
}
- threadPoolExecutor = createThreadPoolExecutor(routeContext);
+ executor = createThreadPoolExecutor(routeContext);
return new Splitter(getExpression().createExpression(routeContext),
childProcessor, aggregationStrategy,
- isParallelProcessing(), threadPoolExecutor, streaming);
+ isParallelProcessing(), executor, streaming);
}
public AggregationStrategy getAggregationStrategy() {
@@ -127,23 +128,23 @@
return this;
}
- private ThreadPoolExecutor createThreadPoolExecutor(RouteContext
routeContext) {
- ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
- if (threadPoolExecutor == null && threadPoolExecutorRef != null) {
- threadPoolExecutor = routeContext.lookup(threadPoolExecutorRef,
ThreadPoolExecutor.class);
+ private Executor createThreadPoolExecutor(RouteContext routeContext) {
+ Executor executor = getExecutor();
+ if (executor == null && threadPoolExecutorRef != null) {
+ executor = routeContext.lookup(threadPoolExecutorRef,
ThreadPoolExecutor.class);
}
- if (threadPoolExecutor == null) {
+ if (executor == null) {
// fall back and use default
- threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
+ executor = new ThreadPoolExecutor(4, 16, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
- return threadPoolExecutor;
+ return executor;
}
- public ThreadPoolExecutor getThreadPoolExecutor() {
- return threadPoolExecutor;
+ public Executor getExecutor() {
+ return executor;
}
- public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
- this.threadPoolExecutor = threadPoolExecutor;
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
Tue Dec 2 23:27:52 2008
@@ -19,7 +19,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -63,7 +63,7 @@
@XmlTransient
private ThreadGroup threadGroup;
@XmlTransient
- private ThreadPoolExecutor executor;
+ private Executor executor;
public ThreadType() {
}
@@ -73,7 +73,7 @@
this.maxSize = coreSize;
}
- public ThreadType(ThreadPoolExecutor executor) {
+ public ThreadType(Executor executor) {
this.executor = executor;
}
@@ -245,7 +245,7 @@
* @param executor the executor
* @return the builder
*/
- public ThreadType executor(ThreadPoolExecutor executor) {
+ public ThreadType executor(Executor executor) {
setExecutor(executor);
return this;
}
@@ -292,11 +292,11 @@
this.threadGroup = threadGroup;
}
- public ThreadPoolExecutor getExecutor() {
+ public Executor getExecutor() {
return executor;
}
- public void setExecutor(ThreadPoolExecutor executor) {
+ public void setExecutor(Executor executor) {
this.executor = executor;
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Tue Dec 2 23:27:52 2008
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@@ -69,7 +70,7 @@
private Collection<Processor> processors;
private AggregationStrategy aggregationStrategy;
private boolean isParallelProcessing;
- private ThreadPoolExecutor executor;
+ private Executor executor;
private final boolean streaming;
private final AtomicBoolean shutdown = new AtomicBoolean(true);
@@ -81,11 +82,11 @@
this(processors, aggregationStrategy, false, null);
}
- public MulticastProcessor(Collection<Processor> processors,
AggregationStrategy aggregationStrategy, boolean parallelProcessing,
ThreadPoolExecutor executor) {
+ public MulticastProcessor(Collection<Processor> processors,
AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor
executor) {
this(processors, aggregationStrategy, parallelProcessing, executor,
false);
}
- public MulticastProcessor(Collection<Processor> processors,
AggregationStrategy aggregationStrategy, boolean parallelProcessing,
ThreadPoolExecutor executor, boolean streaming) {
+ public MulticastProcessor(Collection<Processor> processors,
AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor
executor, boolean streaming) {
notNull(processors, "processors");
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
@@ -228,17 +229,17 @@
protected void doStop() throws Exception {
shutdown.set(true);
- if (executor != null) {
- executor.shutdown();
- executor.awaitTermination(0, TimeUnit.SECONDS);
+ if (executor != null && executor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)executor).shutdown();
+ ((ThreadPoolExecutor)executor).awaitTermination(0,
TimeUnit.SECONDS);
}
ServiceHelper.stopServices(processors);
}
protected void doStart() throws Exception {
shutdown.set(false);
- if (executor != null) {
- executor.setRejectedExecutionHandler(new
RejectedExecutionHandler() {
+ if (executor != null && executor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(new
RejectedExecutionHandler() {
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor executor) {
ProcessCall call = (ProcessCall)runnable;
call.exchange.setException(new
RejectedExecutionException());
@@ -274,7 +275,7 @@
return aggregationStrategy;
}
- public ThreadPoolExecutor getExecutor() {
+ public Executor getExecutor() {
return executor;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
Tue Dec 2 23:27:52 2008
@@ -22,7 +22,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -53,8 +53,8 @@
}
public Splitter(Expression expression, Processor destination,
AggregationStrategy aggregationStrategy,
- boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor,
boolean streaming) {
- super(Collections.singleton(destination), aggregationStrategy,
parallelProcessing, threadPoolExecutor, streaming);
+ boolean parallelProcessing, Executor executor, boolean streaming) {
+ super(Collections.singleton(destination), aggregationStrategy,
parallelProcessing, executor, streaming);
this.expression = expression;
notNull(expression, "expression");
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
Tue Dec 2 23:27:52 2008
@@ -18,10 +18,11 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
@@ -37,7 +38,7 @@
*/
public class ThreadProcessor implements AsyncProcessor, Service {
- private ThreadPoolExecutor executor;
+ private Executor executor;
private long stackSize;
private ThreadGroup threadGroup;
private int priority = Thread.NORM_PRIORITY;
@@ -100,8 +101,10 @@
public void stop() throws Exception {
shutdown.set(true);
- executor.shutdown();
- executor.awaitTermination(0, TimeUnit.SECONDS);
+ if (executor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)executor).shutdown();
+ ((ThreadPoolExecutor)executor).awaitTermination(0,
TimeUnit.SECONDS);
+ }
}
public long getStackSize() {
@@ -179,7 +182,7 @@
this.taskQueue = taskQueue;
}
- public ThreadPoolExecutor getExecutor() {
+ public Executor getExecutor() {
if (executor == null) {
executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(),
getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
@@ -198,7 +201,7 @@
return executor;
}
- public void setExecutor(ThreadPoolExecutor executor) {
+ public void setExecutor(Executor executor) {
this.executor = executor;
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
Tue Dec 2 23:27:52 2008
@@ -88,7 +88,7 @@
+ " being less than: " +
failUntilAttempt);
}
}
-
+ // START SNIPPET: AsyncProcessor
public boolean process(Exchange exchange, AsyncCallback callback)
{
Integer counter =
exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
Integer.class);
@@ -105,6 +105,7 @@
callback.done(false);
return false;
}
+ // END SNIPPET: AsyncProcessor
};
return new RouteBuilder() {
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
Tue Dec 2 23:27:52 2008
@@ -32,7 +32,7 @@
protected ThreadPoolExecutor customThreadPoolExecutor = new
ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
public void testSplitterWithCustomThreadPoolExecutor() throws Exception {
- ThreadPoolExecutor threadPoolExecutor =
getSplitter().getThreadPoolExecutor();
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
getSplitter().getExecutor();
// this should be sufficient as core pool size is the only thing I
changed from the default
assertTrue(threadPoolExecutor.getCorePoolSize() ==
customThreadPoolExecutor.getCorePoolSize());
assertTrue(threadPoolExecutor.getMaximumPoolSize() ==
customThreadPoolExecutor.getMaximumPoolSize());