Greg Fodor created KAFKA-3758:
---------------------------------

             Summary: KStream job fails to recover after Kafka broker stopped
                 Key: KAFKA-3758
                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Greg Fodor
            Assignee: Guozhang Wang


We've been testing a fairly complex KStreams job, and under load the job seems 
to fail to rebalance and recover if we shut down one of the Kafka brokers. The 
test used a 3-node Kafka cluster where every topic had a replication factor of 
at least 2, and we terminated one of the nodes.

The full log is attached. The root exception seems to be contention on the lock 
on the state directory: the job keeps trying to recover but throws lock-related 
errors over and over. Restarting the job resolves the problem.

org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
        ... 32 more
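As a rough illustration of the failure mode in the trace (not Kafka's actual code), the sketch below assumes that each task directory such as 2_82 is guarded by an exclusive java.nio FileLock on a `.lock` file inside it; the file name and that mechanism are assumptions about the 0.10.0.0 internals. Within one JVM, a second tryLock() on a region that is already locked throws OverlappingFileLockException, so a task recreated for the same task id during a rebalance cannot lock the directory until the previous owner releases it. Restarting the process releases every lock, which would be consistent with restarts fixing the problem. `StateDirLockDemo`, `tryLockStateDir`, and `run` are hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;

// Hypothetical demo, not Kafka code: models a task directory guarded by an
// exclusive lock on an assumed "<taskDir>/.lock" file.
public class StateDirLockDemo {

    // Try once to lock the task directory. Returns null if an overlapping lock is
    // already held in this JVM (OverlappingFileLockException) or by another
    // process (tryLock() returns null).
    static FileLock tryLockStateDir(File taskDir) throws IOException {
        File lockFile = new File(taskDir, ".lock");
        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                channel.close();
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            channel.close();
            return null;
        }
    }

    // Returns {old task locked, new task locked while old holds it, new task locked after release}.
    static boolean[] run(File taskDir) throws IOException {
        // The "old" task for this task id takes the lock and has not released it yet.
        FileLock oldTaskLock = tryLockStateDir(taskDir);
        // A "new" task for the same id, created in the same JVM during a rebalance, tries to lock it.
        FileLock newTaskLock = tryLockStateDir(taskDir);
        boolean[] result = new boolean[3];
        result[0] = oldTaskLock != null;
        result[1] = newTaskLock != null;
        if (newTaskLock != null) {
            newTaskLock.channel().close();
        }
        // Closing the old owner's channel releases its lock, roughly what a restart does.
        if (oldTaskLock != null) {
            oldTaskLock.channel().close();
        }
        FileLock retried = tryLockStateDir(taskDir);
        result[2] = retried != null;
        if (retried != null) {
            retried.channel().close();
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        File taskDir = Files.createTempDirectory("2_82").toFile();
        boolean[] r = run(taskDir);
        System.out.println("old task locked: " + r[0]);
        System.out.println("new task locked while old holds it: " + r[1]);
        System.out.println("new task locked after release: " + r[2]);
    }
}
```

Running it should show the second attempt failing while the first lock is held and the third attempt succeeding once the channel is closed. Whether 0.10.0.0 actually leaks the lock through this path (for example, an old task that is never closed when a rebalance fails partway through) is a hypothesis; the log above only shows that the lock could not be acquired.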




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
