Hi Otavio,
The Camel version is 3.17.0.

Thank you

On Wed, Nov 2, 2022 at 7:00 PM Otavio Rodolfo Piske <angusyo...@gmail.com>
wrote:

> Hi Ivan,
>
> For Kafka (consumer) objects, yes, they should be created for every route.
> I will try to take a look at it this week or next.
>
> In the meantime, can you please tell me which version of Camel you are
> using?
>
> Kind regards
>
> On Wed, Oct 26, 2022 at 6:50 PM Ivan <ivan.rododen...@gmail.com> wrote:
>
> > Hi Otavio, thank you for your response.
> >
> > The commit is made synchronously by the kafkaOffsetProcessor (I forgot
> > to attach its code; more details here:
> > https://stackoverflow.com/questions/74096096/camel-and-kafka-manual-commit-java-util-concurrentmodificationexception
> > ).
> > Basically, it does what the documentation says for synchronous commits.
> >
> > Multiple threads cannot access the same Kafka client, that much is clear,
> > but Camel manages the instances and threads, so it is not easy for me to
> > fix.
> >
> > I thought of SEDA because I understand that, with SEDA, endpoint objects
> > are pooled (if my understanding is right).
> >
> > Are you sure that new objects are created for every route created from a
> > template?
> >
> > Ivan Rododendro
> >
> > > On Oct 24, 2022, at 15:40, Otavio Rodolfo Piske <angusyo...@gmail.com>
> > > wrote:
> > >
> > > Hi,
> > >
> > > From the code you provided, it's not very clear to me when and where
> > > you are calling the commit. It's also not clear to me which version of
> > > Camel you are using and which kind of commit factory you are using
> > > (async [1] or sync [2]).
> > >
> > > That said, the problem here is that, as explained in the exception
> > > message, the Kafka client cannot be accessed from a different thread.
> > >
> > > So I am not entirely sure that the problem is related to SEDA or
> > > something like that. Also, Camel will indeed create a different
> > > consumer for every route.
> > >
> > > Can you please provide a few more details about the code you have?
> > >
> > > 1. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
> > > 2. https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53
> > >
> > >> On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com>
> > >> wrote:
> > >>
> > >> Hello,
> > >> I'm really new to Camel concepts. We need to create several routes that
> > >> are identical except for some parameters, from a Kafka topic to an
> > >> HTTP endpoint, with some processing in between.
> > >>
> > >> Besides this, we want to explicitly commit the message consumption only
> > >> when the HTTP endpoint has been successfully called.
> > >>
> > >> To achieve this, we set up a route template that carries the route
> > >> parameterization and manually commits after the HTTP endpoint has been
> > >> called:
> > >>    public void configure() throws Exception {
> > >>        // @formatter:off
> > >>        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
> > >>            .templateParameter(Constantes.JOB_NAME)
> > >>            .templateParameter(Constantes.TOPIC)
> > >>            .templateParameter(Constantes.PUBLISHER_ID)
> > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > >>            .templateParameter(Constantes.JOB_NAME_PARAMETER)
> > >>            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
> > >>            .from(getKafkaEndpoint())
> > >>            .messageHistory()
> > >>            .filter(simple("${header.publisherId} == '{{publisherId}}'"))
> > >>            .process(messageLoggerProcessor)
> > >>            .process(modelMapperProcessor)
> > >>            .process(jsonlToArrayProcessor)
> > >>            .process(payloadProcessor)
> > >>            .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
> > >>            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
> > >>            .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
> > >>            .setHeader(Constantes.ACCEPT, constant("application/json"))
> > >>            .setHeader(Constantes.API_KEY, constant(apiKey))
> > >>            .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
> > >>            .process(apiConsumerProcessorLogger)
> > >>            .to(this.url)
> > >>            .process(kafkaOffsetProcessor);
> > >>        // @formatter:on
> > >>    }
> > >>
> > >>    private String getKafkaEndpoint() {
> > >>        String endpoint =
> > >>            "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers="
> > >>            + kafkaBrokers;
> > >>
> > >>        if (securityEnabled()) {
> > >>            endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
> > >>                    + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
> > >>                    + username + "\" password=\"" + password + "\";"
> > >>                    + "&sslTruststoreLocation=" + sslTrustStoreLocation
> > >>                    + "&sslTruststorePassword=" + sslTruststorePassword;
> > >>        }
> > >>
> > >>        return endpoint;
> > >>    }
> > >>
> > >> The problem is that we systematically get this error when a message is
> > >> consumed by a route:
> > >>
> > >> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
> > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
> > >>    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
> > >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
> > >>    at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> > >>
> > >>
> > >> My understanding is that the KafkaConsumer instance is reused across
> > >> multiple routes, which causes the error, but it could also be related
> > >> to using a SEDA endpoint, as stated here
> > >> (https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> > >> explicitly do.
> > >>
> > >> We tried injecting a KafkaComponent local bean in the route:
> > >>
> > >>            .templateBean("myKafkaConfiguration")
> > >>                .typeClass("org.apache.camel.component.kafka.KafkaConfiguration")
> > >>                .property("topic", "{{" + Constantes.TOPIC + "}}")
> > >>                .properties(kafkaConfiguration)
> > >>            .end()
> > >>            .templateBean("myKafka")
> > >>                .typeClass("org.apache.camel.component.kafka.KafkaComponent")
> > >>                .property("configuration", "#{{myKafkaConfiguration}}")
> > >>            .end()
> > >>            .from("#{{myKafka}}")
> > >>
> > >> But it ends up with another error, because you cannot consume from a
> > >> bean endpoint
> > >> (https://camel.apache.org/components/3.18.x/bean-component.html).
> > >>
> > >> How can we use a different KafkaConsumer for every created route? Or, if
> > >> the issue is SEDA-related, how can we make this route a direct route?
> > >>
> > >> Thank you for your help
> > >>
> > >
> > >
> > > --
> > > Otavio R. Piske
> > > http://orpiske.net
> >
>
>
> --
> Otavio R. Piske
> http://orpiske.net
>
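
The exception discussed in this thread comes from a single-thread ownership
check: the client remembers which thread is currently using it and rejects
calls from any other thread in the meantime. Below is a minimal sketch of that
kind of check in plain Java. It is a simplified model for illustration, not
the Kafka or Camel source; GuardedClient, GuardDemo and all their methods are
hypothetical names, and the Runnable passed to poll() merely stands in for the
time the owning thread spends inside a poll call.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a client that only one thread may use at a time. */
class GuardedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0); // re-entrant calls by the owner

    private void acquire() {
        long me = Thread.currentThread().getId();
        // Pass if this thread already owns the client, or if nobody does and we can claim it.
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "client is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    private void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER); // last nested call finished, the client is free again
        }
    }

    /** Runs whileInsidePoll while the calling thread holds the client. */
    void poll(Runnable whileInsidePoll) {
        acquire();
        try {
            whileInsidePoll.run();
        } finally {
            release();
        }
    }

    /** Would send offsets in a real client; here it only exercises the guard. */
    void commitSync() {
        acquire();
        try {
            // no-op in this sketch
        } finally {
            release();
        }
    }
}

class GuardDemo {
    /** A second thread commits while the first thread is still inside poll(). */
    static boolean commitFromOtherThreadFails() {
        GuardedClient client = new GuardedClient();
        ExecutorService other = Executors.newSingleThreadExecutor();
        boolean[] rejected = {false};
        try {
            client.poll(() -> {
                Future<?> commit = other.submit(client::commitSync);
                try {
                    commit.get();
                } catch (ExecutionException e) {
                    rejected[0] = e.getCause() instanceof ConcurrentModificationException;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } finally {
            other.shutdown();
        }
        return rejected[0];
    }

    /** The owning thread commits during and after its own poll(). */
    static boolean commitFromOwnerThreadSucceeds() {
        GuardedClient client = new GuardedClient();
        try {
            client.poll(client::commitSync); // nested call on the owning thread
            client.commitSync();             // guard has been released by now
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("other thread rejected: " + commitFromOtherThreadFails());
        System.out.println("owner thread accepted: " + commitFromOwnerThreadSucceeds());
    }
}
```

Under this model, a commit is only safe on the thread that is doing the
polling. Whether a given route step moves the exchange onto another thread
before the commit runs is something to check against the Camel documentation
for that step; the sketch does not establish it.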
