Hi there,

This might be a bug; would you mind opening a JIRA? (Copy-pasting the text below is sufficient.)
Thanks,
Eno

> On 7 Jun 2017, at 21:38, João Peixoto <joao.harti...@gmail.com> wrote:
>
> I'm using Kafka Streams 0.10.2.1 and I still see this error:
>
> 2017-06-07 20:28:37.211 WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
>
> org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
>
> It has been printing this for hours now, so it does not recover at all.
> The most worrying thing is that this stream definition does not even use state stores; it literally looks like this:
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<byte[], Message> kStream = builder.stream(appOptions.getInput().getTopic());
> kStream.process(() -> processor);
> new KafkaStreams(builder, streamsConfiguration);
>
> The "processor" does its thing and calls "context().commit()" when done. That's it. Looking at the actual machine running the instance, the folders under /tmp/kafka-streams/<stream name>/ only contain a .lock file.
>
> This seems to have been bootstrapped by the exception:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>
> We are addressing the latter by reducing "max.poll.records" and increasing "commit.interval.ms". Nonetheless, shouldn't Kafka Streams not worry about state dirs if there are no state stores? Since it doesn't seem to do so automatically, can I configure it somehow to achieve this end?
>
> Additionally, what could lead to it not being able to recover?
>
> On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Great! :)
>>
>> On 5/16/17 2:31 AM, Sameer Kumar wrote:
>>> I see now that my Kafka cluster is very stable, and these errors don't come now.
>>>
>>> -Sameer.
>>>
>>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>
>>>> Yes, I have upgraded both my cluster and client to version 10.2.1 and am currently monitoring the situation.
>>>> Will report back in case I find any errors. Thanks for the help.
>>>>
>>>> -Sameer.
>>>>
>>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Did you see Eno's reply?
>>>>>
>>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not, please report back.
>>>>>
>>>>> I would also recommend subscribing to the list. It's self-service: http://kafka.apache.org/contact
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
>>>>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
>>>>>> Also, please reply-all; I am currently not subscribed to the list.
>>>>>>
>>>>>> -Sameer.
>>>>>>
>>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I ran two nodes in my Streams compute cluster; they were running fine for a few hours before failing with "failed to rebalance" errors.
>>>>>>>
>>>>>>> I couldn't understand why this happened, but I saw one strange behaviour: at 16:53 on node1, I saw a "Failed to lock the state directory" error. This might have caused the partitions to relocate and hence the error.
>>>>>>>
>>>>>>> I am attaching detailed logs for both nodes; please see if you can help.
>>>>>>>
>>>>>>> Some of the logs for quick reference:
>>>>>>>
>>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2
>>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>>>>>>     ... 1 more
>>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>>>>>>>     ... 10 more
>>>>>>>
>>>>>>> 2017-05-03 16:53:57 WARN StreamThread:1184 - Could not create task 1_38. Will retry.
>>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>>>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>>>>>>>
>>>>>>> Regards,
>>>>>>> -Sameer.
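
[Editor's note] The tuning the thread converges on (smaller poll batches so poll() is called often enough, less frequent commits) can be sketched as a plain Properties object. This is a minimal illustration, not the poster's actual configuration: the application id, broker address, and the two numeric values are placeholder assumptions; only the config key names ("max.poll.records", "commit.interval.ms") come from the thread.

```java
import java.util.Properties;

public class StreamsTuning {

    // Hypothetical helper; builds the config the thread discusses.
    static Properties buildStreamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // assumed name
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        // Fewer records per poll() means less processing time between
        // polls, which is what the CommitFailedException message asks for.
        props.put("max.poll.records", "100");             // placeholder value
        // Commit less often to reduce per-batch commit overhead.
        props.put("commit.interval.ms", "30000");         // placeholder value
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildStreamsConfig();
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
        System.out.println("commit.interval.ms=" + props.getProperty("commit.interval.ms"));
        // In the real application these props would be handed to the
        // KafkaStreams constructor alongside the topology builder.
    }
}
```

In the 0.10.2.x line used in the thread, Streams forwards unknown keys such as max.poll.records to its internal consumer, so both values can live in the same Properties object passed to KafkaStreams.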