davsclaus commented on code in PR #11906: URL: https://github.com/apache/camel/pull/11906#discussion_r1384698132
########## components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterEndpoint.java: ########## @@ -140,30 +152,90 @@ protected void doInit() throws Exception { super.doInit(); DynamicRouterComponent component = getDynamicRouterComponent(); if (CONTROL_CHANNEL_NAME.equals(configuration.getChannel())) { - final DynamicRouterControlChannelProcessor processor = controlChannelProcessorFactorySupplier.get() - .getInstance(component); - processor.setConfiguration(configuration); try { // There can be multiple control actions, but we do not want to // create another consumer on the control channel, so check to // see if the consumer has already been created, and skip the // creation of another consumer if one already exists if (component.getControlChannelProcessor() == null) { + DynamicRouterControlChannelProcessor processor = controlChannelProcessorFactorySupplier.get() + .getInstance(component); + processor.setConfiguration(configuration); component.setControlChannelProcessor(processor); } } catch (Exception e) { throw new IllegalStateException("Could not create Dynamic Router endpoint", e); } } else { - final DynamicRouterProcessor processor = processorFactorySupplier.get() - .getInstance("dynamicRouterProcessor-" + configuration.getChannel(), getCamelContext(), - configuration.getRecipientMode(), configuration.isWarnDroppedMessage(), - filterProcessorFactorySupplier); - ServiceHelper.startService(processor); + CamelContext camelContext = getCamelContext(); + String routeId = configuration.getRouteId(); + long timeout = configuration.getTimeout(); + ErrorHandler errorHandler = new NoErrorHandler(null); + if (producerCache == null) { + producerCache = new DefaultProducerCache(this, camelContext, 1000); + } + ExecutorService aggregateExecutorService = camelContext.getExecutorServiceManager() Review Comment: This looks a bit wrong where you create 2x thread pool if timeout > 0 ########## components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfiguration.java: ########## @@ -131,6 +129,103 @@ public class DynamicRouterConfiguration { @UriParam(label = "common", defaultValue = "false") private boolean warnDroppedMessage; + /** + * The ID of the route. + */ + @UriParam(label = "common") + private String routeId; + + /** + * If enabled, then sending via multicast occurs concurrently. Note that the caller thread will still wait until all + * messages have been fully processed before it continues. It is only the sending and processing of the replies from + * the multicast recipients that happens concurrently. When parallel processing is enabled, then the Camel routing + * engine will continue processing using the last used thread from the parallel thread pool. However, if you want to + * use the original thread that called the multicast, then make sure to enable the synchronous option as well. + */ + @UriParam(label = "common", defaultValue = "false") + private boolean parallelProcessing; + + /** + * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would + * require the implementation of AggregationStrategy to be implemented as thread-safe. By default, this is false, + * meaning that Camel synchronizes the call to the aggregate method. Though, in some use-cases, this can be used to + * archive higher performance when the AggregationStrategy is implemented as thread-safe. + */ + @UriParam(label = "common", defaultValue = "false") + private boolean parallelAggregate; + + /** + * Will stop further processing if an exception or failure occurred during processing of an + * {@link org.apache.camel.Exchange} and the caused exception will be thrown. Will also stop if processing the + * exchange failed (has a fault message), or an exception was thrown and handled by the error handler (such as using + * onException). In all situations, the multicast will stop further processing. This is the same behavior as in the + * pipeline that is used by the routing engine. The default behavior is to not stop, but to continue processing + * until the end. + */ + @UriParam(label = "common", defaultValue = "false") + private boolean stopOnException; + + /** + * Ignore the invalid endpoint exception when attempting to create a producer with an invalid endpoint. + */ + @UriParam(label = "common", defaultValue = "false") + private boolean ignoreInvalidEndpoints; + + /** + * If enabled, then Camel will process replies out-of-order (e.g., in the order they come back). If disabled, Camel + * will process replies in the same order as defined by the multicast. + */ + @UriParam(label = "common", defaultValue = "false") + private boolean streaming; + + /** + * Sets a total timeout specified in milliseconds, when using parallel processing. If the Multicast has not been + * able to send and process all replies within the given timeframe, then the timeout triggers and the Multicast + * breaks out and continues. Notice that, if you provide a TimeoutAwareAggregationStrategy, then the timeout method + * is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks (for + * which it is difficult for Camel to shut down in a graceful manner) may continue to run. So use this option with a + * bit of care. + */ + @UriParam(label = "common", defaultValue = "-1") + private long timeout; + + /** + * Uses the Processor when preparing the {@link org.apache.camel.Exchange} to be sent. This can be used to + * deep-clone messages that should be sent, or to provide any custom logic that is needed before the exchange is + * sent. + */ + @UriParam(label = "common") + private String onPrepare; + + /** + * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. Multicast will, + * by default, not share a unit of work between the parent exchange and each multicasted exchange. This means each + * sub exchange has its own individual unit of work. + */ + @UriParam(label = "common", defaultValue = "false") + private boolean shareUnitOfWork; + + /** + * Refers to a custom Thread Pool to be used for parallel processing. Notice that, if you set this option, then + * parallel processing is automatically implied, and you do not have to enable that option in addition to this one. + */ + @UriParam(label = "common") + private String executorService; + + /** + * TODO: not a configuration setting Review Comment: TODO here ########## components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/DynamicRouterConfiguration.java: ########## @@ -131,6 +129,103 @@ public class DynamicRouterConfiguration { @UriParam(label = "common", defaultValue = "false") private boolean warnDroppedMessage; + /** + * The ID of the route. + */ + @UriParam(label = "common") + private String routeId; Review Comment: Why do you need this option and if so, then can you make a better description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org