[ 
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066045#comment-16066045
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5167 at 6/28/17 7:29 AM:
----------------------------------------------------------------------------

Hi,

I think I found much easier way to reproduce the same behaviour.
I am doing more less suche setup in the code:

{{// loop over the inTopicName { }}
{{KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, 
STRING_SERDE, inTopicName );}}
{{stringInput.filter( streamFilter::passOrFilterMessages ) }}
{{                    .map( ndmNormalizer )}}
{{                    .to( outTopicName ); }}
{{ // } }}
{{}}
{{streams = new KafkaStreams( kBuilder, streamsConfig ); }}
{{streams.cleanUp(); }}
{{streams.start(); }}

And if there are *_num.stream.threads=4_* but there are 20 or more inTopicNames 
(many topics to read), then complete application startup is totally 
self-blocked, by writing endless:

2017-06-27 18:34:25 INFO  StreamThread:828 - stream-thread [StreamThread-3] 
Creating active task 11_5 with assigned partitions [[int62_topic-5]]
2017-06-27 18:34:25 WARN  StreamThread:1184 - Could not create task 8_7. Will 
retry.
org.apache.kafka.streams.errors.LockException: task [8_7] Failed to lock the 
state directory: /data/my-app/tmp/kafka-state/stream/8_7
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [my-app-stream.jar:?]
2017-06-27 18:34:25 INFO  StreamThread:828 - stream-thread [StreamThread-2] 
Creating active task 7_9 with assigned partitions [[c0206_topic-9]]
2017-06-27 18:34:26 WARN  StreamThread:1184 - Could not create task 6_9. Will 
retry.
org.apache.kafka.streams.errors.LockException: task [6_9] Failed to lock the 
state directory: /data/my-app/tmp/kafka-state/stream/6_9
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [my-app-stream.jar:?]
2017-06-27 18:34:26 WARN  StreamThread:1184 - Could not create task 15_1. Will 
retry.
org.apache.kafka.streams.errors.LockException: task [15_1] Failed to lock the 
state directory: /data/my-app/tmp/kafka-state/stream/15_1
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [my-app-stream.jar:?]
2017-06-27 18:34:26 INFO  StreamThread:828 - stream-thread [StreamThread-4] 
Creating active task 13_3 with assigned partitions [[a0291_topic-3]]
2017-06-27 18:34:26 INFO  StreamThread:828 - stream-thread [StreamThread-1] 
Creating active task 12_4 with assigned partitions [[int77_topic-4]]


was (Author: habdank):
Hi,

I think I found much easier way to reproduce the same behaviour.
I am doing more less suche setup in the code:

{{
// loop over the inTopicName {
KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, 
STRING_SERDE, inTopicName );
stringInput.filter( streamFilter::passOrFilterMessages )
                    .map( ndmNormalizer )
                    .to( outTopicName );
}

streams = new KafkaStreams( kBuilder, streamsConfig );
streams.cleanUp();
streams.start();
}}

And if there are *_num.stream.threads=4_* but there are 20 or more inTopicNames 
(many topics to read), then complete application startup is totally 
self-blocked, by writing endless:

2017-06-27 18:34:25 INFO  StreamThread:828 - stream-thread [StreamThread-3] 
Creating active task 11_5 with assigned partitions [[int62_topic-5]]
2017-06-27 18:34:25 WARN  StreamThread:1184 - Could not create task 8_7. Will 
retry.
org.apache.kafka.streams.errors.LockException: task [8_7] Failed to lock the 
state directory: /data/my-app/tmp/kafka-state/stream/8_7
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [my-app-stream.jar:?]
2017-06-27 18:34:25 INFO  StreamThread:828 - stream-thread [StreamThread-2] 
Creating active task 7_9 with assigned partitions [[c0206_topic-9]]
2017-06-27 18:34:26 WARN  StreamThread:1184 - Could not create task 6_9. Will 
retry.
org.apache.kafka.streams.errors.LockException: task [6_9] Failed to lock the 
state directory: /data/my-app/tmp/kafka-state/stream/6_9
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [my-app-stream.jar:?]
2017-06-27 18:34:26 WARN  StreamThread:1184 - Could not create task 15_1. Will 
retry.
org.apache.kafka.streams.errors.LockException: task [15_1] Failed to lock the 
state directory: /data/my-app/tmp/kafka-state/stream/15_1
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
 ~[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
 [my-app-stream.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
 [my-app-stream.jar:?]
2017-06-27 18:34:26 INFO  StreamThread:828 - stream-thread [StreamThread-4] 
Creating active task 13_3 with assigned partitions [[a0291_topic-3]]
2017-06-27 18:34:26 INFO  StreamThread:828 - stream-thread [StreamThread-1] 
Creating active task 12_4 with assigned partitions [[int77_topic-4]]

> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.11.0.0, 0.10.2.1
>            Reporter: Narendra Kumar
>            Assignee: Matthias J. Sax
>             Fix For: 0.11.1.0, 0.10.2.2, 0.11.0.1
>
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance processor node's close() method gets called two times once 
> from StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). I have some instance filed 
> which I am closing in processor's close method. This instance's close method 
> throws some exception if I call close more than once. Because of this 
> exception, the Kafka streams does not attempt to close the statemanager ie.  
> task.closeStateManager(true) is never called. When a task moves from one 
> thread to another within same machine the task blocks trying to get lock on 
> state directory which is still held by unclosed statemanager and keep 
> throwing the below warning message:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will 
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>       at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to