Author: davsclaus
Date: Tue Dec 28 13:10:59 2010
New Revision: 1053342
URL: http://svn.apache.org/viewvc?rev=1053342&view=rev
Log:
CAMEL-3285: Polished code a bit due code review.
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
Tue Dec 28 13:10:59 2010
@@ -30,7 +30,7 @@ import org.apache.camel.component.routeb
import org.apache.camel.impl.DefaultComponent;
public class RouteboxComponent extends DefaultComponent {
- RouteboxConfiguration config;
+ final RouteboxConfiguration config;
private final Map<String, BlockingQueue<Exchange>> queues = new
HashMap<String, BlockingQueue<Exchange>>();
public RouteboxComponent() {
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
Tue Dec 28 13:10:59 2010
@@ -27,7 +27,6 @@ import org.apache.camel.ProducerTemplate
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultProducerTemplate;
import org.apache.camel.spi.Registry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,7 +56,7 @@ public class RouteboxConfiguration {
public RouteboxConfiguration() {
}
- public RouteboxConfiguration(URI uri) throws Exception {
+ public RouteboxConfiguration(URI uri) {
this();
this.uri = uri;
}
@@ -71,7 +70,9 @@ public class RouteboxConfiguration {
setUri(uri);
setAuthority(uri.getAuthority());
- LOG.info("Authority: " + uri.getAuthority());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Authority: " + uri.getAuthority());
+ }
setEndpointName(getAuthority());
@@ -113,31 +114,28 @@ public class RouteboxConfiguration {
}
if (parameters.containsKey("innerRegistry")) {
- innerRegistry = (Registry)
component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry",
Registry.class);
+ innerRegistry =
component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry",
Registry.class);
}
if (isForkContext()) {
if (innerRegistry != null) {
- innerContext = (CamelContext)
component.resolveAndRemoveReferenceParameter(parameters, "innerContext",
CamelContext.class, new DefaultCamelContext(innerRegistry));
+ innerContext =
component.resolveAndRemoveReferenceParameter(parameters, "innerContext",
CamelContext.class, new DefaultCamelContext(innerRegistry));
} else {
- innerContext = (CamelContext)
component.resolveAndRemoveReferenceParameter(parameters, "innerContext",
CamelContext.class, new DefaultCamelContext());
+ innerContext =
component.resolveAndRemoveReferenceParameter(parameters, "innerContext",
CamelContext.class, new DefaultCamelContext());
}
-
} else {
innerContext = component.getCamelContext();
}
- //configureInnerContext();
- innerProducerTemplate = new DefaultProducerTemplate(innerContext);
- innerProducerTemplate.start();
+ innerProducerTemplate = innerContext.createProducerTemplate();
setQueueSize(component.getAndRemoveParameter(parameters, "size",
Integer.class, 0));
consumerUri = component.resolveAndRemoveReferenceParameter(parameters,
"consumerUri", URI.class, new URI("routebox:" + getEndpointName()));
producerUri = component.resolveAndRemoveReferenceParameter(parameters,
"producerUri", URI.class, new URI("routebox:" + getEndpointName()));
dispatchStrategy =
component.resolveAndRemoveReferenceParameter(parameters, "dispatchStrategy",
RouteboxDispatchStrategy.class, null);
dispatchMap = (HashMap<String, String>)
component.resolveAndRemoveReferenceParameter(parameters, "dispatchMap",
HashMap.class, new HashMap<String, String>());
- if ((dispatchStrategy == null) && (dispatchMap == null)) {
- LOG.warn("No Routebox Dispatch Map or Strategy has been set.
Routebox may not have more than one inner route");
+ if (dispatchStrategy == null && dispatchMap == null) {
+ LOG.warn("No Routebox Dispatch Map or Strategy has been set.
Routebox may not have more than one inner route.");
}
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
Tue Dec 28 13:10:59 2010
@@ -21,19 +21,24 @@ import java.util.concurrent.ExecutorServ
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.ExceptionHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public abstract class RouteboxServiceSupport extends ServiceSupport {
- private static final transient Log LOG =
LogFactory.getLog(RouteboxServiceSupport.class);
+ private final transient Log log = LogFactory.getLog(getClass());
+ private ExceptionHandler exceptionHandler;
private RouteboxEndpoint endpoint;
private ExecutorService executor;
- private int pendingExchanges;
- private boolean startedInnerContext;
-
+ private volatile boolean startedInnerContext;
+
public RouteboxServiceSupport(RouteboxEndpoint endpoint) {
this.endpoint = endpoint;
+ if (exceptionHandler == null) {
+ exceptionHandler = new LoggingExceptionHandler(getClass());
+ }
}
protected void doStopInnerContext() throws Exception {
@@ -48,8 +53,8 @@ public abstract class RouteboxServiceSup
List<RouteBuilder> routeBuildersList =
endpoint.getConfig().getRouteBuilders();
if (!(routeBuildersList.isEmpty())) {
for (RouteBuilder routeBuilder : routeBuildersList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding routebuilder " + routeBuilder + " to " +
context.getName());
+ if (log.isDebugEnabled()) {
+ log.debug("Adding RouteBuilder " + routeBuilder + " to " +
context.getName());
}
context.addRoutes(routeBuilder);
}
@@ -59,14 +64,6 @@ public abstract class RouteboxServiceSup
setStartedInnerContext(true);
}
- public void setPendingExchanges(int pendingExchanges) {
- this.pendingExchanges = pendingExchanges;
- }
-
- public int getPendingExchanges() {
- return pendingExchanges;
- }
-
public RouteboxEndpoint getRouteboxEndpoint() {
return endpoint;
}
@@ -83,14 +80,19 @@ public abstract class RouteboxServiceSup
this.executor = executor;
}
-
public void setStartedInnerContext(boolean startedInnerContext) {
this.startedInnerContext = startedInnerContext;
}
-
public boolean isStartedInnerContext() {
return startedInnerContext;
}
+ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ public ExceptionHandler getExceptionHandler() {
+ return exceptionHandler;
+ }
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
Tue Dec 28 13:10:59 2010
@@ -17,24 +17,20 @@
package org.apache.camel.component.routebox.direct;
import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.SuspendableService;
import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
-import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
public class RouteboxDirectConsumer extends RouteboxServiceSupport implements
RouteboxConsumer, ShutdownAware, SuspendableService {
protected ProducerTemplate producer;
private final Processor processor;
private volatile AsyncProcessor asyncProcessor;
- private ExceptionHandler exceptionHandler;
public RouteboxDirectConsumer(RouteboxDirectEndpoint endpoint, Processor
processor) {
super(endpoint);
@@ -44,33 +40,31 @@ public class RouteboxDirectConsumer exte
protected void doStart() throws Exception {
// add consumer to endpoint
- boolean existing = this ==
((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer();
- if (!existing &&
((RouteboxDirectEndpoint)getRouteboxEndpoint()).hasConsumer(this)) {
- throw new IllegalArgumentException("Cannot add a 2nd consumer to
the same endpoint. Endpoint " + getRouteboxEndpoint() + " only allows one
consumer.");
+ boolean existing = this == getEndpoint().getConsumer();
+ if (!existing && getEndpoint().hasConsumer(this)) {
+ throw new IllegalArgumentException("Cannot add a 2nd consumer to
the same endpoint. Endpoint " + getEndpoint() + " only allows one consumer.");
}
if (!existing) {
- ((RouteboxDirectEndpoint)getRouteboxEndpoint()).addConsumer(this);
+ getEndpoint().addConsumer(this);
}
// now start the inner context
if (!isStartedInnerContext()) {
doStartInnerContext();
}
-
}
protected void doStop() throws Exception {
- ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+ getEndpoint().removeConsumer(this);
// now stop the inner context
if (isStartedInnerContext()) {
doStopInnerContext();
}
-
}
protected void doSuspend() throws Exception {
- ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+ getEndpoint().removeConsumer(this);
}
protected void doResume() throws Exception {
@@ -78,11 +72,6 @@ public class RouteboxDirectConsumer exte
doStart();
}
- public Exchange processRequest(Exchange exchange) {
- return exchange;
-
- }
-
/**
* Provides an {...@link org.apache.camel.AsyncProcessor} interface to the
configured
* processor on the consumer. If the processor does not implement the
interface,
@@ -95,54 +84,24 @@ public class RouteboxDirectConsumer exte
return asyncProcessor;
}
- public ExceptionHandler getExceptionHandler() {
- if (exceptionHandler == null) {
- exceptionHandler = new LoggingExceptionHandler(getClass());
- }
- return exceptionHandler;
- }
-
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- /**
- * Handles the given exception using the {...@link #getExceptionHandler()}
- *
- * @param t the exception to handle
- */
- protected void handleException(Throwable t) {
- Throwable newt = (t == null) ? new IllegalArgumentException("Handling
[null] exception") : t;
- getExceptionHandler().handleException(newt);
- }
-
- /* (non-Javadoc)
- * @see
org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
- */
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// deny stopping on shutdown as we want direct consumers to run in
case some other queues
// depend on this consumer to run, so it can complete its exchanges
return true;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
- */
public int getPendingExchangesSize() {
// return 0 as we do not have an internal memory queue with a variable
size
// of inflight messages.
return 0;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
- */
public void prepareShutdown() {
-
+ // noop
}
- public Endpoint getEndpoint() {
- return (Endpoint) getRouteboxEndpoint();
+ public RouteboxDirectEndpoint getEndpoint() {
+ return (RouteboxDirectEndpoint) getRouteboxEndpoint();
}
public Processor getProcessor() {
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
Tue Dec 28 13:10:59 2010
@@ -27,7 +27,7 @@ import org.apache.camel.component.routeb
import org.apache.camel.component.routebox.RouteboxEndpoint;
public class RouteboxDirectEndpoint extends RouteboxEndpoint {
- private volatile Map<String, RouteboxDirectConsumer> consumers = new
HashMap<String, RouteboxDirectConsumer>();
+ private final Map<String, RouteboxDirectConsumer> consumers = new
HashMap<String, RouteboxDirectConsumer>();
public RouteboxDirectEndpoint(String endpointUri) {
super(endpointUri);
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
Tue Dec 28 13:10:59 2010
@@ -26,9 +26,7 @@ import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,21 +34,20 @@ import org.apache.commons.logging.LogFac
public class RouteboxDirectProducer extends RouteboxServiceSupport implements
Producer, AsyncProcessor {
private static final transient Log LOG =
LogFactory.getLog(RouteboxDirectProducer.class);
protected ProducerTemplate producer;
- private ExceptionHandler exceptionHandler;
-
+
public RouteboxDirectProducer(RouteboxDirectEndpoint endpoint) {
super(endpoint);
producer = endpoint.getConfig().getInnerProducerTemplate();
}
public void process(Exchange exchange) throws Exception {
- Exchange result = null;
+ Exchange result;
if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() ==
null) && (getRouteboxEndpoint().getConfig().isSendToConsumer())) {
throw new CamelExchangeException("No consumers available on
endpoint: " + getRouteboxEndpoint(), exchange);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("**** Dispatching to Inner Route ****");
+ LOG.debug("Dispatching to Inner Route " + exchange);
}
RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
result = dispatcher.dispatchSync(getRouteboxEndpoint(), exchange);
@@ -64,14 +61,14 @@ public class RouteboxDirectProducer exte
boolean flag = true;
if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() ==
null)
- &&
(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()))
{
+ && ((getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
exchange.setException(new CamelExchangeException("No consumers
available on endpoint: " + getRouteboxEndpoint(), exchange));
callback.done(true);
flag = true;
} else {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("**** Dispatching to Inner Route ****");
+ LOG.debug("Dispatching to Inner Route " + exchange);
}
RouteboxDispatcher dispatcher = new
RouteboxDispatcher(producer);
@@ -97,70 +94,46 @@ public class RouteboxDirectProducer exte
}
protected void doStart() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting producer: " + this);
- }
-
- if
(!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())
{
+ if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
// start an inner context
if (!isStartedInnerContext()) {
doStartInnerContext();
}
}
-
}
protected void doStop() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping producer: " + this);
- }
-
- if
(!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())
{
+ if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
// stop the inner context
if (isStartedInnerContext()) {
doStopInnerContext();
}
}
-
}
- @Override
- public String toString() {
- return "Producer[" + getRouteboxEndpoint()
- .getEndpointUri() + "]";
- }
-
public Endpoint getEndpoint() {
return getRouteboxEndpoint();
}
public Exchange createExchange() {
- return getRouteboxEndpoint()
- .createExchange();
+ return getRouteboxEndpoint().createExchange();
}
public Exchange createExchange(ExchangePattern pattern) {
- return getRouteboxEndpoint()
- .createExchange(pattern);
+ return getRouteboxEndpoint().createExchange(pattern);
}
public Exchange createExchange(Exchange exchange) {
- return getRouteboxEndpoint()
- .createExchange(exchange);
+ return getRouteboxEndpoint().createExchange(exchange);
}
public boolean isSingleton() {
return true;
}
- public ExceptionHandler getExceptionHandler() {
- if (exceptionHandler == null) {
- exceptionHandler = new LoggingExceptionHandler(getClass());
- }
- return exceptionHandler;
+ @Override
+ public String toString() {
+ return "Producer[" + getRouteboxEndpoint().getEndpointUri() + "]";
}
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
Tue Dec 28 13:10:59 2010
@@ -41,23 +41,13 @@ public class RouteboxSedaConsumer extend
private static final transient Log LOG =
LogFactory.getLog(RouteboxSedaConsumer.class);
protected AsyncProcessor processor;
protected ProducerTemplate producer;
- private int pendingExchanges;
- private ExceptionHandler exceptionHandler;
-
+
public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor
processor) {
super(endpoint);
this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
- producer = endpoint.getConfig().getInnerProducerTemplate();
- producer.setMaximumCacheSize(endpoint.getConfig().getThreads());
- if (exceptionHandler == null) {
- exceptionHandler = new LoggingExceptionHandler(getClass());
- }
+ this.producer = endpoint.getConfig().getInnerProducerTemplate();
}
-
- /* (non-Javadoc)
- * @see org.apache.camel.impl.ServiceSupport#doStart()
- */
@Override
protected void doStart() throws Exception {
((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStarted(this);
@@ -65,30 +55,24 @@ public class RouteboxSedaConsumer extend
// Create a URI link from the primary context to routes in the new
inner context
int poolSize = getRouteboxEndpoint().getConfig().getThreads();
-
setExecutor(((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy()
- .newFixedThreadPool(this,
((RouteboxSedaEndpoint)getRouteboxEndpoint()).getEndpointUri(), poolSize));
+
setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy()
+ .newFixedThreadPool(this,
getRouteboxEndpoint().getEndpointUri(), poolSize));
for (int i = 0; i < poolSize; i++) {
- getExecutor().execute((Runnable) this);
+ getExecutor().execute(this);
}
}
- /* (non-Javadoc)
- * @see org.apache.camel.impl.ServiceSupport#doStop()
- */
@Override
protected void doStop() throws Exception {
((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStopped(this);
// Shutdown the executor
-
((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
+
getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
setExecutor(null);
doStopInnerContext();
}
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- public void run() {
+ public void run() {
BlockingQueue<Exchange> queue =
((RouteboxSedaEndpoint)getRouteboxEndpoint()).getQueue();
while (queue != null && isRunAllowed()) {
try {
@@ -104,13 +88,13 @@ public class RouteboxSedaConsumer extend
}
private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final
Exchange exchange) throws InterruptedException {
- Exchange result = null;
+ Exchange result;
if (exchange != null) {
if (isRunAllowed()) {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("**** Dispatching to Inner Route ****");
+ LOG.debug("Dispatching to inner route: " + exchange);
}
RouteboxDispatcher dispatcher = new
RouteboxDispatcher(producer);
result = dispatcher.dispatchAsync(getRouteboxEndpoint(),
exchange);
@@ -131,33 +115,20 @@ public class RouteboxSedaConsumer extend
}
}
-
- /* (non-Javadoc)
- * @see org.apache.camel.Consumer#getEndpoint()
- */
public Endpoint getEndpoint() {
- return (Endpoint) getRouteboxEndpoint();
+ return getRouteboxEndpoint();
}
- /* (non-Javadoc)
- * @see
org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
- */
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
return false;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
- */
public int getPendingExchangesSize() {
- return getPendingExchanges();
+ // TODO: Get size of queue
+ return 0;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
- */
public void prepareShutdown() {
-
}
public void setProcessor(AsyncProcessor processor) {
@@ -168,20 +139,4 @@ public class RouteboxSedaConsumer extend
return processor;
}
- public void setPendingExchanges(int pendingExchanges) {
- this.pendingExchanges = pendingExchanges;
- }
-
- public int getPendingExchanges() {
- return pendingExchanges;
- }
-
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- public ExceptionHandler getExceptionHandler() {
- return exceptionHandler;
- }
-
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
Tue Dec 28 13:10:59 2010
@@ -111,16 +111,10 @@ public class RouteboxSedaEndpoint extend
return queue;
}
- /* (non-Javadoc)
- * @see
org.apache.camel.MultipleConsumersSupport#isMultipleConsumersSupported()
- */
public boolean isMultipleConsumersSupported() {
return true;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.BrowsableEndpoint#getExchanges()
- */
public List<Exchange> getExchanges() {
return new ArrayList<Exchange>(getQueue());
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
Tue Dec 28 13:10:59 2010
@@ -25,13 +25,15 @@ import org.apache.camel.Exchange;
* A strategy for identifying the route consumer in the routebox where the
exchange should to be dispatched
*/
public interface RouteboxDispatchStrategy {
+
/**
* Receives an incoming exchange and consumer list and identifies the
inner route consumer for dispatching the exchange
*
- * @param innerRouteConsumers the list of possible real-time inner route
consumers available
+ * @param destinations the list of possible real-time inner route
consumers available
* to where the exchange can be dispatched in the routebox
* @param exchange the incoming exchange
* @return a selected consumer to whom the exchange can be directed
+ * @throws Exception is thrown if error
*/
URI selectDestinationUri(List<URI> destinations, Exchange exchange) throws
Exception;
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
Tue Dec 28 13:10:59 2010
@@ -25,7 +25,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -40,7 +40,6 @@ import org.apache.camel.model.RouteDefin
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
public class RouteboxDispatcher {
private static final transient Log LOG =
LogFactory.getLog(RouteboxDispatcher.class);
private ProducerTemplate producer;
@@ -51,11 +50,11 @@ public class RouteboxDispatcher {
}
public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange)
throws Exception {
- URI dispatchUri = null;
- Exchange reply = null;
+ URI dispatchUri;
+ Exchange reply;
if (LOG.isDebugEnabled()) {
- LOG.debug("Dispatching exchange" + exchange + "to endpoint " +
endpoint.getEndpointUri());
+ LOG.debug("Dispatching exchange " + exchange + " to endpoint " +
endpoint.getEndpointUri());
}
dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -63,18 +62,18 @@ public class RouteboxDispatcher {
if (exchange.getPattern() == ExchangePattern.InOnly) {
reply = producer.send(dispatchUri.toASCIIString(), exchange);
} else {
- reply = (Exchange) issueRequest(endpoint, ExchangePattern.InOut,
exchange.getIn().getBody(), exchange.getIn().getHeaders());
+ reply = issueRequest(endpoint, ExchangePattern.InOut,
exchange.getIn().getBody(), exchange.getIn().getHeaders());
}
return reply;
}
public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange
exchange) throws Exception {
- URI dispatchUri = null;
- Exchange reply = null;
+ URI dispatchUri;
+ Exchange reply;
if (LOG.isDebugEnabled()) {
- LOG.debug("Dispatching exchange" + exchange + "to endpoint " +
endpoint.getEndpointUri());
+ LOG.debug("Dispatching exchange " + exchange + " to endpoint " +
endpoint.getEndpointUri());
}
dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -91,26 +90,27 @@ public class RouteboxDispatcher {
}
protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange
exchange) throws Exception {
- URI dispatchUri = null;
+ URI dispatchUri;
List<URI> consumerUris =
getInnerContextConsumerList(endpoint.getConfig().getInnerContext());
if (consumerUris.isEmpty()) {
- throw new CamelException("No routes found for dispatch in
Routebox");
+ throw new CamelExchangeException("No routes found to dispatch in
Routebox at " + endpoint, exchange);
} else if (consumerUris.size() == 1) {
dispatchUri = consumerUris.get(0);
} else {
if (!endpoint.getConfig().getDispatchMap().isEmpty()) {
- //apply URI string found in dispatch Map
- if
(endpoint.getConfig().getDispatchMap().containsKey(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")))
{
- dispatchUri = new
URI(endpoint.getConfig().getDispatchMap().get(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")));
+ // apply URI string found in dispatch Map
+ String key = exchange.getIn().getHeader("ROUTE_DISPATCH_KEY",
String.class);
+ if (endpoint.getConfig().getDispatchMap().containsKey(key)) {
+ dispatchUri = new
URI(endpoint.getConfig().getDispatchMap().get(key));
} else {
- throw new CamelException("No matching entry found in
Dispatch Map for ROUTE_DISPATCH_KEY: " +
exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"));
+ throw new CamelExchangeException("No matching entry found
in Dispatch Map for ROUTE_DISPATCH_KEY: " + key, exchange);
}
} else {
- //apply dispatch strategy
+ // apply dispatch strategy
dispatchUri =
endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris,
exchange);
if (dispatchUri == null) {
- throw new CamelException("No matching inner routes found
for Operation");
+ throw new CamelExchangeException("No matching inner routes
found for Operation", exchange);
}
}
}
@@ -138,9 +138,7 @@ public class RouteboxDispatcher {
Exchange exchange = producer.send(endpoint, pattern, new Processor() {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
- for (Map.Entry<String, Object> header : headers.entrySet()) {
- in.setHeader(header.getKey(), header.getValue());
- }
+ in.getHeaders().putAll(headers);
in.setBody(body);
}
});