Fair enough.

I pushed a reproducer here, based on Testcontainers ... *but it doesn't
reproduce the issue*:
https://github.com/ivanrododendro/reprex-camelmanualcommit.git

It turns out that my issue is caused by a header-based resequencer in
the route. Basically, having this in the route:
 .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
produces this:
CaughtExceptionType: java.util.ConcurrentModificationException
 CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access

The resequencer is in the reproducer too, but there it does not interfere
with the Kafka manual commit.
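
To illustrate what I think is going on, here is a sketch in plain Java (not
Camel or Kafka code). The error message suggests that the consumer keeps track
of the thread currently using it and rejects a call that arrives from another
thread while the first one is still inside. In the stack trace I posted
earlier, the commit is invoked from Resequencer$BatchSender.run, so presumably
not from the thread that polls. GuardedClient, poll, commitSync and
commitFromOtherThread are names I made up for this illustration, and the real
KafkaConsumer may work differently internally.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadGuardDemo {

    /** Hypothetical stand-in for a client that tolerates only one thread at a time. */
    static final class GuardedClient {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private final AtomicInteger depth = new AtomicInteger();

        private void acquire() {
            long me = Thread.currentThread().getId();
            // Allowed if this thread already owns the client, or if nobody does and we win the CAS.
            if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "GuardedClient is not safe for multi-threaded access");
            }
            depth.incrementAndGet();
        }

        private void release() {
            if (depth.decrementAndGet() == 0) {
                owner.set(NO_OWNER);
            }
        }

        /** Simulates a long poll: holds the guard until mayLeave is released. */
        void poll(CountDownLatch entered, CountDownLatch mayLeave) {
            acquire();
            try {
                entered.countDown();
                mayLeave.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                release();
            }
        }

        /** Simulates a synchronous commit: only needs the guard for a moment. */
        void commitSync() {
            acquire();
            try {
                // nothing to commit in this sketch
            } finally {
                release();
            }
        }
    }

    /**
     * Starts a "poller" thread, then commits from the calling thread, either
     * while the poller is still inside poll() or after it has left.
     * Returns the simple name of the exception thrown by the commit, or "none".
     */
    static String commitFromOtherThread(boolean whilePolling) {
        GuardedClient client = new GuardedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayLeave = new CountDownLatch(1);
        Thread poller = new Thread(() -> client.poll(entered, mayLeave), "poller");
        poller.start();
        try {
            entered.await();
            if (!whilePolling) {
                mayLeave.countDown();
                poller.join();
            }
            client.commitSync();
            return "none";
        } catch (ConcurrentModificationException e) {
            return e.getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mayLeave.countDown();
            try {
                poller.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("commit while polling: " + commitFromOtherThread(true));
        System.out.println("commit after polling: " + commitFromOtherThread(false));
    }
}
```

In this sketch the commit is rejected only while the other thread is still
inside poll(), and it goes through once that thread has left. Whether the same
timing applies to my route is an assumption on my part, not something I have
verified.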

Nevertheless, I'd like to understand the following logs from the reproducer:
INFO  route-1.log - Message received
INFO  route-2.log - Message received
INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from route [route-1]
INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route [route-2]

These are two routes from the same template, each with a filter based on
message headers. Route 1 processes the message. Route 2 discards it, but it
still triggers .onCompletion().onCompleteOnly() (which performs the Kafka
manual commit). That looks weird to me.

Regards



On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
wrote:

> Hi,
>
> Thanks. I have to be honest with you: I truly want to look more closely at
> this one, but it's been a bit hard to try to make sense of the code you
> provided so far. It's not something I can quickly modify one of our unit
> tests and run.
>
> In this case ...
>
> Please, can you provide a full reproducer and send the code? Please put it
> on Github, so I can clone and reproduce and debug? That would make
> investigating and fixing this much easier and quicker for me.
>
> Kind regards
>
> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> wrote:
>
> > Hi Otavio,
> > I've been busy ...
> >
> > I upgraded to Camel 3.18.3:
> > 22-11-2022 12:35:10.829 [restartedMain] INFO  o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel 3.18.3 (camel-1) is starting
> >
> > Still I have the error:
> > CaughtExceptionType: java.util.ConcurrentModificationException
> >  CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded access
> >  StackTrace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
> >     at org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
> >     at org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
> >     at fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
> >     at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
> >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
> >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
> >     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
> >     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
> >     at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
> >     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
> >     at org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
> >     at org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
> >     at org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
> >
> > Ivan
> >
> > On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> > wrote:
> >
> > > Hi, thanks
> > >
> > > Please, can you try with Camel 3.18.3? Along with the rest of the
> > > community, we introduced a lot of fixes on the Kafka component on 3.18.2
> > > and 3.18.3. Maybe it will help you solve the problem (and, at the same
> > > time, it's an LTS version, so we can fix it if there's a problem).
> > >
> > > Thanks in advance
> > >
> > > On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <ivan.rododen...@gmail.com>
> > > wrote:
> > >
> > > > Hi Otavio,
> > > > Camel version is 3.17.0
> > > >
> > > > thank you
> > > >
> > > > On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Ivan,
> > > > >
> > > > > For Kafka (consumer) objects, yes, they should be created for every route.
> > > > > I want to try to take a look at it during this week or in the next.
> > > > >
> > > > > In the meantime, can you please tell me which version of Camel you are
> > > > > using?
> > > > >
> > > > > Kind regards
> > > > >
> > > > > On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com> wrote:
> > > > >
> > > > > > Hi Otavio, thank you for your response.
> > > > > >
> > > > > > The commit is made synchronously by the kafkaOffsetProcessor (whose code I
> > > > > > forgot to attach; more details here:
> > > > > > https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > > > > > ).
> > > > > > Basically it does what the documentation says for synchronous commits.
> > > > > >
> > > > > > Multiple threads cannot access the same Kafka client, that much is clear,
> > > > > > but Camel handles the instances and threads, so it is not easy for me to fix.
> > > > > >
> > > > > > I thought about SEDA because I know that for SEDA endpoints objects are
> > > > > > pooled (if my understanding is right).
> > > > > >
> > > > > > Are you sure that new objects are created for every route created from a
> > > > > > template?
> > > > > >
> > > > > > Ivan Rododendro
> > > > > >
> > > > > > > On Oct 24, 2022, at 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > From the code you provided, it's not very clear to me when and where you
> > > > > > > are calling the commit. It's also not clear to me which version of
> > > > > > > Camel you are using and which kind of commit factory you are using (async
> > > > > > > [1] or sync [2]?).
> > > > > > >
> > > > > > > That said ... The problem here is that - as explained in the exception
> > > > > > > message - the Kafka client cannot be accessed from a different thread.
> > > > > > >
> > > > > > > So, I am not entirely sure that the problem is related to seda or something
> > > > > > > like that. Also, Camel will indeed create a different consumer for every
> > > > > > > route.
> > > > > > >
> > > > > > > Please, can you provide a bit more details about the code you have?
> > > > > > >
> > > > > > > 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > > > > 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > > > > >
> > > > > > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> Hello
> > > > > > > >> I'm really new to Camel concepts, our need is to create some identical
> > > > > > > >> routes, identical except for some parameters, from a Kafka topic to a http
> > > > > > > >> endpoint, with some processing in-between.
> > > > > > > >>
> > > > > > > >> Besides this we want to explicitly commit the message consumption only when
> > > > > > > >> the http endpoint has been successfully called.
> > > > > > > >>
> > > > > > > >> In order to achieve this we set up a route template that carries the Route
> > > > > > > >> parameterization and set it up to manually commit after having called the
> > > > > > > >> http endpoint :
> > > > > > > >> public void configure() throws Exception {
> > > > > > > >>        // @formatter:off
> > > > > > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > > > > > >>            .templateParameter(Constantes.JOB_NAME)
> > > > > > > >>            .templateParameter(Constantes.TOPIC)
> > > > > > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > > > > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > > > > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > > > > > >>            .from(getKafkaEndpoint())
> > > > > > > >>            .messageHistory()
> > > > > > > >>            .filter(simple("${header.publisherId} == '{{publisherId}}'"))
> > > > > > > >>            .process(messageLoggerProcessor)
> > > > > > > >>            .process(modelMapperProcessor)
> > > > > > > >>            .process(jsonlToArrayProcessor)
> > > > > > > >>            .process(payloadProcessor)
> > > > > > > >>            .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > > > > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > > > > > >>            .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
> > > > > > > >>            .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > > > > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > > > > > >>            .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > > > > > >>            .process(apiConsumerProcessorLogger)
> > > > > > > >>            .to(this.url)
> > > > > > > >>            .process(kafkaOffsetProcessor);
> > > > > > > >>        // @formatter:on
> > > > > > > >>    }
> > > > > > >>
> > > > > > > >>    private String getKafkaEndpoint() {
> > > > > > > >>        String endpoint = "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" + kafkaBrokers;
> > > > > > > >>
> > > > > > > >>        if (securityEnabled()) {
> > > > > > > >>            endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
> > > > > > > >>                    + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
> > > > > > > >>                    + username + "\" password=\"" + password + "\";" + "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > > > > > >>                    + "&sslTruststorePassword=" + sslTruststorePassword;
> > > > > > > >>        }
> > > > > > > >>
> > > > > > > >>        return endpoint;
> > > > > > > >>    }
> > > > > > >>
> > > > > > > >> The problem is that we systematically get this error when a message is
> > > > > > > >> consumed by a route :
> > > > > > > >>
> > > > > > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> > > > > > > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > > > > > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > > > > > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > > > > > >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > > > > > >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > > > > >>
> > > > > > >>
> > > > > > > >> My understanding is that the instance of KafkaConsumer is reused in
> > > > > > > >> multiple routes and therefore it generates the error, but it could also be
> > > > > > > >> related to using a SEDA endpoint as stated here (
> > > > > > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > > > > > > >> explicitly do.
> > > > > > >>
> > > > > > > >> We tried injecting a KafkaComponent local bean in the route :
> > > > > > > >>
> > > > > > > >>            .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > > > > > >>            .end()
> > > > > > > >>            .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", "#{{myKafkaConfiguration}}")
> > > > > > > >>            .end()
> > > > > > > >>            .from("#{{myKafka}}")
> > > > > > >>
> > > > > > > >> But it ends up with another error because you cannot consume a Bean
> > > > > > > >> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html)
> > > > > > > >>
> > > > > > > >> How to use a different KafkaConsumer for every created route ? Or, if the
> > > > > > > >> issue is SEDA related, how to make this route a direct route?
> > > > > > > >>
> > > > > > > >> Thank you for your help
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Otavio R. Piske
> > > > > > > http://orpiske.net
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Otavio R. Piske
> > > > > http://orpiske.net
> > > > >
> > > >
> > >
> > >
> > > --
> > > Otavio R. Piske
> > > http://orpiske.net
> > >
> >
>
>
> --
> Otavio R. Piske
> http://orpiske.net
>
