[ 
https://issues.apache.org/jira/browse/CAMEL-17424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550325#comment-17550325
 ] 

Rafał Gała commented on CAMEL-17424:
------------------------------------

It behaves differently on 3.17.0:

Af first call to 
{code:java}
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); {code}
in KafkaFetchRecords class it fails with TopicAuthorizationException, but the 
default poll exception strategy tells it to just go on so it calls the above 
again but this time it does not fail anymore with TopicAuthorizationException, 
but it also does not fetch any records (the consumer is in some weird state 
after first poll attempt). It keeps trying to poll records like if there was no 
authorization issue at all. Luckily the Camel shutdown process is not affected 
by this.

I would like to implement my own poll exception strategy and just stop polling 
when TopicAuthorizationException occurs, but continue with all other 
exceptions, but as I pointed out in CAMEL-18142, it seems impossible now. I 
could store the exception during the call to
{code:java}
pollExceptionStrategy.handle(partitionLastOffset, e) {code}
and then use it in *canContinue()* method later on, but it is not a thread safe 
solution unless I use a ThreadLocal wrapper etc.

> camel-kafka - Shutdown issues when attempting to consume from topic without 
> authorization
> -----------------------------------------------------------------------------------------
>
>                 Key: CAMEL-17424
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17424
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.14.0
>            Reporter: Rafał Gała
>            Priority: Minor
>
> Hello :)
> Something has been introduced in 3.14.0 that causes long Camel shutdown when 
> there has been an attempt to consume form a topic without necessary 
> authorizations. The more consumers has been started the longer Camel takes to 
> shut down.
> Below is what is logged many times after a shutdown has been initiated (I 
> masked some parts I did not want to share). In this case I consumed from two 
> topics: *some-other-topic-that-consumer-was-authorized-to*  and 
> *e2k-test-bledu.*
> {noformat}
> 2022-01-03 18:27:05.032  WARN 25556 --- [rdReplaceEvent]] 
> o.a.c.component.kafka.KafkaFetchRecords  : Exception 
> org.apache.kafka.common.errors.TopicAuthorizationException caught while 
> polling some-other-topic-that-consumer-was-authorized-to-Thread 0 from kafka 
> topic some-other-topic-that-consumer-was-authorized-to at offset {}: Not 
> authorized to access topics: [e2k-test-bledu]
> 2022-01-03 18:27:05.032  WARN 25556 --- [rdReplaceEvent]] 
> o.a.c.component.kafka.KafkaFetchRecords  : Deferring processing to the 
> exception handler based on polling exception strategy
> 2022-01-03 18:27:05.032  INFO 25556 --- [rdReplaceEvent]] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-event2kafka_test3-12, groupId=event2kafka_test3] 
> (Re-)joining group
> 2022-01-03 18:27:05.051  INFO 25556 --- [rdReplaceEvent]] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-event2kafka_test3-12, groupId=event2kafka_test3] 
> Successfully joined group with generation Generation{generationId=83, 
> memberId='consumer-event2kafka_test3-12-74747411-96e8-4111-bab3-224cf754018f',
>  protocol='range'}
> 2022-01-03 18:27:05.066  INFO 25556 --- [-PZOdmowaTrans]] 
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
> clientId=consumer-event2kafka_test3-13, groupId=event2kafka_test3] Requesting 
> to re-join the group and trigger rebalance since the assignment metadata has 
> changed from (***masked***)
> 2022-01-03 18:27:05.066  INFO 25556 --- [-PZOdmowaTrans]] 
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
> clientId=consumer-event2kafka_test3-13, groupId=event2kafka_test3] Requesting 
> to re-join the group and trigger rebalance since the assignment metadata has 
> changed from (***masked***)
> 2022-01-03 18:27:05.071  WARN 25556 --- [rdReplaceEvent]] 
> org.apache.kafka.clients.NetworkClient   : [Consumer 
> clientId=consumer-event2kafka_test3-12, groupId=event2kafka_test3] Error 
> while fetching metadata with correlation id 133 : 
> {e2k-test-bledu=TOPIC_AUTHORIZATION_FAILED}
> 2022-01-03 18:27:05.071 ERROR 25556 --- [rdReplaceEvent]] 
> org.apache.kafka.clients.Metadata        : [Consumer 
> clientId=consumer-event2kafka_test3-12, groupId=event2kafka_test3] Topic 
> authorization failed for topics [e2k-test-bledu]
> 2022-01-03 18:27:05.071  INFO 25556 --- [rdReplaceEvent]] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-event2kafka_test3-12, groupId=event2kafka_test3] Rebalance 
> failed.
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [e2k-test-bledu] {noformat}
> My use case may be specific because I start a separate route for every topic 
> I want to consume from (I do not want to have a shared consumer for all 
> topics because I need different prameters for selected topics, like offset 
> reset etc.). Currently I consume from 44 topics and the Camel takes like four 
> minutes to shut down. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to