[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308047#comment-15308047
 ] 

Guozhang Wang commented on KAFKA-3758:
--------------------------------------

I think there may be an issue about directory locking that results in both this 
and KAFKA-3752. 
I will investigate further and update these tickets later.

One question: is your Kafka Streams processes running on the same node as Kafka 
servers, i.e. if you shutdown some Kafka broker does that also kill some 
running Kafka Streams process?


> KStream job fails to recover after Kafka broker stopped
> -------------------------------------------------------
>
>                 Key: KAFKA-3758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>         Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725         at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>  1732         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>  1733         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>  1734         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>  1735         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>  1736 Caused by: java.io.IOException: Failed to lock the state directory: 
> /muon/state/job-stream_photon_messages-1/2_82
>  1737         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
>  1738         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>  1739         ... 32 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to