Hi, thanks

Please, can you try with Camel 3.18.3? Along with the rest of the
community, we introduced a lot of fixes on the Kafka component on 3.18.2
and 3.18.3. Maybe it will help you solve the problem (and, at the same
time, it's an LTS version, so we can fix it if there's a problem).

Thanks in advance

On Mon, Nov 7, 2022 at 5:35 PM Ivan Rododendro <ivan.rododen...@gmail.com>
wrote:

> Hi Otavio,
> Camel version is 3.17.0
>
> thank you
>
> On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
> wrote:
>
> > Hi Ivan,
> >
> > For Kafka (consumer) objects, yes, they should be created for every
> route.
> > I want to try to take a look at it during this week or in the next.
> >
> > In the meantime, can you please tell me which version of Camel you are
> > using?
> >
> > Kind regards
> >
> > On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com> wrote:
> >
> > > Hi Oktavio thank you for your response.
> > >
> > > The commit is made synchronously by the kafkaOffsetProcessor (whose
> code
> > I
> > > forgot to attach, more details here
> > >
> >
> https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > ).
> > > Basically it does what the documentation says for synchronous commits.
> > >
> > > Multiple threads cannot access the same kafka client that’s it, but
> camel
> > > handles instances and threads, so not so easy to fix for me.
> > >
> > > I think about seda because I know that for seda endpoint object are
> > pooled
> > > (if my understanding is right).
> > >
> > > Are you sure that new objects are created for every route created from
> a
> > > template ?
> > >
> > > Ivan Rododendro
> > >
> > > > Le 24 oct. 2022 à 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com>
> a
> > > écrit :
> > > >
> > > > Hi,
> > > >
> > > > From the code you provided, it's not very clear to me when and where
> > you
> > > > are calling the commit. Also it's not very clear to me: which version
> > of
> > > > Camel you are using and which kind of commit factory you are using
> > (async
> > > > [1] or sync [2]?).
> > > >
> > > > That said ...The problem here is that - as explained in the exception
> > > > message - the Kafka client cannot be accessed from a different
> thread.
> > > >
> > > > So, I am not entirely sure that the problem is related to seda or
> > > something
> > > > like that. Also, Camel will indeed, create a different consumer for
> > every
> > > > route.
> > > >
> > > > Please, can you provide a bit more details about the code you have?
> > > >
> > > > 1.
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > > 2.
> > > >
> > >
> >
> https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > > >
> > > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <
> > > ivan.rododen...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> Hello
> > > >> I'm really new to Camel concepts, our need is to create some
> identical
> > > >> routes, identical except for some parameters, from a Kafka topic to
> a
> > > http
> > > >> endpoint, with some processing in-between.
> > > >>
> > > >> Besides this we want to explicitly commit the message consumption
> only
> > > when
> > > >> the http endpoint has been successfully called.
> > > >>
> > > >> In order to achieve this we set up a route template that carries the
> > > Route
> > > >> parameterization and set it up to manually commit after having
> called
> > > the
> > > >> http endpoint :
> > > >> public void configure() throws Exception {
> > > >>        // @formatter:off
> > > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > > >>            .templateParameter(Constantes.JOB_NAME)
> > > >>            .templateParameter(Constantes.TOPIC)
> > > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > > >>            .from(getKafkaEndpoint())
> > > >>            .messageHistory()
> > > >>            .filter(simple("${header.publisherId} ==
> > '{{publisherId}}'"))
> > > >>            .process(messageLoggerProcessor)
> > > >>            .process(modelMapperProcessor)
> > > >>            .process(jsonlToArrayProcessor)
> > > >>            .process(payloadProcessor)
> > > >>
> > > >>
> > .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > > >>            .setHeader(Exchange.CONTENT_TYPE,
> > > >> constant("application/json;charset=UTF-8"))
> > > >>
> .setHeader(Constantes.ACCEPT,constant("application/json"))
> > > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > > >>
> > > >>
> > > >>
> > >
> >
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > > >>            .process(apiConsumerProcessorLogger)
> > > >>            .to(this.url)
> > > >>            .process(kafkaOffsetProcessor);
> > > >>        // @formatter:on
> > > >>    }
> > > >>
> > > >>    private String getKafkaEndpoint() {
> > > >>        String endpoint =
> > > >>
> > >
> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > +
> > > >> kafkaBrokers;
> > > >>
> > > >>        if (securityEnabled()) {
> > > >>            endpoint += "&securityProtocol=SASL_SSL" +
> > > >> "&saslMechanism=PLAIN"
> > > >>                    +
> > > >>
> > "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> > > >> required username=\""
> > > >>                    + username + "\" password=\"" + password + "\";"
> +
> > > >> "&sslTruststoreLocation=" + sslTrustStoreLocation
> > > >>                    + "&sslTruststorePassword=" +
> > sslTruststorePassword;
> > > >>        }
> > > >>
> > > >>        return endpoint;
> > > >>    }
> > > >>
> > > >> The problem is that we systematically get this error when a message
> is
> > > >> consumed by a route :
> > > >>
> > > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is
> not
> > > >> safe for multi-threaded access
> > > >>    at
> > > >>
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > > >>    at
> > > >>
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > > >>    at
> > > >>
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > > >>    at
> > > >>
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > > >>    at
> > > >>
> > >
> >
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > > >>
> > > >>
> > > >> My understanding is that the instance of KafkaConsumer is reused in
> > > >> multiple routes and therefore it generates the error, but it could
> be
> > > also
> > > >> related to using SEDA endpoint as stated here (
> > > >> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > > >> explicitly do.
> > > >>
> > > >> We tried injecting a KafkaComponent local bean in the route :
> > > >>
> > > >>
> > > >>
> > >
> >
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> > > >> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
> > > >>            .end()
> > > >>
> > > >>
> > >
> >
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> > > >> "#{{myKafkaConfiguration}}")
> > > >>            .end()
> > > >>            .from("#{{myKafka}}")
> > > >>
> > > >> But it ends up with another error because you cannot consume a Bean
> > > >> endpoint (
> > > https://camel.apache.org/components/3.18.x/bean-component.html)
> > > >>
> > > >> How to use a different KafkaConsumer for every created route ? Or,
> if
> > > the
> > > >> issue is SEDA related, how to make this route a direct route?
> > > >>
> > > >> Thank you for your help
> > > >>
> > > >
> > > >
> > > > --
> > > > Otavio R. Piske
> > > > http://orpiske.net
> > >
> >
> >
> > --
> > Otavio R. Piske
> > http://orpiske.net
> >
>


-- 
Otavio R. Piske
http://orpiske.net

Reply via email to