[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066045#comment-16066045 ]
Seweryn Habdank-Wojewodzki edited comment on KAFKA-5167 at 6/28/17 7:32 AM: ---------------------------------------------------------------------------- Hi, I think I found a much easier way to reproduce the same behaviour. I am doing more or less the following setup in the code: // loop over the inTopicName(s) { KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName ); stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer ).to( outTopicName ); // } streams = new KafkaStreams( kBuilder, streamsConfig ); streams.cleanUp(); streams.start(); And if *_num.stream.threads=4_* is set but there are 20 or more inTopicNames (many topics to read), then the complete application startup is totally self-blocked, endlessly writing: 2017-06-27 18:34:25 INFO StreamThread:828 - stream-thread [StreamThread-3] Creating active task 11_5 with assigned partitions [[int62_topic-5]] 2017-06-27 18:34:25 WARN StreamThread:1184 - Could not create task 8_7. Will retry. org.apache.kafka.streams.errors.LockException: task [8_7] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/8_7 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?] 2017-06-27 18:34:25 INFO StreamThread:828 - stream-thread [StreamThread-2] Creating active task 7_9 with assigned partitions [[c0206_topic-9]] 2017-06-27 18:34:26 WARN StreamThread:1184 - Could not create task 6_9. Will retry. org.apache.kafka.streams.errors.LockException: task [6_9] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/6_9 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?] 2017-06-27 18:34:26 WARN StreamThread:1184 - Could not create task 15_1. Will retry. 
org.apache.kafka.streams.errors.LockException: task [15_1] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/15_1 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?] 2017-06-27 18:34:26 INFO StreamThread:828 - stream-thread [StreamThread-4] Creating active task 13_3 with assigned partitions [[a0291_topic-3]] 2017-06-27 18:34:26 INFO StreamThread:828 - stream-thread [StreamThread-1] Creating active task 12_4 with assigned partitions [[int77_topic-4]] was (Author: habdank): Hi, I think I found much easier way to reproduce the same behaviour. I am doing more less suche setup in the code: // loop over the inTopicName(s) { KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName ); stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer ).to( outTopicName ); // } streams = new KafkaStreams( kBuilder, streamsConfig ); streams.cleanUp(); streams.start(); And if there are *_num.stream.threads=4_* but there are 20 or more inTopicNames (many topics to read), then complete application startup is totally self-blocked, by writing endless: 2017-06-27 18:34:25 INFO StreamThread:828 - stream-thread [StreamThread-3] Creating active task 11_5 with assigned partitions [[int62_topic-5]] 2017-06-27 18:34:25 WARN StreamThread:1184 - Could not create task 8_7. Will retry. org.apache.kafka.streams.errors.LockException: task [8_7] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/8_7 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?] 2017-06-27 18:34:25 INFO StreamThread:828 - stream-thread [StreamThread-2] Creating active task 7_9 with assigned partitions [[c0206_topic-9]] 2017-06-27 18:34:26 WARN StreamThread:1184 - Could not create task 6_9. Will retry. 
org.apache.kafka.streams.errors.LockException: task [6_9] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/6_9 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?] 2017-06-27 18:34:26 WARN StreamThread:1184 - Could not create task 15_1. Will retry. org.apache.kafka.streams.errors.LockException: task [15_1] Failed to lock the state directory: /data/my-app/tmp/kafka-state/stream/15_1 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [my-app-stream.jar:?] 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [my-app-stream.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [my-app-stream.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [my-app-stream.jar:?] 2017-06-27 18:34:26 INFO StreamThread:828 - stream-thread [StreamThread-4] Creating active task 13_3 with assigned partitions [[a0291_topic-3]] 2017-06-27 18:34:26 INFO StreamThread:828 - stream-thread [StreamThread-1] Creating active task 12_4 with assigned partitions [[int77_topic-4]] > streams task gets stuck after re-balance due to LockException > ------------------------------------------------------------- > > Key: KAFKA-5167 > URL: https://issues.apache.org/jira/browse/KAFKA-5167 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.0, 0.11.0.0, 0.10.2.1 > Reporter: Narendra Kumar > Assignee: Matthias J. Sax > Fix For: 0.11.1.0, 0.10.2.2, 0.11.0.1 > > Attachments: BugTest.java, DebugTransformer.java, logs.txt > > > During a rebalance, the processor node's close() method gets called twice: once > from StreamThread.suspendTasksAndState() and once from > StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field > which I am closing in the processor's close method. This instance's close method > throws an exception if I call close more than once. Because of this > exception, Kafka Streams does not attempt to close the state manager, i.e. > task.closeStateManager(true) is never called. 
When a task moves from one > thread to another within the same machine, the task blocks trying to get the lock on > the state directory, which is still held by the unclosed state manager, and keeps > throwing the warning message below: > 2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will > retry. > org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the > state directory for task 0_1 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) > at > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) -- This message was sent by Atlassian JIRA (v6.4.14#64029)