[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312888#comment-15312888
 ] 

Guozhang Wang commented on KAFKA-3758:
--------------------------------------

Hi Greg, I have a few more questions that will help me investigating this 
issue. Would be appreciated if you can remember the answer:

1. How many threads (i.e. the {{num.stream.threads}} config) did you use per 
each KafkaStreams instance?
2. From your logs it seems you have at least two topic groups, and with at 
least 82 partitions for the largest topic. How many topic-partitions do you 
have in total?

> KStream job fails to recover after Kafka broker stopped
> -------------------------------------------------------
>
>                 Key: KAFKA-3758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>         Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725         at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>  1732         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>  1733         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>  1734         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>  1735         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>  1736 Caused by: java.io.IOException: Failed to lock the state directory: 
> /muon/state/job-stream_photon_messages-1/2_82
>  1737         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
>  1738         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>  1739         ... 32 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to