Greg Fodor created KAFKA-3758:
---------------------------------
Summary: KStream job fails to recover after Kafka broker stopped
Key: KAFKA-3758
URL: https://issues.apache.org/jira/browse/KAFKA-3758
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang
We've been testing a fairly complex KStreams job, and under load the job appears
to fail to rebalance and recover if we shut down one of the Kafka brokers. The
test ran against a 3-node Kafka cluster in which every topic had a replication
factor of at least 2, and we terminated one of the nodes.
The full log is attached; the root exception appears to be contention on the
state directory lock. The job keeps trying to recover but repeatedly throws
lock-related errors. Restarting the job resolves the problem.
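To illustrate the failure mode, here is a minimal model using plain `java.nio` file locks. It assumes (the log does not confirm this) that the conflicting holder is a task in the same JVM that has not yet released its directory when the partition is reassigned. The `.lock` file name, the temp directory, and the helper names are hypothetical; this is a sketch, not the Kafka Streams implementation.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockDemo {

    // Opens the (hypothetical) lock file; WRITE is required for an exclusive lock.
    static FileChannel openLockFile(Path lockFile) throws IOException {
        return FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // Non-blocking attempt to take the directory lock; null means "someone else holds it".
    static FileLock tryLockDir(FileChannel channel) throws IOException {
        try {
            return channel.tryLock(); // returns null if another process holds the lock
        } catch (OverlappingFileLockException e) {
            return null; // another channel in this JVM (e.g. the old task) holds it
        }
    }

    // Returns {oldOwnerLocked, newOwnerLockedWhileHeld, newOwnerLockedAfterRelease}.
    static boolean[] runScenario() throws IOException {
        // Hypothetical task directory and lock-file name, for illustration only.
        Path taskDir = Files.createTempDirectory("task-2_82-");
        Path lockFile = taskDir.resolve(".lock");
        try (FileChannel oldOwner = openLockFile(lockFile);
             FileChannel newOwner = openLockFile(lockFile)) {
            FileLock held = tryLockDir(oldOwner);      // old task not yet closed
            FileLock whileHeld = tryLockDir(newOwner); // reassigned task tries to start
            if (held != null) {
                held.release();                        // old task finally cleans up
            }
            FileLock afterRelease = tryLockDir(newOwner);
            boolean[] result = {held != null, whileHeld != null, afterRelease != null};
            if (afterRelease != null) {
                afterRelease.release();
            }
            return result;
        } finally {
            // Channels are already closed here, so the files can be removed.
            Files.deleteIfExists(lockFile);
            Files.deleteIfExists(taskDir);
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] r = runScenario();
        System.out.println("old owner locked:               " + r[0]);
        System.out.println("new owner locked while held:    " + r[1]);
        System.out.println("new owner locked after release: " + r[2]);
    }
}
```

This should print `true`, `false`, `true`: within one JVM, a second `tryLock()` on a region that is already locked throws `OverlappingFileLockException` instead of waiting, so the new owner only succeeds once the old lock has been released.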
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
	... 32 more
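The trace shows the lock failure as a `java.io.IOException` thrown from `ProcessorStateManager.<init>`, surfacing as the cause of a `ProcessorStateException` while the task is created inside `onPartitionsAssigned`. The sketch below reproduces that exception shape with a bounded retry around the lock. `StateManagerException` is a hypothetical stand-in for `ProcessorStateException`, the 3 attempts and 50 ms backoff are arbitrary illustrative values (not Kafka settings), and nothing here claims to be how Kafka Streams acquires the lock.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockFailureChain {

    // Hypothetical stand-in for org.apache.kafka.streams.errors.ProcessorStateException.
    static class StateManagerException extends RuntimeException {
        StateManagerException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Bounded retry: up to maxAttempts non-blocking tries, sleeping backoffMs between them.
    static FileLock lockWithRetry(FileChannel channel, Path dir, int maxAttempts, long backoffMs)
            throws IOException, InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                FileLock lock = channel.tryLock();
                if (lock != null) {
                    return lock;
                }
            } catch (OverlappingFileLockException e) {
                // Held by another channel in this JVM; retry after the backoff.
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs);
            }
        }
        throw new IOException("Failed to lock the state directory: " + dir);
    }

    // Wraps the checked IOException in an unchecked exception, giving the same
    // "outer exception / Caused by: java.io.IOException" shape as the trace.
    static FileLock createStateManager(FileChannel channel, Path dir) {
        try {
            return lockWithRetry(channel, dir, 3, 50L);
        } catch (IOException e) {
            throw new StateManagerException("Error while creating the state manager", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StateManagerException("Interrupted while locking " + dir, e);
        }
    }

    // One channel holds the lock for the whole attempt; a second channel tries to
    // create a "state manager". Returns the exception it throws, or null on success.
    static RuntimeException failWhileHeld() throws IOException {
        Path dir = Files.createTempDirectory("task-2_82-"); // hypothetical task directory
        Path lockFile = dir.resolve(".lock");               // hypothetical lock-file name
        try (FileChannel holder = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileChannel contender = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock held = holder.lock()) {
            createStateManager(contender, dir);
            return null;
        } catch (StateManagerException e) {
            return e;
        } finally {
            Files.deleteIfExists(lockFile);
            Files.deleteIfExists(dir);
        }
    }

    public static void main(String[] args) throws IOException {
        RuntimeException e = failWhileHeld();
        System.out.println(e.getMessage());
        System.out.println("Caused by: " + e.getCause());
    }
}
```

A bounded retry only helps if the holder releases the lock within the retry window. If it never does, as in this sketch, task creation still fails with the same chain of exceptions on every attempt.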
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)