Thanks for the reproducer; it makes the problem much easier to understand.
And thanks, Claus, for pointing out the async nature of the resequencer.

I am going to add a note to the documentation about the limitations of
using manual commit together with operations that may run on separate threads.
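
To illustrate the limitation, here is a minimal sketch in plain Java. It is a
simplified model, not the actual Kafka client or Camel code: GuardedClient,
commitOnOwningThread and commitOnOtherThread are hypothetical names made up
for this example. The assumption is that the client records which thread is
currently using it and rejects calls from any other thread, which is the
behaviour the "not safe for multi-threaded access" message suggests.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadGuardSketch {

    /** Hypothetical stand-in for a client that tolerates only one thread at a time. */
    static class GuardedClient {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private int depth = 0; // only modified by the thread that owns the client

        void acquire() {
            long me = Thread.currentThread().getId();
            // Re-entrant for the owner; any other thread is rejected while it is held.
            if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
            depth++;
        }

        void release() {
            if (--depth == 0) {
                owner.set(NO_OWNER);
            }
        }

        void commitSync() {
            acquire();
            try {
                // A real client would send the offset commit here.
            } finally {
                release();
            }
        }
    }

    /** Commit from the thread that already holds the client (like the consumer thread). */
    static String commitOnOwningThread() {
        GuardedClient client = new GuardedClient();
        client.acquire(); // the "polling" thread is using the client
        try {
            client.commitSync();
            return "ok";
        } catch (ConcurrentModificationException e) {
            return e.getClass().getSimpleName();
        } finally {
            client.release();
        }
    }

    /** Commit from a second thread (like an async EIP worker) while the client is held. */
    static String commitOnOtherThread() {
        GuardedClient client = new GuardedClient();
        AtomicReference<String> result = new AtomicReference<>("ok");
        client.acquire();
        try {
            Thread worker = new Thread(() -> {
                try {
                    client.commitSync();
                } catch (ConcurrentModificationException e) {
                    result.set(e.getClass().getSimpleName());
                }
            });
            worker.start();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            client.release();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println("same thread:  " + commitOnOwningThread());
        System.out.println("other thread: " + commitOnOtherThread());
    }
}
```

In this model the commit succeeds on the owning thread and fails on the worker
thread, which is roughly what happens when an async step such as the
resequencer hands the exchange to another thread before the commit is called.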

Kind regards

On Wed, Nov 23, 2022 at 10:35 PM Ivan <ivan.rododen...@gmail.com> wrote:

> Ok thanks.
>
> Ivan Rododendro
>
> > On Nov 23, 2022, at 18:01, Claus Ibsen <claus.ib...@gmail.com> wrote:
> >
> > Hi
> >
> > The resequencer EIP is async, so its output is processed by another
> > thread. Manual commit from Kafka is not supported in that case, as you
> > must commit on the same thread that received the message.
> >
> >> On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> >> wrote:
> >>
> >> Fair enough.
> >>
> >> I pushed a reproducer here, based on Testcontainers ... *but it doesn't
> >> reproduce the issue*:
> >> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
> >>
> >> It turns out that my issue is caused by a header-based resequencer in
> >> the route. Basically, having this in the route:
> >> .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
> >> produces this:
> >> CaughtExceptionType: java.util.ConcurrentModificationException
> >> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
> >>
> >> The resequencer is in the reproducer too but it does interfere with
> >> Kafka manual commit.
> >>
> >> Nevertheless, I'd like to understand the following logs from the
> >> reproducer:
> >> INFO  route-1.log - Message received
> >> INFO  route-2.log - Message received
> >> INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from route [route-1]
> >> INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route [route-2]
> >>
> >> Two routes are created from the same template, with a filter based on
> >> message headers. Route 1 processes the message; route 2 discards it, yet
> >> it still triggers .onCompletion().onCompleteOnly() (which triggers the
> >> Kafka manual commit). That looks weird to me.
> >>
> >> Regards
> >>
> >>
> >>
> >> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks. I have to be honest with you: I truly want to look more closely
> >>> at this one, but it's been a bit hard to make sense of the code you
> >>> provided so far. I can't quickly adapt one of our unit tests to run it.
> >>>
> >>> In this case ...
> >>>
> >>> Can you please provide a full reproducer and put the code on GitHub, so
> >>> I can clone, reproduce, and debug it? That would make investigating and
> >>> fixing this much easier and quicker for me.
> >>>
> >>> Kind regards
> >>>
> >>> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Otavio
> >>>> I've been busy ...
> >>>>
> >>>> I upgraded to Camel 3.18.3:
> >>>> 22-11-2022 12:35:10.829 [restartedMain] INFO o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3 (camel-1) is starting
> >>>>
> >>>> Still I have the error :
> >>>> CaughtExceptionType: java.util.ConcurrentModificationException
> >>>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access
> >>>> StackTrace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> >>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> >>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> >>>>   at org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> >>>>   at org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> >>>>   at fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> >>>>   at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> >>>>   at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> >>>>   at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> >>>>   at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> >>>>   at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> >>>>   at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> >>>>   at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> >>>>   at org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> >>>>   at org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> >>>>   at org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> >>>>
> >>>> Ivan
> >>>>
> >>>> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi, thanks
> >>>>>
> >>>>> Please, can you try with Camel 3.18.3? Along with the rest of the
> >>>>> community, we introduced a lot of fixes to the Kafka component in
> >>>>> 3.18.2 and 3.18.3. Maybe it will help you solve the problem (and, at
> >>>>> the same time, it's an LTS version, so we can fix it if there's a
> >>>>> problem).
> >>>>>
> >>>>> Thanks in advance
> >>>>>
> >>>>> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Otavio,
> >>>>>> Camel version is 3.17.0
> >>>>>>
> >>>>>> thank you
> >>>>>>
> >>>>>> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Ivan,
> >>>>>>>
> >>>>>>> For Kafka (consumer) objects, yes, they should be created for every
> >>>>>>> route. I want to try to take a look at it this week or next.
> >>>>>>>
> >>>>>>> In the meantime, can you please tell me which version of Camel you
> >>>>>>> are using?
> >>>>>>>
> >>>>>>> Kind regards
> >>>>>>>
> >>>>>>> On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Otavio, thank you for your response.
> >>>>>>>>
> >>>>>>>> The commit is made synchronously by the kafkaOffsetProcessor (whose
> >>>>>>>> code I forgot to attach; more details here:
> >>>>>>>> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> >>>>>>>> ).
> >>>>>>>> Basically, it does what the documentation says for synchronous
> >>>>>>>> commits.
> >>>>>>>>
> >>>>>>>> Multiple threads cannot access the same Kafka client, that much is
> >>>>>>>> clear, but Camel manages the instances and threads, so it is not
> >>>>>>>> easy for me to fix.
> >>>>>>>>
> >>>>>>>> I was thinking of SEDA because I know that for SEDA endpoints,
> >>>>>>>> objects are pooled (if my understanding is right).
> >>>>>>>>
> >>>>>>>> Are you sure that new objects are created for every route created
> >>>>>>>> from a template?
> >>>>>>>>
> >>>>>>>> Ivan Rododendro
> >>>>>>>>
> >>>>>>>>> On Oct 24, 2022, at 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> From the code you provided, it's not very clear to me when and
> >>>>>>>>> where you are calling the commit. It's also not clear to me which
> >>>>>>>>> version of Camel you are using and which kind of commit factory
> >>>>>>>>> you are using (async [1] or sync [2]?).
> >>>>>>>>>
> >>>>>>>>> That said ... the problem here is that, as explained in the
> >>>>>>>>> exception message, the Kafka client cannot be accessed from a
> >>>>>>>>> different thread.
> >>>>>>>>>
> >>>>>>>>> So, I am not entirely sure that the problem is related to SEDA or
> >>>>>>>>> something like that. Also, Camel will indeed create a different
> >>>>>>>>> consumer for every route.
> >>>>>>>>>
> >>>>>>>>> Please, can you provide a bit more detail about the code you have?
> >>>>>>>>>
> >>>>>>>>> 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> >>>>>>>>> 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> >>>>>>>>>
> >>>>>>>>>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello
> >>>>>>>>>> I'm really new to Camel concepts. Our need is to create several
> >>>>>>>>>> routes that are identical except for some parameters, from a Kafka
> >>>>>>>>>> topic to an HTTP endpoint, with some processing in between.
> >>>>>>>>>>
> >>>>>>>>>> Besides this, we want to explicitly commit the message consumption
> >>>>>>>>>> only when the HTTP endpoint has been successfully called.
> >>>>>>>>>>
> >>>>>>>>>> To achieve this, we set up a route template that carries the route
> >>>>>>>>>> parameterization and set it up to manually commit after having
> >>>>>>>>>> called the HTTP endpoint:
> >>>>>>>>>> public void configure() throws Exception {
> >>>>>>>>>>       // @formatter:off
> >>>>>>>>>>       routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> >>>>>>>>>>           .templateParameter(Constantes.JOB_NAME)
> >>>>>>>>>>           .templateParameter(Constantes.TOPIC)
> >>>>>>>>>>           .templateParameter(Constantes.PUBLISHER_ID)
> >>>>>>>>>>           .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> >>>>>>>>>>           .templateParameter(Constantes.JOB_NAME_PARAMETER)
> >>>>>>>>>>           .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> >>>>>>>>>>           .from(getKafkaEndpoint())
> >>>>>>>>>>           .messageHistory()
> >>>>>>>>>>           .filter(simple("${header.publisherId} == '{{publisherId}}'"))
> >>>>>>>>>>           .process(messageLoggerProcessor)
> >>>>>>>>>>           .process(modelMapperProcessor)
> >>>>>>>>>>           .process(jsonlToArrayProcessor)
> >>>>>>>>>>           .process(payloadProcessor)
> >>>>>>>>>>           .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> >>>>>>>>>>           .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> >>>>>>>>>>           .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
> >>>>>>>>>>           .setHeader(Constantes.ACCEPT, constant("application/json"))
> >>>>>>>>>>           .setHeader(Constantes.API_KEY, constant(apiKey))
> >>>>>>>>>>           .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> >>>>>>>>>>           .process(apiConsumerProcessorLogger)
> >>>>>>>>>>           .to(this.url)
> >>>>>>>>>>           .process(kafkaOffsetProcessor);
> >>>>>>>>>>       // @formatter:on
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>>   private String getKafkaEndpoint() {
> >>>>>>>>>>       String endpoint = "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" + kafkaBrokers;
> >>>>>>>>>>
> >>>>>>>>>>       if (securityEnabled()) {
> >>>>>>>>>>           endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
> >>>>>>>>>>                   + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
> >>>>>>>>>>                   + username + "\" password=\"" + password + "\";" + "&sslTruststoreLocation=" + sslTrustStoreLocation
> >>>>>>>>>>                   + "&sslTruststorePassword=" + sslTruststorePassword;
> >>>>>>>>>>       }
> >>>>>>>>>>
> >>>>>>>>>>       return endpoint;
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>> The problem is that we systematically get this error when a
> >>>>>>>>>> message is consumed by a route:
> >>>>>>>>>>
> >>>>>>>>>> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> >>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> >>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> >>>>>>>>>>   at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> >>>>>>>>>>   at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> My understanding is that the KafkaConsumer instance is reused
> >>>>>>>>>> across multiple routes and therefore generates the error, but it
> >>>>>>>>>> could also be related to using a SEDA endpoint, as stated here
> >>>>>>>>>> (https://issues.apache.org/jira/browse/CAMEL-12722), which we
> >>>>>>>>>> don't explicitly do.
> >>>>>>>>>>
> >>>>>>>>>> We tried injecting a KafkaComponent local bean in the route:
> >>>>>>>>>>
> >>>>>>>>>>           .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", "{{" + Constantes.TOPIC + "}}").properties(kafkaConfiguration)
> >>>>>>>>>>           .end()
> >>>>>>>>>>           .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", "#{{myKafkaConfiguration}}")
> >>>>>>>>>>           .end()
> >>>>>>>>>>           .from("#{{myKafka}}")
> >>>>>>>>>>
> >>>>>>>>>> But it ends up with another error, because you cannot consume from
> >>>>>>>>>> a Bean endpoint
> >>>>>>>>>> (https://camel.apache.org/components/3.18.x/bean-component.html).
> >>>>>>>>>>
> >>>>>>>>>> How can we use a different KafkaConsumer for every created route?
> >>>>>>>>>> Or, if the issue is SEDA-related, how can we make this route a
> >>>>>>>>>> direct route?
> >>>>>>>>>>
> >>>>>>>>>> Thank you for your help
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Otavio R. Piske
> >>>>>>>>> http://orpiske.net
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Otavio R. Piske
> >>>>>>> http://orpiske.net
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Otavio R. Piske
> >>>>> http://orpiske.net
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Otavio R. Piske
> >>> http://orpiske.net
> >>>
> >>
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > @davsclaus
> > Camel in Action 2: https://www.manning.com/ibsen2
>


-- 
Otavio R. Piske
http://orpiske.net
