Hi

The resequencer EIP is asynchronous, so its output is processed by another
thread. Kafka manual commit is not supported there, because you must commit
on the same thread that received the message.
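
To illustrate why the thread matters, here is a minimal, self-contained sketch. It is only a simplified model of a single-thread ownership guard, not the actual Kafka client or Camel code; the names ThreadConfinementSketch, SingleThreadedClient, commitOnOwningThread and commitOnOtherThreadIsRejected are made up for this example. It assumes the consuming thread is still holding the client when a second thread (standing in for the resequencer's batch sender) tries to commit.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

class ThreadConfinementSketch {

    /** Hypothetical stand-in for a client that tolerates only one thread at a time. */
    static final class SingleThreadedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>();
        private int depth; // only ever touched by the thread that owns the client

        void acquire() {
            Thread me = Thread.currentThread();
            // Re-entry by the owner is fine; anyone else must find the client free.
            if (owner.get() != me && !owner.compareAndSet(null, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
            depth++;
        }

        void release() {
            if (--depth == 0) {
                owner.set(null);
            }
        }

        void commitSync() {
            acquire();
            try {
                // A real client would send the offsets to the broker here.
            } finally {
                release();
            }
        }
    }

    /** Commit on the thread that already holds the client: accepted. */
    static boolean commitOnOwningThread() {
        SingleThreadedClient client = new SingleThreadedClient();
        client.acquire(); // the consuming thread is busy with the client
        try {
            client.commitSync();
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        } finally {
            client.release();
        }
    }

    /** Commit from a second thread while the first still holds the client: rejected. */
    static boolean commitOnOtherThreadIsRejected() {
        SingleThreadedClient client = new SingleThreadedClient();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        client.acquire(); // the consuming thread is busy with the client
        try {
            Thread other = new Thread(() -> {
                try {
                    client.commitSync();
                } catch (ConcurrentModificationException e) {
                    failure.set(e);
                }
            }, "batch-sender");
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            client.release();
        }
        return failure.get() instanceof ConcurrentModificationException;
    }

    public static void main(String[] args) {
        System.out.println("same-thread commit accepted: " + commitOnOwningThread());
        System.out.println("other-thread commit rejected: " + commitOnOtherThreadIsRejected());
    }
}
```

In this model the commit only goes through when it runs on the thread that owns the client, which is why a commit placed after an asynchronous EIP ends up failing.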

On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <[email protected]>
wrote:

> Fair enough.
>
> I pushed a reproducer here, based on Testcontainers ... *but it doesn't
> reproduce the issue*:
> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
>
> It turns out that my issue is caused by a header based resequencer in
> the route, basically having this in the route :
>  .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
> produces this :
> CaughtExceptionType: java.util.ConcurrentModificationException
>  CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>
> The resequencer is in the reproducer too, but there it does not interfere
> with the Kafka manual commit.
>
> Nevertheless, I'd like to understand the following logs from the reproducer:
> INFO  route-1.log - Message received
> INFO  route-2.log - Message received
> INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from route [route-1]
> INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route [route-2]
>
> Two routes from the same template, with a filter based on message headers.
> Route 1 processes the message; route 2 discards it but still triggers
> .onCompletion().onCompleteOnly() (which triggers the Kafka manual commit).
> It looks weird to me.
>
> Regards
>
>
>
> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <[email protected]> wrote:
>
> > Hi,
> >
> > Thanks. I have to be honest with you: I truly want to look more closely at
> > this one, but it's been a bit hard to make sense of the code you have
> > provided so far. It's not something I can quickly turn into one of our
> > unit tests and run.
> >
> > In this case ...
> >
> > Please, can you provide a full reproducer and send the code? Please put it
> > on GitHub, so I can clone, reproduce and debug it. That would make
> > investigating and fixing this much easier and quicker for me.
> >
> > Kind regards
> >
> > On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <[email protected]> wrote:
> >
> > > Hi Otavio
> > > I've been busy ...
> > >
> > > I upgraded to Camel 3.18.3:
> > > 22-11-2022 12:35:10.829 [restartedMain] INFO  o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3 (camel-1) is starting
> > >
> > > I still get the error:
> > > CaughtExceptionType: java.util.ConcurrentModificationException
> > > CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access
> > > StackTrace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> > >     at org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> > >     at org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> > >     at fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> > >     at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> > >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> > >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> > >     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> > >     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> > >     at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> > >     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> > >     at org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> > >     at org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> > >     at org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> > >
> > > Ivan
> > >
> > > On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <[email protected]> wrote:
> > >
> > > > Hi, thanks
> > > >
> > > > Please, can you try with Camel 3.18.3? Along with the rest of the
> > > > community, we introduced a lot of fixes to the Kafka component in 3.18.2
> > > > and 3.18.3. Maybe it will help you solve the problem (and, at the same
> > > > time, it's an LTS version, so we can fix it if there's a problem).
> > > >
> > > > Thanks in advance
> > > >
> > > > On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <[email protected]> wrote:
> > > >
> > > > > Hi Otavio,
> > > > > Camel version is 3.17.0
> > > > >
> > > > > thank you
> > > > >
> > > > > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <[email protected]> wrote:
> > > > >
> > > > > > Hi Ivan,
> > > > > >
> > > > > > For Kafka (consumer) objects, yes, they should be created for every
> > > > > > route. I want to try to take a look at it this week or the next.
> > > > > >
> > > > > > In the meantime, can you please tell me which version of Camel you are
> > > > > > using?
> > > > > >
> > > > > > Kind regards
> > > > > >
> > > > > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <[email protected]> wrote:
> > > > > >
> > > > > > > Hi Otavio, thank you for your response.
> > > > > > >
> > > > > > > The commit is made synchronously by the kafkaOffsetProcessor (whose code I
> > > > > > > forgot to attach; more details here:
> > > > > > > https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > > > > > ).
> > > > > > > Basically it does what the documentation says for synchronous commits.
> > > > > > >
> > > > > > > Multiple threads cannot access the same Kafka client, that's it, but Camel
> > > > > > > handles instances and threads, so it is not so easy for me to fix.
> > > > > > >
> > > > > > > I thought about SEDA because I know that SEDA endpoint objects are pooled
> > > > > > > (if my understanding is right).
> > > > > > >
> > > > > > > Are you sure that new objects are created for every route created from a
> > > > > > > template?
> > > > > > >
> > > > > > > Ivan Rododendro
> > > > > > >
> > > > > > > > On Oct 24, 2022, at 3:40 PM, Otavio Rodolfo Piske <[email protected]> wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > > From the code you provided, it's not very clear to me when and where you
> > > > > > > > > are calling the commit. It's also not clear to me which version of
> > > > > > > > > Camel you are using and which kind of commit factory you are using (async
> > > > > > > > > [1] or sync [2]?).
> > > > > > > > >
> > > > > > > > > That said ... the problem here is that, as explained in the exception
> > > > > > > > > message, the Kafka client cannot be accessed from a different thread.
> > > > > > > > >
> > > > > > > > > So, I am not entirely sure that the problem is related to SEDA or something
> > > > > > > > > like that. Also, Camel will indeed create a different consumer for every
> > > > > > > > > route.
> > > > > > > > >
> > > > > > > > > Please, can you provide a bit more detail about the code you have?
> > > > > > > > >
> > > > > > > > > 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > > > > > 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > > > > >
> > > > > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <[email protected]> wrote:
> > > > > > > >>
> > > > > > > > >> Hello
> > > > > > > > >> I'm really new to Camel concepts. Our need is to create some identical
> > > > > > > > >> routes, identical except for some parameters, from a Kafka topic to an
> > > > > > > > >> HTTP endpoint, with some processing in between.
> > > > > > > > >>
> > > > > > > > >> Besides this, we want to explicitly commit the message consumption only
> > > > > > > > >> when the HTTP endpoint has been successfully called.
> > > > > > > > >>
> > > > > > > > >> To achieve this, we set up a route template that carries the route
> > > > > > > > >> parameterization and set it up to commit manually after having called
> > > > > > > > >> the HTTP endpoint:
> > > > > > > > >>    public void configure() throws Exception {
> > > > > > > > >>        // @formatter:off
> > > > > > > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > > > > > >>            .templateParameter(Constantes.JOB_NAME)
> > > > > > > > >>            .templateParameter(Constantes.TOPIC)
> > > > > > > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > > > > > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > > > > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > > > >>            .from(getKafkaEndpoint())
> > > > > > > > >>            .messageHistory()
> > > > > > > > >>            .filter(simple("${header.publisherId} == '{{publisherId}}'"))
> > > > > > > > >>            .process(messageLoggerProcessor)
> > > > > > > > >>            .process(modelMapperProcessor)
> > > > > > > > >>            .process(jsonlToArrayProcessor)
> > > > > > > > >>            .process(payloadProcessor)
> > > > > > > > >>            .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > > > > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > > > > > >>            .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
> > > > > > > > >>            .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > > > > > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > > > > > >>            .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > > > > > >>            .process(apiConsumerProcessorLogger)
> > > > > > > > >>            .to(this.url)
> > > > > > > > >>            .process(kafkaOffsetProcessor);
> > > > > > > > >>        // @formatter:on
> > > > > > > > >>    }
> > > > > > > > >>
> > > > > > > > >>    private String getKafkaEndpoint() {
> > > > > > > > >>        String endpoint = "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" + kafkaBrokers;
> > > > > > > > >>
> > > > > > > > >>        if (securityEnabled()) {
> > > > > > > > >>            endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
> > > > > > > > >>                    + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
> > > > > > > > >>                    + username + "\" password=\"" + password + "\";" + "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > > > > > >>                    + "&sslTruststorePassword=" + sslTruststorePassword;
> > > > > > > > >>        }
> > > > > > > > >>
> > > > > > > > >>        return endpoint;
> > > > > > > > >>    }
> > > > > > > >>
> > > > > > > > >> The problem is that we systematically get this error when a message is
> > > > > > > > >> consumed by a route:
> > > > > > > > >>
> > > > > > > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> > > > > > > > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > > > > > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > > > > > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > > > > > >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > > > > > >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > > > > >>
> > > > > > > >>
> > > > > > > > >> My understanding is that the instance of KafkaConsumer is reused in
> > > > > > > > >> multiple routes and therefore generates the error, but it could also be
> > > > > > > > >> related to using a SEDA endpoint as stated here
> > > > > > > > >> (https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > > > > > > > >> explicitly do.
> > > > > > > >>
> > > > > > > > >> We tried injecting a KafkaComponent local bean in the route:
> > > > > > > > >>
> > > > > > > > >>            .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > > > > > > >>            .end()
> > > > > > > > >>            .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", "#{{myKafkaConfiguration}}")
> > > > > > > > >>            .end()
> > > > > > > > >>            .from("#{{myKafka}}")
> > > > > > > >>
> > > > > > > > >> But it ends up with another error, because you cannot consume from a Bean
> > > > > > > > >> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html).
> > > > > > > > >>
> > > > > > > > >> How can we use a different KafkaConsumer for every created route? Or, if
> > > > > > > > >> the issue is SEDA related, how can we make this route a direct route?
> > > > > > > >>
> > > > > > > >> Thank you for your help
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Otavio R. Piske
> > > > > > > > http://orpiske.net
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Otavio R. Piske
> > > > > > http://orpiske.net
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Otavio R. Piske
> > > > http://orpiske.net
> > > >
> > >
> >
> >
> > --
> > Otavio R. Piske
> > http://orpiske.net
> >
>


-- 
Claus Ibsen
-----------------
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
