Thanks for the reproducer, this makes it much easier to understand. And thanks Claus for pointing out the async nature of the resequencer.
I am going to add a note on the documentation about the limitations of using manual commit along with operations that may run on separate threads. Kind regards On Wed, Nov 23, 2022 at 10:35 PM Ivan <ivan.rododen...@gmail.com> wrote: > Ok thanks. > > Ivan Rododendro > > > Le 23 nov. 2022 à 18:01, Claus Ibsen <claus.ib...@gmail.com> a écrit : > > > > Hi > > > > The resequencer EIP is async, so its output is processed by another > thread, > > which manual commit from kafka is not supported as you must do that on > the > > same thread that received the message. > > > >> On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro < > ivan.rododen...@gmail.com> > >> wrote: > >> > >> Fair enough. > >> > >> I pushed a reproducer here based on testcontainer ... *but it don't > >> reproduces the issue *: > >> https://github.com/ivanrododendro/reprex-camelmanualcommit.git > >> > >> It turns out that my issue is caused by a header based resequencer in > >> the route, basically having this in the route : > >> .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100) > >> produces this : > >> CaughtExceptionType: java.util.ConcurrentModificationException > >> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded > >> > >> The resequencer is in the reproducer too but it does interfere with > kafka > >> manual commit. > >> > >> Nevertheless I'd like to understand the following logs from the > reproducer > >> : > >> INFO route-1.log - Message received > >> INFO route-2.log - Message received > >> INFO o.apache.camel.builder.RouteBuilder.process - Processing message > from > >> route [route-1] > >> INFO o.i.r.c.TemplateBuilder.process - Committed Kafka offset from > route > >> [route-2] > >> > >> Two routes from the same template, with a filter based on message > headers. > >> Route 1 processes the message, route 2 discards it but it triggers > >> .onCompletion().onCompleteOnly() (which triggers Kafka manual commit) .. > >> it looks weird to me. > >> > >> Regards > >> > >> > >> > >> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske < > angusyo...@gmail.com > >>> > >> wrote: > >> > >>> Hi, > >>> > >>> Thanks. I have to be honest with you: I truly want to look more closely > >> at > >>> this one, but it's been a bit hard to try to make sense of the code you > >>> provided so far. It's not something I can quickly modify one of our > unit > >>> tests and run. > >>> > >>> In this case ... > >>> > >>> Please, can you provide a full reproducer and send the code? Please put > >> it > >>> on Github, so I can clone and reproduce and debug? That would make > >>> investigating and fixing this much easier and quicker for me. > >>> > >>> Kind regards > >>> > >>> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro < > >>> ivan.rododen...@gmail.com> > >>> wrote: > >>> > >>>> Hi Octavio > >>>> I've been busy ... > >>>> > >>>> I upgraded to Came 3.18.3 : > >>>> 22-11-2022 12:35:10.829 [restartedMain] INFO > >>>> o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel > >> 3.18.3 > >>>> (camel-1) is starting > >>>> > >>>> Still I have the error : > >>>> CaughtExceptionType: java.util.ConcurrentModificationException > >>>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded > >>>> access StackTrace: java.util.ConcurrentModificationException: > >>>> KafkaConsumer is not safe for multi-threaded access > >>>> at > >>>> > >>>> > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31) > >>>> at > >>>> > >>>> > >>> > >> > fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61) > >>>> at org.apache.camel.processor.Pipeline.process(Pipeline.java:182) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560) > >>>> at > >>>> > >>>> > >>> > >> > org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483) > >>>> > >>>> Ivan > >>>> > >>>> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske < > >>> angusyo...@gmail.com> > >>>> wrote: > >>>> > >>>>> Hi, thanks > >>>>> > >>>>> Please, can you try with Camel 3.18.3? Along with the rest of the > >>>>> community, we introduced a lot of fixes on the Kafka component on > >>> 3.18.2 > >>>>> and 3.18.3. Maybe it will help you solve the problem (and, at the > >> same > >>>>> time, it's an LTS version, so we can fix it if there's a problem). > >>>>> > >>>>> Thanks in advance > >>>>> > >>>>> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro < > >>>> ivan.rododen...@gmail.com> > >>>>> wrote: > >>>>> > >>>>>> Hi Otavio, > >>>>>> Camel version is 3.17.0 > >>>>>> > >>>>>> thank you > >>>>>> > >>>>>> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske < > >>>>> angusyo...@gmail.com> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Ivan, > >>>>>>> > >>>>>>> For Kafka (consumer) objects, yes, they should be created for > >> every > >>>>>> route. > >>>>>>> I want to try to take a look at it during this week or in the > >> next. > >>>>>>> > >>>>>>> In the meantime, can you please tell me which version of Camel > >> you > >>>> are > >>>>>>> using? > >>>>>>> > >>>>>>> Kind regards > >>>>>>> > >>>>>>> On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com> > >>>>> wrote: > >>>>>>> > >>>>>>>> Hi Oktavio thank you for your response. > >>>>>>>> > >>>>>>>> The commit is made synchronously by the kafkaOffsetProcessor > >>> (whose > >>>>>> code > >>>>>>> I > >>>>>>>> forgot to attach, more details here > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception > >>>>>>> ). > >>>>>>>> Basically it does what the documentation says for synchronous > >>>>> commits. > >>>>>>>> > >>>>>>>> Multiple threads cannot access the same kafka client that’s it, > >>> but > >>>>>> camel > >>>>>>>> handles instances and threads, so not so easy to fix for me. > >>>>>>>> > >>>>>>>> I think about seda because I know that for seda endpoint object > >>> are > >>>>>>> pooled > >>>>>>>> (if my understanding is right). > >>>>>>>> > >>>>>>>> Are you sure that new objects are created for every route > >> created > >>>>> from > >>>>>> a > >>>>>>>> template ? > >>>>>>>> > >>>>>>>> Ivan Rododendro > >>>>>>>> > >>>>>>>>> Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske < > >>>>> angusyo...@gmail.com> > >>>>>> a > >>>>>>>> écrit : > >>>>>>>>> > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> From the code you provided, it's not very clear to me when > >> and > >>>>> where > >>>>>>> you > >>>>>>>>> are calling the commit. Also it's not very clear to me: which > >>>>> version > >>>>>>> of > >>>>>>>>> Camel you are using and which kind of commit factory you are > >>>> using > >>>>>>> (async > >>>>>>>>> [1] or sync [2]?). > >>>>>>>>> > >>>>>>>>> That said ...The problem here is that - as explained in the > >>>>> exception > >>>>>>>>> message - the Kafka client cannot be accessed from a > >> different > >>>>>> thread. > >>>>>>>>> > >>>>>>>>> So, I am not entirely sure that the problem is related to > >> seda > >>> or > >>>>>>>> something > >>>>>>>>> like that. Also, Camel will indeed, create a different > >> consumer > >>>> for > >>>>>>> every > >>>>>>>>> route. > >>>>>>>>> > >>>>>>>>> Please, can you provide a bit more details about the code you > >>>> have? > >>>>>>>>> > >>>>>>>>> 1. > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53 > >>>>>>>>> 2. > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53 > >>>>>>>>> > >>>>>>>>>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro < > >>>>>>>> ivan.rododen...@gmail.com> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>> Hello > >>>>>>>>>> I'm really new to Camel concepts, our need is to create some > >>>>>> identical > >>>>>>>>>> routes, identical except for some parameters, from a Kafka > >>> topic > >>>>> to > >>>>>> a > >>>>>>>> http > >>>>>>>>>> endpoint, with some processing in-between. > >>>>>>>>>> > >>>>>>>>>> Besides this we want to explicitly commit the message > >>>> consumption > >>>>>> only > >>>>>>>> when > >>>>>>>>>> the http endpoint has been successfully called. > >>>>>>>>>> > >>>>>>>>>> In order to achieve this we set up a route template that > >>> carries > >>>>> the > >>>>>>>> Route > >>>>>>>>>> parameterization and set it up to manually commit after > >> having > >>>>>> called > >>>>>>>> the > >>>>>>>>>> http endpoint : > >>>>>>>>>> public void configure() throws Exception { > >>>>>>>>>> // @formatter:off > >>>>>>>>>> routeTemplate(Constantes.KAFKA_GENERIC_ROUTE) > >>>>>>>>>> .templateParameter(Constantes.JOB_NAME) > >>>>>>>>>> .templateParameter(Constantes.TOPIC) > >>>>>>>>>> .templateParameter(Constantes.PUBLISHER_ID) > >>>>>>>>>> > >>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER) > >>>>>>>>>> .templateParameter(Constantes.JOB_NAME_PARAMETER) > >>>>>>>>>> > >>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER) > >>>>>>>>>> .from(getKafkaEndpoint()) > >>>>>>>>>> .messageHistory() > >>>>>>>>>> .filter(simple("${header.publisherId} == > >>>>>>> '{{publisherId}}'")) > >>>>>>>>>> .process(messageLoggerProcessor) > >>>>>>>>>> .process(modelMapperProcessor) > >>>>>>>>>> .process(jsonlToArrayProcessor) > >>>>>>>>>> .process(payloadProcessor) > >>>>>>>>>> > >>>>>>>>>> > >>>>>>> > >>>>> > >>> > .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount) > >>>>>>>>>> .setHeader(Exchange.HTTP_METHOD, simple("POST")) > >>>>>>>>>> .setHeader(Exchange.CONTENT_TYPE, > >>>>>>>>>> constant("application/json;charset=UTF-8")) > >>>>>>>>>> > >>>>>> .setHeader(Constantes.ACCEPT,constant("application/json")) > >>>>>>>>>> .setHeader(Constantes.API_KEY, constant(apiKey)) > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true) > >>>>>>>>>> .process(apiConsumerProcessorLogger) > >>>>>>>>>> .to(this.url) > >>>>>>>>>> .process(kafkaOffsetProcessor); > >>>>>>>>>> // @formatter:on > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> private String getKafkaEndpoint() { > >>>>>>>>>> String endpoint = > >>>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" > >>>>>>> + > >>>>>>>>>> kafkaBrokers; > >>>>>>>>>> > >>>>>>>>>> if (securityEnabled()) { > >>>>>>>>>> endpoint += "&securityProtocol=SASL_SSL" + > >>>>>>>>>> "&saslMechanism=PLAIN" > >>>>>>>>>> + > >>>>>>>>>> > >>>>>>> > >>>>> > >>> > "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule > >>>>>>>>>> required username=\"" > >>>>>>>>>> + username + "\" password=\"" + password > >> + > >>>>> "\";" > >>>>>> + > >>>>>>>>>> "&sslTruststoreLocation=" + sslTrustStoreLocation > >>>>>>>>>> + "&sslTruststorePassword=" + > >>>>>>> sslTruststorePassword; > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> return endpoint; > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> The problem is that we systematically get this error when a > >>>>> message > >>>>>> is > >>>>>>>>>> consumed by a route : > >>>>>>>>>> > >>>>>>>>>> Trace: java.util.ConcurrentModificationException: > >>> KafkaConsumer > >>>> is > >>>>>> not > >>>>>>>>>> safe for multi-threaded access > >>>>>>>>>> at > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824) > >>>>>>>>>> at > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808) > >>>>>>>>>> at > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255) > >>>>>>>>>> at > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60) > >>>>>>>>>> at > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51) > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> My understanding is that the instance of KafkaConsumer is > >>> reused > >>>>> in > >>>>>>>>>> multiple routes and therefore it generates the error, but it > >>>> could > >>>>>> be > >>>>>>>> also > >>>>>>>>>> related to using SEDA endpoint as stated here ( > >>>>>>>>>> https://issues.apache.org/jira/browse/CAMEL-12722), which > >> we > >>>>> don't > >>>>>>>>>> explicitly do. > >>>>>>>>>> > >>>>>>>>>> We tried injecting a KafkaComponent local bean in the route > >> : > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", > >>>>>>>>>> "{{" + Constantes.TOPIC > >> +"}}").properties(kafkaConfiguration) > >>>>>>>>>> .end() > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", > >>>>>>>>>> "#{{myKafkaConfiguration}}") > >>>>>>>>>> .end() > >>>>>>>>>> .from("#{{myKafka}}") > >>>>>>>>>> > >>>>>>>>>> But it ends up with another error because you cannot > >> consume a > >>>>> Bean > >>>>>>>>>> endpoint ( > >>>>>>>> https://camel.apache.org/components/3.18.x/bean-component.html > >> ) > >>>>>>>>>> > >>>>>>>>>> How to use a different KafkaConsumer for every created > >> route ? > >>>> Or, > >>>>>> if > >>>>>>>> the > >>>>>>>>>> issue is SEDA related, how to make this route a direct > >> route? > >>>>>>>>>> > >>>>>>>>>> Thank you for your help > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> Otavio R. Piske > >>>>>>>>> http://orpiske.net > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> Otavio R. Piske > >>>>>>> http://orpiske.net > >>>>>>> > >>>>>> > >>>>> > >>>>> > >>>>> -- > >>>>> Otavio R. Piske > >>>>> http://orpiske.net > >>>>> > >>>> > >>> > >>> > >>> -- > >>> Otavio R. Piske > >>> http://orpiske.net > >>> > >> > > > > > > -- > > Claus Ibsen > > ----------------- > > @davsclaus > > Camel in Action 2: https://www.manning.com/ibsen2 > -- Otavio R. Piske http://orpiske.net