[ 
https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16957264#comment-16957264
 ] 

amuthan Ganeshan commented on KAFKA-7181:
-----------------------------------------

h3. *I am using the latest version of Kafka Streams, 2.3.0, but this bug still exists. Could you please help me fix it? The scenario is described below.*

 

I have a Kafka Streams application that stores incoming messages in a state store. Later, during each punctuation, it processes the stored messages and writes them to a persistent big-data store.
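The buffer-then-flush pattern described above can be modelled without Kafka at all. The sketch below is a JDK-only stand-in, not Kafka Streams code: a `TreeMap` plays the role of the key-value state store, `process` corresponds to the per-record callback, and `punctuate` corresponds to the scheduled punctuation that processes the buffered messages, hands them to a sink standing in for the big-data store, and clears the buffer. The class name `BufferingProcessorModel` and the upper-casing "processing" step are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical JDK-only model of "store in a state store, flush on punctuation".
// In a real Kafka Streams app the map would be a state store and punctuate()
// would be a scheduled punctuator; neither Kafka API is used here.
class BufferingProcessorModel {
    // Stands in for the key-value state store; the latest value per key wins.
    private final Map<String, String> store = new TreeMap<>();
    // Stands in for the external big-data persistent store.
    private final Consumer<Map.Entry<String, String>> sink;

    BufferingProcessorModel(Consumer<Map.Entry<String, String>> sink) {
        this.sink = sink;
    }

    // Called once per incoming record: only buffer, no heavy work here.
    void process(String key, String value) {
        store.put(key, value);
    }

    // Called periodically: process each buffered record, write it out, then clear.
    int punctuate() {
        int flushed = 0;
        for (Map.Entry<String, String> e : store.entrySet()) {
            String processed = e.getValue().toUpperCase(); // placeholder "processing"
            sink.accept(Map.entry(e.getKey(), processed));
            flushed++;
        }
        store.clear();
        return flushed;
    }

    int buffered() {
        return store.size();
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        BufferingProcessorModel p =
            new BufferingProcessorModel(e -> written.add(e.getKey() + "=" + e.getValue()));
        p.process("a", "x");
        p.process("b", "y");
        p.process("a", "z"); // overwrites the buffered value for key "a"
        System.out.println(p.punctuate() + " " + written); // 2 [a=Z, b=Y]
    }
}
```

Keeping the per-record path to a single `put` and deferring the expensive write to the punctuation is the design choice the paragraph describes; the buffer is cleared only after every entry has been handed to the sink.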

The application consumes from 120 partitions distributed across 40 instances. It ran without any problem for months, but all of a sudden some of the instances failed because a stream thread threw this exception:

```
java.lang.IllegalStateException: No current assignment for partition <app_name>-<store_name>-changelog-98
```

 

Other instances are stuck in the REBALANCING state and never come out of it. Here is the full stack trace; I masked the application name and store name because of an NDA.

 

```

2019-10-21 13:27:13,481 ERROR [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] Encountered the following error during processing:
java.lang.IllegalStateException: No current assignment for partition application.id-store_name-changelog-98
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
    at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

```

 

I checked the state store disk usage: it is below 40% of the available disk space. Restarting the application fixes the problem for a short time, but the error soon reappears at random on other instances. I tried changing the retries and retry.backoff.ms configuration, but it did not help at all:

```

retries = 2147483647

retry.backoff.ms

```

After searching for a while, I found that a similar bug had been reported to the Kafka team in the past, and I noticed that my stack trace closely matches the one in that report.

Here is the link to that bug, reported about a year ago:

https://issues.apache.org/jira/browse/KAFKA-7181

 

Is there a workaround for this bug through configuration changes, or is there something wrong with the way I set up the application? This is the configuration of my Streams application:

 

```

consumer.session.timeout.ms=30000
metric.reporters=org.apache.kafka.common.metrics.JmxReporter
replication.factor=3
metadata.max.age.ms=30000
max.partition.fetch.bytes=2000000
producer.retries=2147483647
bootstrap.servers= <bootstrap server list goes here>
metrics.recording.level=DEBUG
producer.retry.backoff.ms=60000
consumer.auto.offset.reset=latest
application.server=0.0.0.0:6063
num.standby.replicas=1
max.poll.records=2
group.initial.rebalance.delay.ms=30000
state.dir= <state dir path goes here>
heartbeat.interval.ms=10000
max.poll.interval.ms=300000
num.stream.threads=10
application.id= <application id goes here>

```
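A few of the settings above can be sanity-checked mechanically against the deployment described earlier (120 partitions, 40 instances). The sketch below is a hypothetical helper, not part of Kafka: it loads a subset of the configuration with `java.util.Properties`, resolves consumer settings under the simplifying assumption that a `consumer.`-prefixed key overrides the unprefixed key, checks the documented rule that `heartbeat.interval.ms` must be lower than `session.timeout.ms` and typically no higher than one third of it, and estimates tasks per instance assuming a single sub-topology (one active task per input partition) spread evenly. The real task assignment may differ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sanity check for the Streams configuration above.
// Assumptions: "consumer."-prefixed keys override unprefixed ones, there is one
// sub-topology (one active task per input partition), and tasks spread evenly.
class StreamsConfigCheck {
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // Effective consumer setting: prefixed override first, then the plain key.
    static long consumerLong(Properties p, String key) {
        String v = p.getProperty("consumer." + key, p.getProperty(key));
        if (v == null) throw new IllegalArgumentException("missing " + key);
        return Long.parseLong(v.trim());
    }

    // heartbeat.interval.ms < session.timeout.ms, and at most one third of it.
    static boolean heartbeatOk(Properties p) {
        long session = consumerLong(p, "session.timeout.ms");
        long heartbeat = consumerLong(p, "heartbeat.interval.ms");
        return heartbeat < session && heartbeat * 3 <= session;
    }

    // Active plus standby tasks hosted by one instance, rounded up.
    static int tasksPerInstance(Properties p, int partitions, int instances) {
        int standby = Integer.parseInt(p.getProperty("num.standby.replicas", "0").trim());
        int total = partitions * (1 + standby);
        return (total + instances - 1) / instances;
    }

    // Threads per instance left without a task if tasks were placed one per thread.
    static int threadsWithoutTask(Properties p, int partitions, int instances) {
        int threads = Integer.parseInt(p.getProperty("num.stream.threads", "1").trim());
        return Math.max(0, threads - tasksPerInstance(p, partitions, instances));
    }

    public static void main(String[] args) {
        Properties p = load(String.join("\n",
            "consumer.session.timeout.ms=30000",
            "heartbeat.interval.ms=10000",
            "num.standby.replicas=1",
            "num.stream.threads=10"));
        System.out.println("heartbeat ok: " + heartbeatOk(p));                  // true
        System.out.println("tasks/instance: " + tasksPerInstance(p, 120, 40));   // 6
        System.out.println("idle threads: " + threadsWithoutTask(p, 120, 40));   // 4
    }
}
```

With the posted values the heartbeat check passes exactly at the one-third limit, and each instance hosts standby tasks as well as active ones (120 × 2 tasks over 40 instances). That is consistent with the failing frame `StreamThread.maybeUpdateStandbyTasks` in the stack trace, although it does not by itself explain the exception.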

Note: The original bug, reported a year ago, was concluded to be related to https://issues.apache.org/jira/browse/KAFKA-7657 and was reported as fixed in version 2.2.0, but I am using the latest version, 2.3.0.

I would appreciate your help with this bug.

> Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7181
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7181
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Romil Kumar Vasani
>            Priority: Major
>             Fix For: 2.2.0
>
>
> Once a StreamThread encounters an IllegalStateException, it is marked DEAD and shuts down.
> The application doesn't spawn a new thread in its place; the partitions of that thread are assigned to a different thread, which synchronizes them. But the application is stuck in the REBALANCING state, because not all StreamThreads are in the RUNNING state.
> Expected: A new thread should come up, and after synchronization/rebalancing the KafkaStreams.State should be RUNNING.
> Since all the active threads (those not marked DEAD) are in the RUNNING state, the KafkaStreams.State should be RUNNING.
> P.S. I am reporting an issue for the first time. If more information is needed, I can provide it.
> Below are the logs from the IllegalStateException:
> 2018-07-18 03:02:27.510 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-stateStore-changelog-10
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> 2018-07-18 03:02:27.511 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN
> 2018-07-18 03:02:27.511 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutting down
> 2018-07-18 03:02:27.571 INFO 1 --- [-StreamThread-2] o.a.k.clients.producer.KafkaProducer : [Producer clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-07-18 03:02:27.579 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD
> 2018-07-18 03:02:27.579 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutdown complete
> 2018-07-18 03:02:27.579 ERROR 1 --- [-StreamThread-2] xxx.xxx.xxx.AppRunner : Unhandled exception in thread: 43:consumerGroup-StreamThread-2
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-inventoryStore-changelog-10
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
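The complaint in the original KAFKA-7181 report reduces to a rule about how the client-level state is derived from the per-thread states. The sketch below is a hypothetical, simplified model in plain Java, not the actual `KafkaStreams` state machine: the enums hold only a small subset of states, and `asReported` and `asExpected` are illustrative names. It contrasts the reported behaviour (one DEAD thread keeps the client in REBALANCING) with the behaviour the reporter expected (DEAD threads are ignored once the survivors are RUNNING). In a real application, client state changes can be observed by registering a listener with `KafkaStreams#setStateListener`.

```java
import java.util.List;

// Hypothetical model of the KAFKA-7181 report, NOT the real KafkaStreams logic:
// the client-level state is derived from a reduced set of thread states.
class ClientStateModel {
    enum ThreadState { PARTITIONS_ASSIGNED, RUNNING, DEAD }
    enum ClientState { REBALANCING, RUNNING, ERROR }

    // Behaviour as reported: every thread must be RUNNING, so a single DEAD
    // thread pins the client in REBALANCING. All threads DEAD -> ERROR (model only).
    static ClientState asReported(List<ThreadState> threads) {
        if (threads.stream().allMatch(t -> t == ThreadState.DEAD)) return ClientState.ERROR;
        return threads.stream().allMatch(t -> t == ThreadState.RUNNING)
            ? ClientState.RUNNING : ClientState.REBALANCING;
    }

    // Behaviour the reporter expected: DEAD threads are ignored, and the client
    // is RUNNING once every remaining live thread is RUNNING.
    static ClientState asExpected(List<ThreadState> threads) {
        boolean anyLive = threads.stream().anyMatch(t -> t != ThreadState.DEAD);
        if (!anyLive) return ClientState.ERROR;
        boolean liveAllRunning = threads.stream()
            .filter(t -> t != ThreadState.DEAD)
            .allMatch(t -> t == ThreadState.RUNNING);
        return liveAllRunning ? ClientState.RUNNING : ClientState.REBALANCING;
    }

    public static void main(String[] args) {
        List<ThreadState> threads =
            List.of(ThreadState.RUNNING, ThreadState.DEAD, ThreadState.RUNNING);
        System.out.println(asReported(threads) + " vs " + asExpected(threads)); // REBALANCING vs RUNNING
    }
}
```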



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
