[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090111#comment-16090111
 ] 

Guozhang Wang commented on KAFKA-5545:
--------------------------------------

Hello [~yogeshbelur], sorry for the late reply.

I think I have found the root cause of the issue, which is indeed a deadlock 
that prevents the stream thread from exiting its current loop iteration, and 
hence from shutting down. More details follow:

1. When {{stream.close()}} is called, it only sets a flag in the stream 
threads; the shutdown thread then waits until the stream threads have seen 
this flag, exited their while loop, and shut themselves down.
2. There is an edge case in which a stream thread is still inside the 
{{onPartitionsAssigned}} callback of the current loop iteration, trying to grab 
the state directory locks in order to create its assigned tasks. However, those 
locks have not been released by other threads that have seen a disconnect / 
request timeout from the brokers, since the broker IP has changed within the 
same JVM. The thread retries indefinitely because the rebalance timeout 
defaults to {{Integer.MAX_VALUE}}, so it never completes its current iteration, 
never sees the shutdown flag, and never shuts itself down.

I have started working on a fix for the state directory deadlock, but it will 
probably only land in the 0.11.0.1 release, and be backported if we have a 
0.10.2.2 release. As for the version you are currently on (0.10.2.1), one 
workaround is to set {{ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG}} to a 
smaller value than the default {{Integer.MAX_VALUE}} so that it will not cause 
a deadlock, while making sure that, whatever value this config is set to, the 
corresponding {{REQUEST_TIMEOUT_MS_CONFIG}} is set to be larger than it.
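
As a rough sketch of that workaround, the two settings could look like the following. The class and method names and the concrete values (5 minutes, plus a 10 second margin) are assumptions for illustration, not recommendations. The string keys are the values of {{ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG}} and {{ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG}}, written out literally so that the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

public class WorkaroundConfigSketch {
    // Illustrative values only; pick ones that suit your deployment.
    static final int MAX_POLL_INTERVAL_MS = 5 * 60 * 1000;               // 5 minutes
    static final int REQUEST_TIMEOUT_MS = MAX_POLL_INTERVAL_MS + 10_000; // strictly larger

    static Properties workaroundProps() {
        Properties props = new Properties();
        // Same key as ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
        props.put("max.poll.interval.ms", MAX_POLL_INTERVAL_MS);
        // Same key as ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
        props.put("request.timeout.ms", REQUEST_TIMEOUT_MS);
        return props;
    }

    // Checks the two conditions of the workaround: a finite poll interval
    // and a request timeout that is larger than it.
    static boolean isConsistent(Properties props) {
        int maxPoll = (Integer) props.get("max.poll.interval.ms");
        int requestTimeout = (Integer) props.get("request.timeout.ms");
        return maxPoll < Integer.MAX_VALUE && requestTimeout > maxPoll;
    }

    public static void main(String[] args) {
        Properties props = workaroundProps();
        System.out.println(props + " consistent=" + isConsistent(props));
    }
}
```

These entries would be merged into the properties you already pass when constructing the streams instance (application id, bootstrap servers and so on), which are omitted here.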


> Kafka Stream not able to successfully restart over new broker ip
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5545
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Yogesh BG
>            Priority: Critical
>         Attachments: kafkastreams.log
>
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. 
> Then I restart the broker. When the broker restarts, a new IP is assigned.
> In the Kafka Streams application I have a thread that runs at a 5-minute 
> interval and checks whether the broker IP has changed; if it has, we clean up 
> the stream, rebuild the topology (I also tried reusing the topology), and 
> start the stream again. I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.035 [StreamThread-41] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.037 [StreamThread-37] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_3. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_3] Failed to lock the 
> state directory for task 0_3
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-34] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_7. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_7] Failed to lock the 
> state directory for task 0_7
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-43] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_6. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_6] Failed to lock the 
> state directory for task 0_6
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-45] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_0. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the 
> state directory for task 0_0
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-36] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_4. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the 
> state directory for task 0_4
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-48] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_2. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_2] Failed to lock the 
> state directory for task 0_2
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:13.642 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-44] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-47] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-42] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-46] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.646 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-33] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.648 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-40] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.655 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-39] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.660 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-35] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-42] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-46] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-47] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-44] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.671 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-33] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.676 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-40] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.677 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-39] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:23.682 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-35] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:29.025 [pool-4-thread-1]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
