Fair enough. I pushed a reproducer here, based on Testcontainers ... *but it doesn't reproduce the issue*: https://github.com/ivanrododendro/reprex-camelmanualcommit.git
It turns out that my issue is caused by a header-based resequencer in the
route. Basically, having this in the route:

.resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)

produces this:

CaughtExceptionType: java.util.ConcurrentModificationException
CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access

The resequencer is in the reproducer too, but there it does not interfere
with Kafka manual commit.

Nevertheless, I'd like to understand the following logs from the reproducer:

INFO route-1.log - Message received
INFO route-2.log - Message received
INFO o.apache.camel.builder.RouteBuilder.process - Processing message from route [route-1]
INFO o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route [route-2]

Two routes are created from the same template, with a filter based on message
headers. Route 1 processes the message; route 2 discards it, yet it still
triggers .onCompletion().onCompleteOnly() (which triggers the Kafka manual
commit). That looks weird to me.

Regards

On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
wrote:

> Hi,
>
> Thanks. I have to be honest with you: I truly want to look more closely at
> this one, but it's been a bit hard to try to make sense of the code you
> provided so far. It's not something I can quickly modify one of our unit
> tests and run.
>
> In this case ...
>
> Please, can you provide a full reproducer and send the code? Please put it
> on GitHub, so I can clone and reproduce and debug? That would make
> investigating and fixing this much easier and quicker for me.
>
> Kind regards
>
> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> wrote:
>
> > Hi Otavio
> > I've been busy ...
> >
> > I upgraded to Camel 3.18.3:
> > 22-11-2022 12:35:10.829 [restartedMain] INFO
> > o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3
> > (camel-1) is starting
> >
> > I still have the error:
> > CaughtExceptionType: java.util.ConcurrentModificationException
> > CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
> > access
> > StackTrace: java.util.ConcurrentModificationException:
> > KafkaConsumer is not safe for multi-threaded access
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> >     at org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> >     at org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> >     at fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> >     at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> >     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> >     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> >     at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> >     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> >     at org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> >     at org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> >     at org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> >
> > Ivan
> >
> > On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> > wrote:
> >
> > > Hi, thanks
> > >
> > > Please, can you try with Camel 3.18.3? Along with the rest of the
> > > community, we introduced a lot of fixes on the Kafka component on 3.18.2
> > > and 3.18.3. Maybe it will help you solve the problem (and, at the same
> > > time, it's an LTS version, so we can fix it if there's a problem).
> > >
> > > Thanks in advance
> > >
> > > On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> > > wrote:
> > >
> > > > Hi Otavio,
> > > > Camel version is 3.17.0
> > > >
> > > > thank you
> > > >
> > > > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Ivan,
> > > > >
> > > > > For Kafka (consumer) objects, yes, they should be created for every
> > > > > route. I want to try to take a look at it during this week or in the
> > > > > next.
> > > > >
> > > > > In the meantime, can you please tell me which version of Camel you are
> > > > > using?
> > > > >
> > > > > Kind regards
> > > > >
> > > > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com> wrote:
> > > > >
> > > > > > Hi Otavio, thank you for your response.
> > > > > >
> > > > > > The commit is made synchronously by the kafkaOffsetProcessor (whose
> > > > > > code I forgot to attach, more details here
> > > > > > https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > > > > ). Basically it does what the documentation says for synchronous
> > > > > > commits.
> > > > > >
> > > > > > Multiple threads cannot access the same Kafka client, that's it, but
> > > > > > Camel handles instances and threads, so it is not so easy for me to fix.
> > > > > >
> > > > > > I thought about SEDA because I know that SEDA endpoint objects are
> > > > > > pooled (if my understanding is right).
> > > > > >
> > > > > > Are you sure that new objects are created for every route created
> > > > > > from a template?
> > > > > >
> > > > > > Ivan Rododendro
> > > > > >
> > > > > > > On Oct 24, 2022, at 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > From the code you provided, it's not very clear to me when and where
> > > > > > > you are calling the commit. Also it's not very clear to me: which
> > > > > > > version of Camel you are using and which kind of commit factory you
> > > > > > > are using (async [1] or sync [2]?).
> > > > > > >
> > > > > > > That said ... The problem here is that - as explained in the exception
> > > > > > > message - the Kafka client cannot be accessed from a different thread.
> > > > > > >
> > > > > > > So, I am not entirely sure that the problem is related to seda or
> > > > > > > something like that. Also, Camel will indeed create a different
> > > > > > > consumer for every route.
> > > > > > >
> > > > > > > Please, can you provide a bit more details about the code you have?
> > > > > > >
> > > > > > > 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > > > 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > > > >
> > > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> Hello
> > > > > > >> I'm really new to Camel concepts. Our need is to create some identical
> > > > > > >> routes, identical except for some parameters, from a Kafka topic to
> > > > > > >> an HTTP endpoint, with some processing in between.
> > > > > > >>
> > > > > > >> Besides this, we want to explicitly commit the message consumption
> > > > > > >> only when the HTTP endpoint has been successfully called.
> > > > > > >>
> > > > > > >> In order to achieve this we set up a route template that carries the
> > > > > > >> route parameterization, and set it up to manually commit after having
> > > > > > >> called the HTTP endpoint:
> > > > > > >>
> > > > > > >> public void configure() throws Exception {
> > > > > > >>     // @formatter:off
> > > > > > >>     routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > > > >>         .templateParameter(Constantes.JOB_NAME)
> > > > > > >>         .templateParameter(Constantes.TOPIC)
> > > > > > >>         .templateParameter(Constantes.PUBLISHER_ID)
> > > > > > >>         .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > >>         .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > > > >>         .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > >>         .from(getKafkaEndpoint())
> > > > > > >>         .messageHistory()
> > > > > > >>         .filter(simple("${header.publisherId} == '{{publisherId}}'"))
> > > > > > >>         .process(messageLoggerProcessor)
> > > > > > >>         .process(modelMapperProcessor)
> > > > > > >>         .process(jsonlToArrayProcessor)
> > > > > > >>         .process(payloadProcessor)
> > > > > > >>         .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > > > >>         .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > > > >>         .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
> > > > > > >>         .setHeader(Constantes.ACCEPT, constant("application/json"))
> > > > > > >>         .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > > > >>         .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > > > >>         .process(apiConsumerProcessorLogger)
> > > > > > >>         .to(this.url)
> > > > > > >>         .process(kafkaOffsetProcessor);
> > > > > > >>     // @formatter:on
> > > > > > >> }
> > > > > > >>
> > > > > > >> private String getKafkaEndpoint() {
> > > > > > >>     String endpoint =
> > > > > > >>         "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > > > > > >>         + kafkaBrokers;
> > > > > > >>
> > > > > > >>     if (securityEnabled()) {
> > > > > > >>         endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
> > > > > > >>             + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
> > > > > > >>             + username + "\" password=\"" + password + "\";"
> > > > > > >>             + "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > > > >>             + "&sslTruststorePassword=" + sslTruststorePassword;
> > > > > > >>     }
> > > > > > >>
> > > > > > >>     return endpoint;
> > > > > > >> }
> > > > > > >>
> > > > > > >> The problem is that we systematically get this error when a message
> > > > > > >> is consumed by a route:
> > > > > > >>
> > > > > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not
> > > > > > >> safe for multi-threaded access
> > > > > > >>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > > > >>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > > > >>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > > > >>     at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > > > >>     at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > > > >>
> > > > > > >> My understanding is that the instance of KafkaConsumer is reused in
> > > > > > >> multiple routes and therefore it generates the error, but it could
> > > > > > >> also be related to using a SEDA endpoint as stated here
> > > > > > >> (https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > > > > > >> explicitly do.
> > > > > > >>
> > > > > > >> We tried injecting a KafkaComponent local bean in the route:
> > > > > > >>
> > > > > > >> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > > > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > > > > >> .end()
> > > > > > >> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > > > > >> "#{{myKafkaConfiguration}}")
> > > > > > >> .end()
> > > > > > >> .from("#{{myKafka}}")
> > > > > > >>
> > > > > > >> But it ends up with another error because you cannot consume a Bean
> > > > > > >> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html)
> > > > > > >>
> > > > > > >> How to use a different KafkaConsumer for every created route? Or, if
> > > > > > >> the issue is SEDA related, how to make this route a direct route?
> > > > > > >>
> > > > > > >> Thank you for your help
> > > > > > >>
> > > > > > >
> > > > > > > --
> > > > > > > Otavio R. Piske
> > > > > > > http://orpiske.net
> > > > > >
> > > > >
> > > > > --
> > > > > Otavio R. Piske
> > > > > http://orpiske.net
> > > >
> > >
> > > --
> > > Otavio R. Piske
> > > http://orpiske.net
> >
>
> --
> Otavio R. Piske
> http://orpiske.net
>
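
For anyone following the thread, the exception discussed above can be modelled without Kafka or Camel. What follows is only a simplified sketch: SingleThreadedClient and ThreadConfinementDemo are made-up names, not Camel or Kafka classes, and the guard is an assumption based on the KafkaConsumer.acquire frame at the top of the stack traces. One thread sits in a blocking poll while a second thread, playing the role of the resequencer's batch sender, tries to commit and is rejected.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a client that only one thread may use at a time.
final class SingleThreadedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private int depth; // re-entrancy counter, only touched by the owning thread

    // Claims the client for the calling thread, or fails if another thread holds it.
    private void acquire() {
        long me = Thread.currentThread().getId();
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "client is not safe for multi-threaded access");
        }
        depth++;
    }

    private void release() {
        if (--depth == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Simulates a blocking poll: the client stays claimed until mayReturn is released.
    void poll(CountDownLatch entered, CountDownLatch mayReturn) {
        acquire();
        try {
            entered.countDown();
            mayReturn.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release();
        }
    }

    // Simulates a synchronous offset commit.
    void commitSync() {
        acquire();
        try {
            // a real client would send the commit request here
        } finally {
            release();
        }
    }
}

final class ThreadConfinementDemo {
    // True if a commit issued from a second thread is rejected while poll() is running.
    static boolean commitFromOtherThreadIsRejected() {
        SingleThreadedClient client = new SingleThreadedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayReturn = new CountDownLatch(1);
        Thread poller = new Thread(() -> client.poll(entered, mayReturn), "consumer-thread");
        poller.start();
        boolean rejected = false;
        try {
            entered.await();     // wait until the consumer thread is inside poll()
            client.commitSync(); // the calling thread plays the batch sender
        } catch (ConcurrentModificationException e) {
            rejected = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            mayReturn.countDown();
            try {
                poller.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    // True if repeated commits from the thread that owns the client go through.
    static boolean commitFromSameThreadSucceeds() {
        SingleThreadedClient client = new SingleThreadedClient();
        client.commitSync();
        client.commitSync();
        return true;
    }

    public static void main(String[] args) {
        System.out.println("commit from second thread rejected: "
                + commitFromOtherThreadIsRejected());
    }
}
```

If this model is a fair reading of the traces, it would explain why the commit fails only once a step that hands exchanges to its own thread (such as the batch resequencer's sender) sits between the Kafka consumer and the processor that commits.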