Ok thanks. 

Ivan Rododendro

> Le 23 nov. 2022 à 18:01, Claus Ibsen <claus.ib...@gmail.com> a écrit :
> 
> Hi
> 
> The resequencer EIP is async, so its output is processed by another thread,
> which manual commit from kafka is not supported as you must do that on the
> same thread that received the message.
> 
>> On Wed, Nov 23, 2022 at 3:34 PM Ivan Rododendro <ivan.rododen...@gmail.com>
>> wrote:
>> 
>> Fair enough.
>> 
>> I pushed a reproducer here based on testcontainer ...  *but it don't
>> reproduces the issue *:
>> https://github.com/ivanrododendro/reprex-camelmanualcommit.git
>> 
>> It turns out that my issue is caused by a header based resequencer in
>> the route, basically having this in the route :
>> .resequence(header(Constants.DML_TIMESTAMP_HEADER)).batch().timeout(100)
>> produces this :
>> CaughtExceptionType: java.util.ConcurrentModificationException
>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>> 
>> The resequencer is in the reproducer too but it does interfere with kafka
>> manual commit.
>> 
>> Nevertheless I'd like to understand  the following logs from the reproducer
>> :
>> INFO  route-1.log - Message received
>> INFO  route-2.log - Message received
>> INFO  o.apache.camel.builder.RouteBuilder.process - Processing message from
>> route [route-1]
>> INFO  o.i.r.c.TemplateBuilder.process - Committed Kafka offset from route
>> [route-2]
>> 
>> Two routes from the same template, with a filter based on message headers.
>> Route 1 processes the message, route 2 discards it but it triggers
>> .onCompletion().onCompleteOnly() (which triggers Kafka manual commit) ..
>> it looks weird to me.
>> 
>> Regards
>> 
>> 
>> 
>> On Tue, Nov 22, 2022 at 2:26 PM Otavio Rodolfo Piske <angusyo...@gmail.com
>>> 
>> wrote:
>> 
>>> Hi,
>>> 
>>> Thanks. I have to be honest with you: I truly want to look more closely
>> at
>>> this one, but it's been a bit hard to try to make sense of the code you
>>> provided so far. It's not something I can quickly modify one of our unit
>>> tests and run.
>>> 
>>> In this case ...
>>> 
>>> Please, can you provide a full reproducer and send the code? Please put
>> it
>>> on Github, so I can clone and reproduce and debug? That would make
>>> investigating and fixing this much easier and quicker for me.
>>> 
>>> Kind regards
>>> 
>>> On Tue, Nov 22, 2022 at 12:37 PM Ivan Rododendro <
>>> ivan.rododen...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Octavio
>>>> I've been  busy ...
>>>> 
>>>> I upgraded to Came 3.18.3 :
>>>> 22-11-2022 12:35:10.829 [restartedMain] INFO
>>>> o.a.c.i.engine.AbstractCamelContext.doStartContext - Apache Camel
>> 3.18.3
>>>> (camel-1) is starting
>>>> 
>>>> Still I have the error :
>>>> CaughtExceptionType: java.util.ConcurrentModificationException
>>>> CaughtExceptionMessage: KafkaConsumer is not safe for multi-threaded
>>>> access  StackTrace: java.util.ConcurrentModificationException:
>>>> KafkaConsumer is not safe for multi-threaded access
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2450)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2434)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1491)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.consumer.AbstractCommitManager.forceCommit(AbstractCommitManager.java:89)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.consumer.DefaultKafkaManualSyncCommit.commit(DefaultKafkaManualSyncCommit.java:31)
>>>> at
>>>> 
>>>> 
>>> 
>> fr.acoss.mdm.consommateurkafka.KafkaOffsetProcessor.process(KafkaOffsetProcessor.java:18)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:818)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:726)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
>>>> at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.Resequencer.processExchange(Resequencer.java:320)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.Resequencer$BatchSender.sendExchanges(Resequencer.java:560)
>>>> at
>>>> 
>>>> 
>>> 
>> org.apache.camel.processor.Resequencer$BatchSender.run(Resequencer.java:483)
>>>> 
>>>> Ivan
>>>> 
>>>> On Tue, Nov 8, 2022 at 3:32 PM Otavio Rodolfo Piske <
>>> angusyo...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi, thanks
>>>>> 
>>>>> Please, can you try with Camel 3.18.3? Along with the rest of the
>>>>> community, we introduced a lot of fixes on the Kafka component on
>>> 3.18.2
>>>>> and 3.18.3. Maybe it will help you solve the problem (and, at the
>> same
>>>>> time, it's an LTS version, so we can fix it if there's a problem).
>>>>> 
>>>>> Thanks in advance
>>>>> 
>>>>> On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <
>>>> ivan.rododen...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Otavio,
>>>>>> Camel version is 3.17.0
>>>>>> 
>>>>>> thank you
>>>>>> 
>>>>>> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <
>>>>> angusyo...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Ivan,
>>>>>>> 
>>>>>>> For Kafka (consumer) objects, yes, they should be created for
>> every
>>>>>> route.
>>>>>>> I want to try to take a look at it during this week or in the
>> next.
>>>>>>> 
>>>>>>> In the meantime, can you please tell me which version of Camel
>> you
>>>> are
>>>>>>> using?
>>>>>>> 
>>>>>>> Kind regards
>>>>>>> 
>>>>>>> On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Oktavio thank you for your response.
>>>>>>>> 
>>>>>>>> The commit is made synchronously by the kafkaOffsetProcessor
>>> (whose
>>>>>> code
>>>>>>> I
>>>>>>>> forgot to attach, more details here
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
>>>>>>> ).
>>>>>>>> Basically it does what the documentation says for synchronous
>>>>> commits.
>>>>>>>> 
>>>>>>>> Multiple threads cannot access the same kafka client that’s it,
>>> but
>>>>>> camel
>>>>>>>> handles instances and threads, so not so easy to fix for me.
>>>>>>>> 
>>>>>>>> I think about seda because I know that for seda endpoint object
>>> are
>>>>>>> pooled
>>>>>>>> (if my understanding is right).
>>>>>>>> 
>>>>>>>> Are you sure that new objects are created for every route
>> created
>>>>> from
>>>>>> a
>>>>>>>> template ?
>>>>>>>> 
>>>>>>>> Ivan Rododendro
>>>>>>>> 
>>>>>>>>> Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <
>>>>> angusyo...@gmail.com>
>>>>>> a
>>>>>>>> écrit :
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> From the code you provided, it's not very clear to me when
>> and
>>>>> where
>>>>>>> you
>>>>>>>>> are calling the commit. Also it's not very clear to me: which
>>>>> version
>>>>>>> of
>>>>>>>>> Camel you are using and which kind of commit factory you are
>>>> using
>>>>>>> (async
>>>>>>>>> [1] or sync [2]?).
>>>>>>>>> 
>>>>>>>>> That said ...The problem here is that - as explained in the
>>>>> exception
>>>>>>>>> message - the Kafka client cannot be accessed from a
>> different
>>>>>> thread.
>>>>>>>>> 
>>>>>>>>> So, I am not entirely sure that the problem is related to
>> seda
>>> or
>>>>>>>> something
>>>>>>>>> like that. Also, Camel will indeed, create a different
>> consumer
>>>> for
>>>>>>> every
>>>>>>>>> route.
>>>>>>>>> 
>>>>>>>>> Please, can you provide a bit more details about the code you
>>>> have?
>>>>>>>>> 
>>>>>>>>> 1.
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
>>>>>>>>> 2.
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
>>>>>>>>> 
>>>>>>>>>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
>>>>>>>> ivan.rododen...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello
>>>>>>>>>> I'm really new to Camel concepts, our need is to create some
>>>>>> identical
>>>>>>>>>> routes, identical except for some parameters, from a Kafka
>>> topic
>>>>> to
>>>>>> a
>>>>>>>> http
>>>>>>>>>> endpoint, with some processing in-between.
>>>>>>>>>> 
>>>>>>>>>> Besides this we want to explicitly commit the message
>>>> consumption
>>>>>> only
>>>>>>>> when
>>>>>>>>>> the http endpoint has been successfully called.
>>>>>>>>>> 
>>>>>>>>>> In order to achieve this we set up a route template that
>>> carries
>>>>> the
>>>>>>>> Route
>>>>>>>>>> parameterization and set it up to manually commit after
>> having
>>>>>> called
>>>>>>>> the
>>>>>>>>>> http endpoint :
>>>>>>>>>> public void configure() throws Exception {
>>>>>>>>>>       // @formatter:off
>>>>>>>>>>       routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
>>>>>>>>>>           .templateParameter(Constantes.JOB_NAME)
>>>>>>>>>>           .templateParameter(Constantes.TOPIC)
>>>>>>>>>>           .templateParameter(Constantes.PUBLISHER_ID)
>>>>>>>>>> 
>>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>>>>>>>>>           .templateParameter(Constantes.JOB_NAME_PARAMETER)
>>>>>>>>>> 
>>>> .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>>>>>>>>>           .from(getKafkaEndpoint())
>>>>>>>>>>           .messageHistory()
>>>>>>>>>>           .filter(simple("${header.publisherId} ==
>>>>>>> '{{publisherId}}'"))
>>>>>>>>>>           .process(messageLoggerProcessor)
>>>>>>>>>>           .process(modelMapperProcessor)
>>>>>>>>>>           .process(jsonlToArrayProcessor)
>>>>>>>>>>           .process(payloadProcessor)
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> 
>>>>> 
>>> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
>>>>>>>>>>           .setHeader(Exchange.HTTP_METHOD, simple("POST"))
>>>>>>>>>>           .setHeader(Exchange.CONTENT_TYPE,
>>>>>>>>>> constant("application/json;charset=UTF-8"))
>>>>>>>>>> 
>>>>>> .setHeader(Constantes.ACCEPT,constant("application/json"))
>>>>>>>>>>           .setHeader(Constantes.API_KEY, constant(apiKey))
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
>>>>>>>>>>           .process(apiConsumerProcessorLogger)
>>>>>>>>>>           .to(this.url)
>>>>>>>>>>           .process(kafkaOffsetProcessor);
>>>>>>>>>>       // @formatter:on
>>>>>>>>>>   }
>>>>>>>>>> 
>>>>>>>>>>   private String getKafkaEndpoint() {
>>>>>>>>>>       String endpoint =
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
>>>>>>> +
>>>>>>>>>> kafkaBrokers;
>>>>>>>>>> 
>>>>>>>>>>       if (securityEnabled()) {
>>>>>>>>>>           endpoint += "&securityProtocol=SASL_SSL" +
>>>>>>>>>> "&saslMechanism=PLAIN"
>>>>>>>>>>                   +
>>>>>>>>>> 
>>>>>>> 
>>>>> 
>>> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
>>>>>>>>>> required username=\""
>>>>>>>>>>                   + username + "\" password=\"" + password
>> +
>>>>> "\";"
>>>>>> +
>>>>>>>>>> "&sslTruststoreLocation=" + sslTrustStoreLocation
>>>>>>>>>>                   + "&sslTruststorePassword=" +
>>>>>>> sslTruststorePassword;
>>>>>>>>>>       }
>>>>>>>>>> 
>>>>>>>>>>       return endpoint;
>>>>>>>>>>   }
>>>>>>>>>> 
>>>>>>>>>> The problem is that we systematically get this error when a
>>>>> message
>>>>>> is
>>>>>>>>>> consumed by a route :
>>>>>>>>>> 
>>>>>>>>>> Trace: java.util.ConcurrentModificationException:
>>> KafkaConsumer
>>>> is
>>>>>> not
>>>>>>>>>> safe for multi-threaded access
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>>>>>>>>>>   at
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> My understanding is that the instance of KafkaConsumer is
>>> reused
>>>>> in
>>>>>>>>>> multiple routes and therefore it generates the error, but it
>>>> could
>>>>>> be
>>>>>>>> also
>>>>>>>>>> related to using SEDA endpoint as stated here (
>>>>>>>>>> https://issues.apache.org/jira/browse/CAMEL-12722), which
>> we
>>>>> don't
>>>>>>>>>> explicitly do.
>>>>>>>>>> 
>>>>>>>>>> We tried injecting a KafkaComponent local bean in the route
>> :
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
>>>>>>>>>> "{{" + Constantes.TOPIC
>> +"}}").properties(kafkaConfiguration)
>>>>>>>>>>           .end()
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
>>>>>>>>>> "#{{myKafkaConfiguration}}")
>>>>>>>>>>           .end()
>>>>>>>>>>           .from("#{{myKafka}}")
>>>>>>>>>> 
>>>>>>>>>> But it ends up with another error because you cannot
>> consume a
>>>>> Bean
>>>>>>>>>> endpoint (
>>>>>>>> https://camel.apache.org/components/3.18.x/bean-component.html
>> )
>>>>>>>>>> 
>>>>>>>>>> How to use a different KafkaConsumer for every created
>> route ?
>>>> Or,
>>>>>> if
>>>>>>>> the
>>>>>>>>>> issue is SEDA related, how to make this route a direct
>> route?
>>>>>>>>>> 
>>>>>>>>>> Thank you for your help
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Otavio R. Piske
>>>>>>>>> http://orpiske.net
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Otavio R. Piske
>>>>>>> http://orpiske.net
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Otavio R. Piske
>>>>> http://orpiske.net
>>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> Otavio R. Piske
>>> http://orpiske.net
>>> 
>> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2

Reply via email to