[
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eno Thereska updated KAFKA-3758:
--------------------------------
Resolution: Fixed
Status: Resolved (was: Patch Available)
> KStream job fails to recover after Kafka broker stopped
> -------------------------------------------------------
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Greg Fodor
> Assignee: Eno Thereska
> Attachments: muon.log.1.gz
>
>
> We've been testing a fairly complex KStreams job, and under load the job
> seems to fail to rebalance and recover if we shut down one of the Kafka
> brokers. The test used a 3-node Kafka cluster in which every topic had a
> replication factor of at least 2, and we terminated one of the nodes.
> The full log is attached. The root exception seems to be contention on the
> lock on the state directory. The job keeps trying to recover but throws
> lock-related errors over and over. Restarting the job itself resolves
> the problem.
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>     ... 32 more
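
For readers unfamiliar with directory locks, the symptom in the report (a lock on a state directory that cannot be acquired until the process restarts) can be illustrated with plain java.nio file locks. This is only a sketch under assumptions, not the Kafka Streams implementation: the class name StateDirLockSketch, the helper tryLockDir and the ".lock" file name are all hypothetical. It shows one possible way such contention can arise: a second attempt to lock the same directory fails while an earlier lock is still held in the same JVM, and succeeds once that lock is released.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration only; names are not taken from Kafka.
class StateDirLockSketch {

    // Tries to lock a ".lock" file inside dir.
    // Returns the lock, or null if the lock is already held.
    static FileLock tryLockDir(Path dir) throws IOException {
        FileChannel channel = FileChannel.open(
                dir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                // Held by another process.
                channel.close();
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            // Held elsewhere in this same JVM.
            channel.close();
            return null;
        }
    }

    // Releases the lock and closes its channel.
    static void unlock(FileLock lock) throws IOException {
        lock.release();
        lock.channel().close();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-dir-sketch");

        FileLock first = tryLockDir(dir);
        System.out.println("first attempt acquired:  " + (first != null));

        // The earlier lock was never released, so this attempt fails.
        FileLock second = tryLockDir(dir);
        System.out.println("second attempt acquired: " + (second != null));

        // After the holder releases, a new attempt succeeds.
        unlock(first);
        FileLock third = tryLockDir(dir);
        System.out.println("third attempt acquired:  " + (third != null));
        unlock(third);
    }
}
```

In this sketch a lock that is never released blocks every later attempt in the same process, which is consistent with the report that only a restart cleared the errors. Whether that is what happened in the attached log is not established here.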