[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308047#comment-15308047 ]
Guozhang Wang commented on KAFKA-3758: -------------------------------------- I think there may be an issue about directory locking that results in both this and KAFKA-3752. I will investigate further and update these tickets later. One question: is your Kafka Streams processes running on the same node as Kafka servers, i.e. if you shutdown some Kafka broker does that also kill some running Kafka Streams process? > KStream job fails to recover after Kafka broker stopped > ------------------------------------------------------- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.0.0 > Reporter: Greg Fodor > Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log, the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. > 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while > creating the state manager > 1703 at > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71) > 1704 at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86) > 1705 at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > 1706 at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > 1707 at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > 1708 at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > 1709 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > 1710 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > 1711 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > 1712 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1713 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1714 at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > 1715 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1716 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1717 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > 1718 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > 1719 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > 1720 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > 1721 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > 1722 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1723 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1724 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > 1725 at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > 1726 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > 1727 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > 1728 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > 1729 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > 1730 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > 1731 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > 1732 at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > 1733 at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > 1734 at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > 1735 at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > 1736 Caused by: java.io.IOException: Failed to lock the state directory: > /muon/state/job-stream_photon_messages-1/2_82 > 1737 at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95) > 1738 at > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69) > 1739 ... 32 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)