Chris Medved created KAFKA-6587:
-----------------------------------

             Summary: Kafka Streams hangs when not able to access internal 
topics
                 Key: KAFKA-6587
                 URL: https://issues.apache.org/jira/browse/KAFKA-6587
             Project: Kafka
          Issue Type: Bug
          Components: security, streams
    Affects Versions: 1.0.0
            Reporter: Chris Medved


*Expectation:* Kafka Streams client will throw an exception, log errors, or 
crash when a fatal error occurs.

*Observation:* Kafka Streams does not log an error or throw an exception when 
necessary permissions for internal state store topics are not granted. It will 
hang indefinitely and not start running the topology.

*Steps to reproduce:*
 # Create a Kafka Cluster with ACLs enabled (allow.everyone.if.no.acl.found 
should be set to false, or deny permissions must be set on the intermediate 
topics).
 # Create a simple streams application that does a stateful operation such as 
count.
 # Grant ACLs on source and sink topics to principal used for testing (would 
recommend using ANONYMOUS user if possible for ease of testing).
 # Grant ACLs for consumer group and cluster create. Add deny permissions to 
state store topics if the default is "allow". You can run the application to 
create the topics or use the topology's describe() method to get the names.
 # Run streams application. It should hang on "(Re-)joining group" with no 
errors printed.
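
For the ACL step above, the state store topic name can also be derived by hand. This is a minimal sketch, assuming the naming pattern visible in the server log in this report (application id, then store name, then a "-changelog" suffix). The class and method names are hypothetical and not part of Kafka.

```java
// Hypothetical helper, not part of the Kafka API: derives the changelog
// topic name that Kafka Streams uses for a state store, so that ACLs can
// be granted (or denied) on it before the application starts.
final class InternalTopicNames {
    private InternalTopicNames() {}

    // Pattern assumed from the server log in this report:
    // <application.id>-<store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Values taken from the logs in this report.
        System.out.println(changelogTopic(
                "kafka-consumer-test",
                "KSTREAM-AGGREGATE-STATE-STORE-0000000005"));
    }
}
```

The store name itself still has to come from the topology description or from a previous run, since it is generated unless the store is named explicitly.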

*Detailed Explanation*

I spent some time trying to figure out what was wrong with my streams app. I'm 
using ACLs on my Kafka cluster and it turns out I forgot to grant read/write 
access to the internal changelog topic that backs the state store of an 
aggregation.

The streams client would hang on "(Re-)joining group" until killed (note ^C is 
ctrl+c, which I used to kill the app): 
{code:java}
10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=kafka-consumer-client-StreamThread-1-consumer, 
groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id: 
2147483647 rack: null)
10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
clientId=kafka-consumer-client-StreamThread-1-consumer, 
groupId=kafka-consumer-test] Revoking previously assigned partitions []
10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[kafka-consumer-client-StreamThread-1] State transition from RUNNING to 
PARTITIONS_REVOKED
10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO 
org.apache.kafka.streams.KafkaStreams - stream-client 
[kafka-consumer-client]State transition from RUNNING to REBALANCING
10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[kafka-consumer-client-StreamThread-1] partition revocation took 1 ms.
suspended active tasks: []
suspended standby tasks: []
10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=kafka-consumer-client-StreamThread-1-consumer, 
groupId=kafka-consumer-test] (Re-)joining group
^C
10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams - 
stream-client [kafka-consumer-client]State transition from REBALANCING to 
PENDING_SHUTDOWN
10:34:53.610 [Thread-3] INFO 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[kafka-consumer-client-StreamThread-1] Informed to shut down
10:34:53.610 [Thread-3] INFO 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[kafka-consumer-client-StreamThread-1] State transition from PARTITIONS_REVOKED 
to PENDING_SHUTDOWN{code}
The server log would show:
{code:java}
[2018-02-23 10:29:10,408] INFO [Partition 
kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0 
broker=0] 
kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0 starts 
at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 
(kafka.cluster.Partition)
[2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Member 
kafka-consumer-client-StreamThread-1-consumer-f86e4ca8-4c45-4883-bdaa-2383193eabbe 
in group kafka-consumer-test has failed, removing it 
from the group (kafka.coordinator.group.GroupCoordinator)
[2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Preparing to rebalance 
group kafka-consumer-test with old generation 1 (__consumer_offsets-23) 
(kafka.coordinator.group.GroupCoordinator)
[2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Group kafka-consumer-test 
with generation 2 is now empty (__consumer_offsets-23) 
(kafka.coordinator.group.GroupCoordinator)
[2018-02-23 10:31:23,448] INFO [GroupMetadataManager brokerId=0] Group 
kafka-consumer-test transitioned to Dead in generation 2 
(kafka.coordinator.group.GroupMetadataManager){code}
In this example, the internal topic was created. If the internal topic already 
exists, the streams client will try to create it again and the request will 
fail with a "topic already exists" exception (shown in the server log, not the 
client log).

The streams client then just remains stuck indefinitely. No errors or warnings 
are printed, and it does not seem to actually shut down at any point.
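
Until the client reports this failure itself, an application could detect the hang with a startup timeout. This is a minimal sketch of that idea, not a fix for the bug. StartupWatchdog is a hypothetical class written with the JDK only. It assumes the application forwards state changes to it, for example from a listener registered with KafkaStreams#setStateListener. Because the client log above shows the client in RUNNING before the first rebalance, the sketch only counts a transition from REBALANCING to RUNNING.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka: waits a bounded time for the
// streams client to finish its first rebalance.
final class StartupWatchdog {
    private final CountDownLatch running = new CountDownLatch(1);

    // Intended to be called from a state listener with the state names,
    // e.g. onStateChange(newState.name(), oldState.name()).
    void onStateChange(String newState, String oldState) {
        if ("RUNNING".equals(newState) && "REBALANCING".equals(oldState)) {
            running.countDown();
        }
    }

    // Returns true if REBALANCING -> RUNNING was seen within the timeout.
    boolean awaitRunning(long timeout, TimeUnit unit) {
        try {
            return running.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        StartupWatchdog watchdog = new StartupWatchdog();
        // Simulate the hang from this report: the client enters
        // REBALANCING and never leaves it.
        watchdog.onStateChange("REBALANCING", "RUNNING");
        if (!watchdog.awaitRunning(100, TimeUnit.MILLISECONDS)) {
            System.err.println("Streams did not reach RUNNING in time; "
                    + "check ACLs on internal topics.");
        }
    }
}
```

A real application would choose a timeout well above its normal rebalance time and close the streams client when the wait fails.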



