[ https://issues.apache.org/jira/browse/CAMEL-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
James Carman updated CAMEL-6771:
--------------------------------
    Attachment: CAMEL-6771.patch

This should fix the problem. I was having trouble creating a unit test that reliably reproduces the issue, so I didn't include one in this patch. Basically, instead of using a HashMap, I'm using a ConcurrentHashMap to store the MulticastProcessor -> AggregationStrategy map.

> ConcurrentModificationException thrown from inside camel splitter
> -----------------------------------------------------------------
>
>                 Key: CAMEL-6771
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6771
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.11.1
>         Environment: Amazon Linux, oracle jdk 1.7
>            Reporter: Joshua Groom
>         Attachments: CAMEL-6771.patch
>
>
> We use camel 2.11.1 running on the oracle 1.7 jvm for linux.
> I have a route that looks like this. It reads in files and puts them on a
> seda queue with 8 concurrent consumers.
> - The SpatialInterpolationPojo reads each file and splits it into two
> messages X and Y.
> - The MyAggregator uses X and Y together and outputs a combined message A.B
> - The MySplitterPojo splits A.B into two messages A and B
> {code}
> from("file://somefile")
>     .to("seda:filteraccept?concurrentConsumers=8");
> from("seda:filteraccept?concurrentConsumers=8")
>     .split()
>     .method(new SpatialInterpolationPojo(), "split")
>     .to("direct:wind-aggregator");
> from("direct:wind-aggregator")
>     .aggregate(packageCorrelationId(), new MyAggregator())
>     .completionPredicate(header(FIELD_AGGREGATION_COMPLETE).isNotNull())
>     .split()
>     .method(new MySplitterPojo())
>     .to("seda:output");
> {code}
> The MySplitterPojo simply returns a List<Message> containing two messages
> built from data in the input message body. We copy the input headers to the
> result messages.
> It is thread safe: it has no state, i.e. there are no object fields that are
> modified.
> The method looks like this (edited for clarity/privacy):
> {code}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.camel.Body;
> import org.apache.camel.Headers;
> import org.apache.camel.Message;
> import org.apache.camel.impl.DefaultMessage;
>
> public class MySplitterPojo {
>     public List<Message> splitMessage(
>             @Headers Map<String, Object> headers,
>             @Body CombinedObject body) {
>
>         // Each output message gets its own copy of the input headers.
>         DefaultMessage a = new DefaultMessage();
>         a.setBody(body.getA());
>         a.setHeaders(new HashMap<String, Object>(headers));
>
>         DefaultMessage b = new DefaultMessage();
>         b.setBody(body.getB());
>         b.setHeaders(new HashMap<String, Object>(headers));
>
>         ArrayList<Message> result = new ArrayList<Message>(2);
>         result.add(a);
>         result.add(b);
>
>         return result;
>     }
> }
> {code}
> When we run this route we very occasionally get the exception below. You can
> see that it is entirely within camel. It appears to be trying to copy the map
> stored under the exchange property Exchange.AGGREGATION_STRATEGY, which is a
> camel internal property key.
> By inspection of the message I can see that the Exchange has just come out of
> the WindVectorAggregator.
> This seems like it must be a camel bug to me. Any ideas?
> {code}
> 15 Sep 2013 23:06:47,140[Camel (camel-1) thread #21 - seda://filteraccept]
> WARN AggregateProcessor Error processing aggregated exchange.
> Exchange[Message: { Trondheim, NO=WindVector [u=-5.92894983291626,
> v=7.060009002685547], ... }].
Caused by: > [java.util.ConcurrentModificationException - null] > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextEntry(Unknown Source) > at java.util.HashMap$EntryIterator.next(Unknown Source) > at java.util.HashMap$EntryIterator.next(Unknown Source) > at java.util.HashMap.putAllForCreate(Unknown Source) > at java.util.HashMap.<init>(Unknown Source) > at > org.apache.camel.processor.MulticastProcessor.setAggregationStrategyOnExchange(MulticastProcessor.java:1011) > > at org.apache.camel.processor.Splitter.process(Splitter.java:95) > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) > > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86) > > at > org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:495) > > at java.util.concurrent.Executors$RunnableAdapter.call(Unknown > Source) > at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) > at java.util.concurrent.FutureTask.run(Unknown Source) > at > org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62) > > at java.util.concurrent.AbstractExecutorService.submit(Unknown > Source) > at > org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:487) > > at > org.apache.camel.processor.aggregate.AggregateProcessor.onCompletion(AggregateProcessor.java:471) > > at > org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:325) > > at > org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:229) > > at > 
org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) > > at > 
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122) > > at > org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) > > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571) > > at > 
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504) > > at > org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:151) > > at > org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:285) > at > org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251) > at > org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161) > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > 
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122) > > at > org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) 
> > at > org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) > at > org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) > > at > 
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) > > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571) > > at > org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504) > > at > org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213) > > at org.apache.camel.processor.Splitter.process(Splitter.java:98) > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) > > at > 
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) > > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) > > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) > at > org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) > > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) > > at > org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > 
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) > > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) > > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) > > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) > > at > org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:294) > > at > org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203) > at > org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:150) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > at java.lang.Thread.run(Unknown Source) > {code}
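For readers who want to see why swapping HashMap for ConcurrentHashMap addresses the trace above, here is a minimal sketch. It is not the code from CAMEL-6771.patch and does not use any Camel classes. The class name StrategyMapSketch, the method iterationSurvivesWrite and the "processor-N"/"strategy-N" strings are hypothetical stand-ins for the MulticastProcessor -> AggregationStrategy map. The write that would normally come from another thread is simulated on the same thread, part-way through the iteration, so that the outcome is deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StrategyMapSketch {

    /**
     * Iterates over the map's entries (as the HashMap copy constructor in the
     * stack trace does) and adds one entry part-way through, standing in for
     * a write made by another thread. Returns true if the iteration finished
     * without a ConcurrentModificationException.
     */
    static boolean iterationSurvivesWrite(Map<String, String> map) {
        // Hypothetical contents; the real map holds processors and strategies.
        map.put("processor-1", "strategy-1");
        map.put("processor-2", "strategy-2");
        try {
            boolean written = false;
            for (Map.Entry<String, String> entry : map.entrySet()) {
                if (!written) {
                    // Structural modification while the iterator is live.
                    map.put("processor-3", "strategy-3");
                    written = true;
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashMap survives: "
                + iterationSurvivesWrite(new HashMap<String, String>()));
        System.out.println("ConcurrentHashMap survives: "
                + iterationSurvivesWrite(new ConcurrentHashMap<String, String>()));
    }
}
```

HashMap iterators are fail-fast, so the second call to next() notices the modification and throws. ConcurrentHashMap iterators are weakly consistent and never throw ConcurrentModificationException, which is the property the patch relies on. In a real multi-threaded run the HashMap failure is best-effort rather than guaranteed, which may be why a reliable unit test was hard to write.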