[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Narendra Kumar updated KAFKA-5167:
----------------------------------
Attachment: BugTest.java, DebugTransformer.java
> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Narendra Kumar
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During a rebalance, a processor node's close() method is called twice:
> once from StreamThread.suspendTasksAndState() and once from
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field
> that I close in the processor's close() method, and that instance's
> close() method throws an exception if it is called more than once. Because
> of this exception, Kafka Streams does not attempt to close the state
> manager, i.e. task.closeStateManager(true) is never called. When a task
> then moves from one thread to another on the same machine, the new task
> blocks trying to acquire the lock on the state directory, which is still
> held by the unclosed state manager, and keeps throwing the following
> exception:
> 2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
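
The double call to close() described above could, as an application-side workaround rather than a fix for the issue itself, be tolerated by guarding the processor's close() so that only the first call reaches the underlying resource. The following is a minimal sketch in plain Java with no Kafka dependency; StrictResource, GuardedProcessor, and GuardedCloseDemo are hypothetical names invented for illustration and do not come from the attached BugTest.java or DebugTransformer.java.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the instance field closed in the processor's
// close(): it throws if close() is called a second time.
class StrictResource {
    private int closeCount = 0;

    void close() {
        if (closeCount > 0) {
            throw new IllegalStateException("already closed");
        }
        closeCount++;
    }

    int closeCount() {
        return closeCount;
    }
}

// Hypothetical processor-like class; only the close() guard matters here.
class GuardedProcessor {
    private final StrictResource resource;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    GuardedProcessor(StrictResource resource) {
        this.resource = resource;
    }

    // Safe to call repeatedly: only the first call reaches the resource.
    void close() {
        if (closed.compareAndSet(false, true)) {
            resource.close();
        }
    }
}

class GuardedCloseDemo {
    public static void main(String[] args) {
        StrictResource resource = new StrictResource();
        GuardedProcessor processor = new GuardedProcessor(resource);
        processor.close(); // first call closes the resource
        processor.close(); // second call is a no-op instead of throwing
        System.out.println("resource closed " + resource.closeCount() + " time(s)");
    }
}
```

With a guard like this, the second close() no longer throws, so the exception that reportedly prevents the state manager from being closed would not be raised by the processor. The sketch does not model the task lifecycle: if a suspended task can be resumed and the resource reopened, the flag would presumably need to be reset at that point.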