[ https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-6587.
----------------------------------
Resolution: Fixed
Assignee: Ted Yu
Fix Version/s: 2.1.0
> Kafka Streams hangs when not able to access internal topics
> -----------------------------------------------------------
>
> Key: KAFKA-6587
> URL: https://issues.apache.org/jira/browse/KAFKA-6587
> Project: Kafka
> Issue Type: Bug
> Components: security, streams
> Affects Versions: 1.0.0
> Reporter: Chris Medved
> Assignee: Ted Yu
> Priority: Minor
> Fix For: 2.1.0
>
>
> *Expectation:* The Kafka Streams client should throw an exception, log an
> error, or crash when a fatal error occurs.
> *Observation:* Kafka Streams neither logs an error nor throws an exception
> when the necessary permissions for the internal state store topics are not
> granted. Instead, it hangs indefinitely and never starts running the topology.
> *Steps to reproduce:*
> # Create a Kafka cluster with ACLs enabled (allow.everyone.if.no.acl.found
> should be set to false, or deny permissions must be set on the intermediate
> topics).
> # Create a simple Streams application that performs a stateful operation
> such as count.
> # Grant ACLs on the source and sink topics to the principal used for testing
> (using the ANONYMOUS user is recommended, if possible, for ease of testing).
> # Grant ACLs for the consumer group and for Create on the cluster. If the
> default is "allow", add deny permissions on the state store topics. To get
> their names, either run the application once so that it creates the topics,
> or use the Topology describe() method.
> # Run the Streams application. It will hang on "(Re-)joining group" without
> printing any errors.
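> For step 4, the internal topic names can also be derived by hand. The sketch
> below is a minimal illustration, assuming the Kafka Streams convention that a
> state store's changelog topic is named <application.id>-<store name>-changelog
> and a repartition topic <application.id>-<name>-repartition. The
> InternalTopicNames class and its methods are hypothetical helpers, not part of
> the Kafka API. The store name KSTREAM-AGGREGATE-STATE-STORE-0000000005 is taken
> from the server log in this report, and the application.id kafka-consumer-test
> from the group ID in the client log (Streams uses application.id as its
> consumer group ID).

```java
/**
 * Hypothetical helper (not part of the Kafka Streams API) that derives
 * internal topic names from an application.id, assuming the naming
 * convention <application.id>-<name>-changelog / -repartition.
 */
final class InternalTopicNames {

    private InternalTopicNames() {}

    /** Prefix shared by all internal topics of one Streams application. */
    static String internalTopicPrefix(String applicationId) {
        return applicationId + "-";
    }

    /** Changelog topic that backs the given state store. */
    static String changelogTopic(String applicationId, String storeName) {
        return internalTopicPrefix(applicationId) + storeName + "-changelog";
    }

    /** Repartition topic for an operation with the given internal name. */
    static String repartitionTopic(String applicationId, String name) {
        return internalTopicPrefix(applicationId) + name + "-repartition";
    }

    public static void main(String[] args) {
        // Values taken from the client and server logs in this report.
        String applicationId = "kafka-consumer-test";
        String storeName = "KSTREAM-AGGREGATE-STATE-STORE-0000000005";

        // The internal topic the application's principal needs read/write
        // access to, per this report.
        System.out.println(changelogTopic(applicationId, storeName));
    }
}
```

> Running main prints
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog, which
> matches the changelog partition name in the server log below without its
> trailing partition number (-0).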
> *Detailed Explanation*
> I spent some time trying to figure out what was wrong with my Streams app.
> I'm using ACLs on my Kafka cluster, and it turned out I had forgotten to
> grant read/write access to the internal state store topic for an aggregation.
> The Streams client hung on "(Re-)joining group" until I killed it with
> Ctrl+C (shown as ^C below):
> {code:java}
> 10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
> clientId=kafka-consumer-client-StreamThread-1-consumer,
> groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id:
> 2147483647 rack: null)
> 10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
> clientId=kafka-consumer-client-StreamThread-1-consumer,
> groupId=kafka-consumer-test] Revoking previously assigned partitions []
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] State transition from RUNNING to
> PARTITIONS_REVOKED
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.streams.KafkaStreams - stream-client
> [kafka-consumer-client]State transition from RUNNING to REBALANCING
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] partition revocation took 1 ms.
> suspended active tasks: []
> suspended standby tasks: []
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
> clientId=kafka-consumer-client-StreamThread-1-consumer,
> groupId=kafka-consumer-test] (Re-)joining group
> ^C
> 10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams -
> stream-client [kafka-consumer-client]State transition from REBALANCING to
> PENDING_SHUTDOWN
> 10:34:53.610 [Thread-3] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] Informed to shut down
> 10:34:53.610 [Thread-3] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] State transition from
> PARTITIONS_REVOKED to PENDING_SHUTDOWN{code}
> The server log showed:
> {code:java}
> [2018-02-23 10:29:10,408] INFO [Partition
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0
> broker=0]
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0
> starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1
> (kafka.cluster.Partition)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Member
> kafka-consumer-client-StreamThread-1-consumer-f86e4ca8-4c45-4883-bdaa-2383193eabbe
> in group kafka-consumer-test has failed, removing it from the group
> (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Preparing to rebalance
> group kafka-consumer-test with old generation 1 (__consumer_offsets-23)
> (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Group
> kafka-consumer-test with generation 2 is now empty (__consumer_offsets-23)
> (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:31:23,448] INFO [GroupMetadataManager brokerId=0] Group
> kafka-consumer-test transitioned to Dead in generation 2
> (kafka.coordinator.group.GroupMetadataManager){code}
> In this example, the internal topic was created. If the internal topic
> already exists, the client tries to create it again, which fails with a
> "topic already exists" exception that appears only in the server log, not on
> the client. The Streams client then remains stuck indefinitely: no errors or
> warnings are printed, and it does not appear to shut down at any point.
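> Until the client reports this itself, a caller could at least detect the hang
> instead of waiting forever. The following is a minimal, hypothetical sketch
> using only the JDK: it records when the client enters REBALANCING and reports
> it as stuck once it has stayed there longer than a chosen threshold. The
> RebalanceWatchdog class, its State enum (whose constants are assumed to mirror
> the client states seen in the log above), and the one-minute threshold are
> assumptions for illustration. In a real application, onStateChange would
> presumably be called from a listener registered with
> KafkaStreams#setStateListener; that wiring is omitted so the example has no
> Kafka dependency.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical watchdog (not part of Kafka Streams) that flags a client
 * which has stayed in REBALANCING for longer than a threshold.
 */
final class RebalanceWatchdog {

    /** Assumed client states, modeled on the states in the client log. */
    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private final Duration threshold;
    private State current = State.CREATED;
    private Instant enteredAt;

    RebalanceWatchdog(Duration threshold, Instant now) {
        this.threshold = threshold;
        this.enteredAt = now;
    }

    /** Records a state transition; synchronized because the listener thread
     *  and the checking thread may differ. */
    synchronized void onStateChange(State newState, Instant now) {
        current = newState;
        enteredAt = now;
    }

    /** True if the client has been rebalancing for longer than the threshold. */
    synchronized boolean isStuck(Instant now) {
        return current == State.REBALANCING
                && Duration.between(enteredAt, now).compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        // Timestamps roughly follow the client log in this report.
        Instant start = Instant.parse("2018-02-23T10:29:10Z");
        RebalanceWatchdog watchdog = new RebalanceWatchdog(Duration.ofMinutes(1), start);

        watchdog.onStateChange(State.RUNNING, start);
        watchdog.onStateChange(State.REBALANCING, start.plusMillis(106));

        // 30 seconds in: still within the threshold.
        System.out.println(watchdog.isStuck(start.plusSeconds(30)));  // false
        // About when ^C was pressed in the report: well past the threshold.
        System.out.println(watchdog.isStuck(start.plusSeconds(343))); // true
    }
}
```

> Waiting only for RUNNING would not be enough here, because the log shows the
> client already reporting RUNNING before it moved to REBALANCING and stayed
> there; timing the REBALANCING state is what exposes the hang.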
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)