OK, thanks.

Ivan Rododendro
> On 23 Nov 2022, at 18:01, Claus Ibsen <claus.ib...@gmail.com> wrote:
>
> Hi
>
> The resequencer EIP is async, so its output is processed by another thread.
> Manual commit from Kafka is not supported in that case, as you must commit
> on the same thread that received the message.
>
>> On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <ivan.rododen...@gmail.com>
>> wrote:
>>
>> Fair enough.
>>
>> I pushed a reproducer here, based on Testcontainers ... *but it doesn't
>> reproduce the issue*:
>> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
>>
>> It turns out that my issue is caused by a header-based resequencer in
>> the route. Basically, having this in the route:
>> .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
>> produces this:
>> CaughtExceptionType: java.util.ConcurrentModificationException
>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>>
>> The resequencer is in the reproducer too but it does interfere with Kafka
>> manual commit.
>>
>> Nevertheless, I'd like to understand the following logs from the
>> reproducer:
>> INFO route-1.log - Message received
>> INFO route-2.log - Message received
>> INFO o.apache.camel.builder.RouteBuilder.process - Processing message from route [route-1]
>> INFO o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route [route-2]
>>
>> Two routes from the same template, with a filter based on message headers.
>> Route 1 processes the message; route 2 discards it, but it still triggers
>> .onCompletion().onCompleteOnly() (which triggers the Kafka manual commit) ...
>> It looks weird to me.
>>
>> Regards
>>
>> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks. I have to be honest with you: I truly want to look more closely at
>>> this one, but it's been a bit hard to try to make sense of the code you
>>> provided so far. It's not something I can quickly modify one of our unit
>>> tests and run.
>>>
>>> In this case ...
>>>
>>> Please, can you provide a full reproducer and send the code? Please put it
>>> on GitHub, so I can clone, reproduce, and debug it. That would make
>>> investigating and fixing this much easier and quicker for me.
>>>
>>> Kind regards
>>>
>>> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <ivan.rododen...@gmail.com>
>>> wrote:
>>>
>>>> Hi Otavio
>>>> I've been busy ...
>>>>
>>>> I upgraded to Camel 3.18.3:
>>>> 22-11-2022 12:35:10.829 [restartedMain] INFO o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3 (camel-1) is starting
>>>>
>>>> I still get the error:
>>>> CaughtExceptionType: java.util.ConcurrentModificationException
>>>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access
>>>> StackTrace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
>>>>   at org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
>>>>   at org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
>>>>   at fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
>>>>   at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
>>>>   at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
>>>>   at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
>>>>   at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
>>>>   at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
>>>>   at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
>>>>   at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
>>>>   at org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
>>>>   at org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
>>>>   at org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
>>>>
>>>> Ivan
>>>>
>>>> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, thanks
>>>>>
>>>>> Please, can you try with Camel 3.18.3? Along with the rest of the
>>>>> community, we introduced a lot of fixes on the Kafka component in 3.18.2
>>>>> and 3.18.3. Maybe it will help you solve the problem (and, at the same
>>>>> time, it's an LTS version, so we can fix it if there's a problem).
>>>>>
>>>>> Thanks in advance
>>>>>
>>>>> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <ivan.rododen...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Otavio,
>>>>>> Camel version is 3.17.0
>>>>>>
>>>>>> thank you
>>>>>>
>>>>>> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Ivan,
>>>>>>>
>>>>>>> For Kafka (consumer) objects, yes, they should be created for every
>>>>>>> route. I want to try to take a look at it during this week or the next.
>>>>>>>
>>>>>>> In the meantime, can you please tell me which version of Camel you are
>>>>>>> using?
>>>>>>>
>>>>>>> Kind regards
>>>>>>>
>>>>>>> On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Otavio, thank you for your response.
>>>>>>>>
>>>>>>>> The commit is made synchronously by the kafkaOffsetProcessor (whose
>>>>>>>> code I forgot to attach; more details here:
>>>>>>>> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
>>>>>>>> ). Basically it does what the documentation says for synchronous
>>>>>>>> commits.
>>>>>>>>
>>>>>>>> Multiple threads cannot access the same Kafka client, that's all, but
>>>>>>>> Camel manages the instances and threads, so it is not so easy for me
>>>>>>>> to fix.
>>>>>>>>
>>>>>>>> I am thinking about SEDA because I know that, for SEDA, endpoint
>>>>>>>> objects are pooled (if my understanding is right).
>>>>>>>>
>>>>>>>> Are you sure that new objects are created for every route created
>>>>>>>> from a template?
>>>>>>>>
>>>>>>>> Ivan Rododendro
>>>>>>>>
>>>>>>>>> On 24 Oct 2022, at 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> From the code you provided, it's not very clear to me when and where
>>>>>>>>> you are calling the commit. It's also not clear to me which version
>>>>>>>>> of Camel you are using and which kind of commit factory you are
>>>>>>>>> using (async [1] or sync [2]?).
>>>>>>>>>
>>>>>>>>> That said ... The problem here is that - as explained in the
>>>>>>>>> exception message - the Kafka client cannot be accessed from a
>>>>>>>>> different thread.
>>>>>>>>>
>>>>>>>>> So, I am not entirely sure that the problem is related to SEDA or
>>>>>>>>> something like that. Also, Camel will indeed create a different
>>>>>>>>> consumer for every route.
>>>>>>>>>
>>>>>>>>> Please, can you provide a few more details about the code you have?
>>>>>>>>>
>>>>>>>>> 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
>>>>>>>>> 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
>>>>>>>>>
>>>>>>>>>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro
>>>>>>>>>> <ivan.rododen...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello
>>>>>>>>>> I'm really new to Camel concepts. Our need is to create some
>>>>>>>>>> routes, identical except for some parameters, from a Kafka topic
>>>>>>>>>> to an HTTP endpoint, with some processing in between.
>>>>>>>>>>
>>>>>>>>>> Besides this, we want to explicitly commit the message consumption
>>>>>>>>>> only when the HTTP endpoint has been successfully called.
>>>>>>>>>>
>>>>>>>>>> In order to achieve this, we set up a route template that carries
>>>>>>>>>> the route parameterization and set it up to manually commit after
>>>>>>>>>> having called the HTTP endpoint:
>>>>>>>>>>
>>>>>>>>>> public void configure() throws Exception {
>>>>>>>>>>     // @formatter:off
>>>>>>>>>>     routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
>>>>>>>>>>         .templateParameter(Constantes.JOB_NAME)
>>>>>>>>>>         .templateParameter(Constantes.TOPIC)
>>>>>>>>>>         .templateParameter(Constantes.PUBLISHER_ID)
>>>>>>>>>>         .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>>>>>>>>>         .templateParameter(Constantes.JOB_NAME_PARAMETER)
>>>>>>>>>>         .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>>>>>>>>>         .from(getKafkaEndpoint())
>>>>>>>>>>         .messageHistory()
>>>>>>>>>>         .filter(simple("${header.publisherId} == '{{publisherId}}'"))
>>>>>>>>>>         .process(messageLoggerProcessor)
>>>>>>>>>>         .process(modelMapperProcessor)
>>>>>>>>>>         .process(jsonlToArrayProcessor)
>>>>>>>>>>         .process(payloadProcessor)
>>>>>>>>>>         .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
>>>>>>>>>>         .setHeader(Exchange.HTTP_METHOD, simple("POST"))
>>>>>>>>>>         .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
>>>>>>>>>>         .setHeader(Constantes.ACCEPT,constant("application/json"))
>>>>>>>>>>         .setHeader(Constantes.API_KEY, constant(apiKey))
>>>>>>>>>>         .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
>>>>>>>>>>         .process(apiConsumerProcessorLogger)
>>>>>>>>>>         .to(this.url)
>>>>>>>>>>         .process(kafkaOffsetProcessor);
>>>>>>>>>>     // @formatter:on
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private String getKafkaEndpoint() {
>>>>>>>>>>     String endpoint = "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" + kafkaBrokers;
>>>>>>>>>>
>>>>>>>>>>     if (securityEnabled()) {
>>>>>>>>>>         endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
>>>>>>>>>>             + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
>>>>>>>>>>             + username + "\" password=\"" + password + "\";" + "&sslTruststoreLocation=" + sslTrustStoreLocation
>>>>>>>>>>             + "&sslTruststorePassword=" + sslTruststorePassword;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     return endpoint;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The problem is that we systematically get this error when a message
>>>>>>>>>> is consumed by a route:
>>>>>>>>>>
>>>>>>>>>> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>>>>>>>>>>   at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>>>>>>>>>>   at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
>>>>>>>>>>
>>>>>>>>>> My understanding is that the instance of KafkaConsumer is reused in
>>>>>>>>>> multiple routes and therefore it generates the error, but it could
>>>>>>>>>> also be related to using a SEDA endpoint, as stated here
>>>>>>>>>> (https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
>>>>>>>>>> explicitly do.
>>>>>>>>>>
>>>>>>>>>> We tried injecting a KafkaComponent local bean in the route:
>>>>>>>>>>
>>>>>>>>>> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
>>>>>>>>>> .end()
>>>>>>>>>> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", "#{{myKafkaConfiguration}}")
>>>>>>>>>> .end()
>>>>>>>>>> .from("#{{myKafka}}")
>>>>>>>>>>
>>>>>>>>>> But it ends up with another error, because you cannot consume from
>>>>>>>>>> a Bean endpoint
>>>>>>>>>> (https://camel.apache.org/components/3.18.x/bean-component.html).
>>>>>>>>>>
>>>>>>>>>> How can we use a different KafkaConsumer for every created route?
>>>>>>>>>> Or, if the issue is SEDA-related, how can we make this route a
>>>>>>>>>> direct route?
>>>>>>>>>>
>>>>>>>>>> Thank you for your help
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Otavio R. Piske
>>>>>>>>> http://orpiske.net
>>>>>>>
>>>>>>> --
>>>>>>> Otavio R. Piske
>>>>>>> http://orpiske.net
>>>>>
>>>>> --
>>>>> Otavio R. Piske
>>>>> http://orpiske.net
>>>
>>> --
>>> Otavio R. Piske
>>> http://orpiske.net
>
> --
> Claus Ibsen
> -----------------
> @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
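
For readers landing on this thread, the point about committing on the same thread that received the message can be illustrated without Camel or Kafka. What follows is only a minimal sketch in plain Java, assuming an owner-thread guard similar in spirit to the KafkaConsumer.acquire frames in the stack traces above. GuardedConsumer, poll and commitSync are hypothetical stand-ins written for this illustration, not the Kafka or Camel API.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadConfinementSketch {

    /** Hypothetical stand-in for a consumer that rejects concurrent use by two threads. */
    static final class GuardedConsumer {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private int depth = 0; // re-entrancy count, only modified by the owning thread

        private void acquire() {
            long me = Thread.currentThread().getId();
            // Re-entry by the owner is allowed; any other thread is rejected while the guard is held.
            if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
            }
            depth++;
        }

        private void release() {
            if (--depth == 0) {
                owner.set(NO_OWNER);
            }
        }

        /** Simulates a poll: the calling thread holds the guard while the callback runs. */
        void poll(Runnable whileHeld) {
            acquire();
            try {
                whileHeld.run();
            } finally {
                release();
            }
        }

        /** Simulates a manual commit; a real client would commit offsets here. */
        void commitSync() {
            acquire();
            release();
        }
    }

    /** Commits from a second thread (like an async batch sender) while the polling thread holds the consumer. */
    static Throwable commitFromOtherThread() {
        GuardedConsumer consumer = new GuardedConsumer();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        consumer.poll(() -> {
            Thread batchSender = new Thread(() -> {
                try {
                    consumer.commitSync();
                } catch (Throwable t) {
                    failure.set(t);
                }
            });
            batchSender.start();
            try {
                batchSender.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return failure.get();
    }

    /** Commits from the polling thread itself; the guard allows this. */
    static Throwable commitFromPollingThread() {
        GuardedConsumer consumer = new GuardedConsumer();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        consumer.poll(() -> {
            try {
                consumer.commitSync();
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("other thread:   " + commitFromOtherThread());
        System.out.println("polling thread: " + commitFromPollingThread());
    }
}
```

In this sketch the commit issued from the second thread fails with a ConcurrentModificationException, while the same call made from the polling thread succeeds. That mirrors the situation described in the thread, where the resequencer's batch sender thread ends up invoking the manual commit.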