I'm using Kafka Streams 0.10.2.1 and I still see this error

2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : Could not create task 0_31. Will
retry.

org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock
the state directory for task 0_31
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[kafka-streams-0.10.2.1.jar!/:na]


It has been printing it for hours now, so it does not recover at all.
The most worrying thing is that this stream definition does not even use
state stores, it literally looks like this:

 KStreamBuilder builder = new KStreamBuilder();
        KStream<byte[], Message> kStream =
builder.stream(appOptions.getInput().getTopic());
        kStream.process(() -> processor);
        new KafkaStreams(builder, streamsConfiguration);

The "processor" does its thing and calls "context().commit()" when done.
That's it. Looking at the actual machine running the instance, the folders
under /tmp/kafka-streams/<stream name>/ only have a .lock file.

This seems to have been bootstrapped by the exception:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.

We are addressing the latter by reducing "max.poll.records" and increasing "
commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry about
state dirs if there are no state stores? Since it doesn't seem to do so
automatically, can I configured it somehow to achieve this end?

Additionally, what could lead to it not being able to recover?

On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Great! :)
>
> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> > I see now that my Kafka cluster is very stable, and these errors dont
> come
> > now.
> >
> > -Sameer.
> >
> > On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
> >
> >> Yes, I have upgraded my cluster and client both to version 10.2.1 and
> >> currently monitoring the situation.
> >> Will report back in case I find any errors. Thanks for the help though.
> >>
> >> -Sameer.
> >>
> >> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Did you see Eno's reply?
> >>>
> >>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not,
> >>> please report back.
> >>>
> >>> I would also recommend to subscribe to the list. It's self-service
> >>> http://kafka.apache.org/contact
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> >>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
> >>>> Also, do a reply to all, I am currently not subscribed to the list.
> >>>>
> >>>> -Sameer.
> >>>>
> >>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.w...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>>
> >>>>>
> >>>>> I ran two nodes in my streams compute cluster, they were running fine
> >>> for
> >>>>> few hours before outputting with failure to rebalance errors.
> >>>>>
> >>>>>
> >>>>> I couldnt understand why this happened but I saw one strange
> >>> behaviour...
> >>>>>
> >>>>> at 16:53 on node1, I saw "Failed to lock the state directory" error,
> >>> this
> >>>>> might have caused the partitions to relocate and hence the error.
> >>>>>
> >>>>>
> >>>>>
> >>>>> I am attaching detailed logs for both the nodes, please see if you
> can
> >>>>> help.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Some of the logs for quick reference are these.
> >>>>>
> >>>>>
> >>>>>
> >>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread
> >>>>> StreamThread-2
> >>>>>
> >>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread
> >>>>> [StreamThread-2] Failed to rebalance
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.runLoop(StreamThread.java:612)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.run(StreamThread.java:368)
> >>>>>
> >>>>> Caused by: org.apache.kafka.streams.errors.StreamsException:
> >>>>> stream-thread [StreamThread-2] failed to suspend stream tasks
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.suspendTasksAndState(StreamThrea
> >>> d.java:488)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.access$1200(StreamThread.java:69)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread$1.onPartitionsRevoked(StreamThre
> >>> ad.java:259)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoor
> >>>>> dinator.java:396)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.internals.AbstractCoordinator.joinGroupIfNeeded(Abstract
> >>>>> Coordinator.java:329)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.internals.AbstractCoordinator.ensureActiveGroup(Abstract
> >>>>> Coordinator.java:303)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.runLoop(StreamThread.java:582)
> >>>>>
> >>>>>                 ... 1 more
> >>>>>
> >>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> >>>>> Commit cannot be completed since the group has already rebalanced and
> >>>>> assigned the partitions to another member. This means that the time
> >>> between
> >>>>> subsequent calls to poll() was longer than the configured
> >>>>> max.poll.interval.ms, which typically implies that the poll loop is
> >>>>> spending too much time message processing. You can address this
> either
> >>> by
> >>>>> increasing the session timeout or by reducing the maximum size of
> >>> batches
> >>>>> returned in poll() with max.poll.records.
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.internals.ConsumerCoordinator.sendOffsetCommitRequest(Co
> >>>>> nsumerCoordinator.java:698)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.internals.ConsumerCoordinator.commitOffsetsSync(Consumer
> >>>>> Coordinator.java:577)
> >>>>>
> >>>>>                 at org.apache.kafka.clients.consu
> >>>>> mer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread$3.apply(StreamThread.java:535)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.suspendTasksAndState(StreamThrea
> >>> d.java:480)
> >>>>>
> >>>>>                 ... 10 more
> >>>>>
> >>>>>
> >>>>>
> >>>>> 2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not create task
> >>> 1_38.
> >>>>> Will retry.
> >>>>>
> >>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to
> >>> lock
> >>>>> the state directory: /data/streampoc/LIC2-5/1_38
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.ProcessorStateManager.<init>(ProcessorStateMa
> >>>>> nager.java:102)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamTask.<init>(StreamTask.java:108)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread$TaskCreator.createTask(StreamThr
> >>> ead.java:1207)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread$AbstractTaskCreator.retryWithBac
> >>>>> koff(StreamThread.java:1180)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread.access$500(StreamThread.java:69)
> >>>>>
> >>>>>                 at org.apache.kafka.streams.proce
> >>>>> ssor.internals.StreamThread$1.onPartitionsAssigned(StreamThr
> >>> ead.java:236)
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> -Sameer.
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Reply via email to