Hi Oktavio thank you for your response. 

The commit is made synchronously by the kafkaOffsetProcessor (whose code I 
forgot to attach, more details here 
https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception).
 Basically it does what the documentation says for synchronous commits. 

Multiple threads cannot access the same kafka client that’s it, but camel 
handles instances and threads, so not so easy to fix for me. 

I think about seda because I know that for seda endpoint object are pooled (if 
my understanding is right). 

Are you sure that new objects are created for every route created from a 
template ?

Ivan Rododendro

> Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com> a écrit :
> 
> Hi,
> 
> From the code you provided, it's not very clear to me when and where you
> are calling the commit. Also it's not very clear to me: which version of
> Camel you are using and which kind of commit factory you are using (async
> [1] or sync [2]?).
> 
> That said ...The problem here is that - as explained in the exception
> message - the Kafka client cannot be accessed from a different thread.
> 
> So, I am not entirely sure that the problem is related to seda or something
> like that. Also, Camel will indeed, create a different consumer for every
> route.
> 
> Please, can you provide a bit more details about the code you have?
> 
> 1.
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> 2.
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> 
>> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com>
>> wrote:
>> 
>> Hello
>> I'm really new to Camel concepts, our need is to create some identical
>> routes, identical except for some parameters, from a Kafka topic to a http
>> endpoint, with some processing in-between.
>> 
>> Besides this we want to explicitly commit the message consumption only when
>> the http endpoint has been successfully called.
>> 
>> In order to achieve this we set up a route template that carries the Route
>> parameterization and set it up to manually commit after having called the
>> http endpoint :
>> public void configure() throws Exception {
>>        // @formatter:off
>>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
>>            .templateParameter(Constantes.JOB_NAME)
>>            .templateParameter(Constantes.TOPIC)
>>            .templateParameter(Constantes.PUBLISHER_ID)
>>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
>>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>>            .from(getKafkaEndpoint())
>>            .messageHistory()
>>            .filter(simple("${header.publisherId} == '{{publisherId}}'"))
>>            .process(messageLoggerProcessor)
>>            .process(modelMapperProcessor)
>>            .process(jsonlToArrayProcessor)
>>            .process(payloadProcessor)
>> 
>> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
>>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
>>            .setHeader(Exchange.CONTENT_TYPE,
>> constant("application/json;charset=UTF-8"))
>>            .setHeader(Constantes.ACCEPT,constant("application/json"))
>>            .setHeader(Constantes.API_KEY, constant(apiKey))
>> 
>> 
>> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
>>            .process(apiConsumerProcessorLogger)
>>            .to(this.url)
>>            .process(kafkaOffsetProcessor);
>>        // @formatter:on
>>    }
>> 
>>    private String getKafkaEndpoint() {
>>        String endpoint =
>> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" +
>> kafkaBrokers;
>> 
>>        if (securityEnabled()) {
>>            endpoint += "&securityProtocol=SASL_SSL" +
>> "&saslMechanism=PLAIN"
>>                    +
>> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
>> required username=\""
>>                    + username + "\" password=\"" + password + "\";" +
>> "&sslTruststoreLocation=" + sslTrustStoreLocation
>>                    + "&sslTruststorePassword=" + sslTruststorePassword;
>>        }
>> 
>>        return endpoint;
>>    }
>> 
>> The problem is that we systematically get this error when a message is
>> consumed by a route :
>> 
>> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not
>> safe for multi-threaded access
>>    at
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>>    at
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>>    at
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>>    at
>> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>>    at
>> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
>> 
>> 
>> My understanding is that the instance of KafkaConsumer is reused in
>> multiple routes and therefore it generates the error, but it could be also
>> related to using SEDA endpoint as stated here (
>> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
>> explicitly do.
>> 
>> We tried injecting a KafkaComponent local bean in the route :
>> 
>> 
>> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
>> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
>>            .end()
>> 
>> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
>> "#{{myKafkaConfiguration}}")
>>            .end()
>>            .from("#{{myKafka}}")
>> 
>> But it ends up with another error because you cannot consume a Bean
>> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html)
>> 
>> How to use a different KafkaConsumer for every created route ? Or, if the
>> issue is SEDA related, how to make this route a direct route?
>> 
>> Thank you for your help
>> 
> 
> 
> -- 
> Otavio R. Piske
> http://orpiske.net

Reply via email to