[ https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16957264#comment-16957264 ]
amuthan Ganeshan commented on KAFKA-7181: ----------------------------------------- h3. *I am using the latest version of Kafka stream 2.3.0 but this bug still exists, could you please help me fix this, following is the scenario for you to review.* I have a Kafka stream application that stores the incoming messages into a state store, and later during the punctuation period, we store them into a big data persistent store after processing the messages. The application consumes from 120 partitions distributed across 40 instances. The application has been running fine without any problem for months, but all of a sudden some of the instances failed because of a stream thread exception saying ```java.lang.IllegalStateException: No current assignment for partition <app_name>-<store_name>-changelog-98``` And other instances are stuck in the REBALANCING state, and never comes out of it. Here is the full stack trace, I just masked the application-specific app name and store name in the stack trace due to NDA. ``` 2019-10-21 13:27:13,481 ERROR [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] Encountered the following error during processing: java.lang.IllegalStateException: No current assignment for partition application.id-store_name-changelog-98 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319) at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618) at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) ``` Now I checked the state sore disk usage; it is less than 40% of the total disk space available. Restarting the application solves the problem for a short amount of time, but the error popping up randomly on some other instances quickly. I tried to change the retry and retry.backoff.ms configuration but not helpful at all ``` retries = 2147483647 retry.backoff.ms ``` After googling for some time I found there was a similar bug reported to the Kafka team in the past, and also notice my stack trace is exactly matching with the stack trace of the reported bug. Here is the link for the bug reported on a comparable basis a year ago. https://issues.apache.org/jira/browse/KAFKA-7181 Now I am wondering is there a workaround for this bug though configuration changes, or is there something wrong the way I set up the application, the following are the configuration I have for my stream application. ``` consumer.session.timeout.ms=30000 metric.reporters=org.apache.kafka.common.metrics.JmxReporter replication.factor=3 metadata.max.age.ms=30000 max.partition.fetch.bytes=2000000 producer.retries=2147483647 bootstrap.servers= <bootstrap server list goes here> metrics.recording.level=DEBUG producer.retry.backoff.ms=60000 consumer.auto.offset.reset=latest application.server=0.0.0.0:6063 num.standby.replicas=1 max.poll.records=2 group.initial.rebalance.delay.ms=30000 state.dir= <state dir path goes here> heartbeat.interval.ms=10000 max.poll.interval.ms=300000 num.stream.threads=10 application.id= <application id goes here> ``` Note: The original bug reported a year back got a conclusion that it is related to https://issues.apache.org/jira/browse/KAFKA-7657 and reported solved in version 2.2.0, but I am using the latest 2.3.0 version. I appreciate your help concerning this bug. > Kafka Streams State stuck in rebalancing after one of the StreamThread > encounters IllegalStateException > ------------------------------------------------------------------------------------------------------- > > Key: KAFKA-7181 > URL: https://issues.apache.org/jira/browse/KAFKA-7181 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Romil Kumar Vasani > Priority: Major > Fix For: 2.2.0 > > > One the StreamThread encounters an IllegalStateException and is marked DEAD, > shut down. > The application doesn't spawn a new thread in it's place, the partitions of > that thread are assigned to a different thread and it synchronizes. But the > application is stuck in REBALANCING state, as not all StreamThreads are in > RUNNING state. > Excepted: New thread should come up and after synchronization/rebalancing it > the KafkaStream.State should be RUNNING > Since all the active threads (that are not marked DEAD) are in RUNNING state, > the KafkaStreams.State should be RUNNING > P.S. I am reporting an issue for the first time. If there is more information > needed I can provide. > Below are the logs from the IllegalStateException: > 2018-07-18 03:02:27.510 ERROR 1 — [-StreamThread-2] > o.a.k.s.p.internals.StreamThread : stream-thread > [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error > during processing: > java.lang.IllegalStateException: No current assignment for partition > consumerGroup-stateStore-changelog-10 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) > 2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] > o.a.k.s.p.internals.StreamThread : stream-thread > [consumerGroup-StreamThread-2] State transition from RUNNING to > PENDING_SHUTDOWN > 2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] > o.a.k.s.p.internals.StreamThread : stream-thread > [consumerGroup-StreamThread-2] Shutting down > 2018-07-18 03:02:27.571 INFO 1 — [-StreamThread-2] > o.a.k.clients.producer.KafkaProducer : [Producer > clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer > with timeoutMillis = 9223372036854775807 ms. > 2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] > o.a.k.s.p.internals.StreamThread : stream-thread > [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD > 2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] > o.a.k.s.p.internals.StreamThread : stream-thread > [consumerGroup-StreamThread-2] Shutdown complete > 2018-07-18 03:02:27.579 ERROR 1 — [-StreamThread-2] xxx.xxx.xxx.AppRunner : > Unhandled exception in thread: 43:consumerGroup-StreamThread-2 > java.lang.IllegalStateException: No current assignment for partition > consumerGroup-inventoryStore-changelog-10 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) -- This message was sent by Atlassian Jira (v8.3.4#803005)