There is one instance with 10 threads.
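For context, the thread count and the poll/commit settings discussed further down this thread are ordinary client configs. A minimal sketch of the shape of that configuration (the key names are the real Streams/consumer config keys; the values here are illustrative, not our actual production tuning):

```java
import java.util.Properties;

public class StreamsConfigSketch {

    public static Properties build() {
        Properties props = new Properties();
        // One JVM instance running 10 stream threads, as described above.
        props.setProperty("num.stream.threads", "10");
        // Mitigations discussed below: fewer records per poll() so each loop
        // iteration stays under max.poll.interval.ms, and a longer commit
        // interval to reduce commit pressure. Values are examples only.
        props.setProperty("max.poll.records", "100");
        props.setProperty("commit.interval.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("num.stream.threads"));
    }
}
```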

On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wangg...@gmail.com> wrote:

> João,
>
> Do you also have multiple instances running in parallel, and how many
> threads are you running within each instance?
>
> Guozhang
>
>
> On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.harti...@gmail.com> wrote:
>
> > Eno, before I do so I just want to be sure this would not be a duplicate.
> > I just found the following issues:
> >
> > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as fixed in
> > 0.11.0.0/0.10.2.2 (neither released yet, AFAIK)
> > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
> >
> > On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.there...@gmail.com> wrote:
> >
> > > Hi there,
> > >
> > > This might be a bug. Would you mind opening a JIRA? (Copy-pasting the
> > > details below is sufficient.)
> > >
> > > Thanks
> > > Eno
> > > > On 7 Jun 2017, at 21:38, João Peixoto <joao.harti...@gmail.com> wrote:
> > > >
> > > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > > >
> > > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1]
> > > > o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > > >
> > > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to
> > > > lock the state directory for task 0_31
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > > >
> > > >
> > > > It has been printing this for hours now, so it does not recover at all.
> > > > The most worrying thing is that this stream definition does not even use
> > > > state stores; it literally looks like this:
> > > >
> > > > KStreamBuilder builder = new KStreamBuilder();
> > > > KStream<byte[], Message> kStream =
> > > >     builder.stream(appOptions.getInput().getTopic());
> > > > kStream.process(() -> processor);
> > > > new KafkaStreams(builder, streamsConfiguration);
> > > >
> > > > The "processor" does its thing and calls "context().commit()" when
> > done.
> > > > That's it. Looking at the actual machine running the instance, the
> > > folders
> > > > under /tmp/kafka-streams/<stream name>/ only have a .lock file.
> > > >
> > > > This seems to have been triggered by the exception:
> > > >
> > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > > > completed since the group has already rebalanced and assigned the
> > > > partitions to another member. This means that the time between subsequent
> > > > calls to poll() was longer than the configured max.poll.interval.ms, which
> > > > typically implies that the poll loop is spending too much time message
> > > > processing. You can address this either by increasing the session timeout
> > > > or by reducing the maximum size of batches returned in poll() with
> > > > max.poll.records.
> > > >
> > > > We are addressing the latter by reducing "max.poll.records" and
> > > > increasing "commit.interval.ms". Nonetheless, shouldn't Kafka Streams
> > > > skip state dirs entirely if there are no state stores? Since it doesn't
> > > > seem to do so automatically, can I configure it somehow to achieve this
> > > > end?
> > > >
> > > > Additionally, what could lead to it not being able to recover?
> > > >
> > > > On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >
> > > >> Great! :)
> > > >>
> > > >> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> > > >>> I see now that my Kafka cluster is very stable, and these errors don't
> > > >>> come up anymore.
> > > >>>
> > > >>> -Sameer.
> > > >>>
> > > >>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> > > >>>
> > > >>>> Yes, I have upgraded both my cluster and client to version 10.2.1 and
> > > >>>> am currently monitoring the situation.
> > > >>>> Will report back in case I find any errors. Thanks for the help though.
> > > >>>>
> > > >>>> -Sameer.
> > > >>>>
> > > >>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>
> > > >>>>> Did you see Eno's reply?
> > > >>>>>
> > > >>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If
> > > >>>>> not, please report back.
> > > >>>>>
> > > >>>>> I would also recommend subscribing to the list. It's self-service:
> > > >>>>> http://kafka.apache.org/contact
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> > > >>>>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
> > > >>>>>> Also, please reply to all; I am currently not subscribed to the list.
> > > >>>>>>
> > > >>>>>> -Sameer.
> > > >>>>>>
> > > >>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> I ran two nodes in my streams compute cluster; they were running
> > > >>>>>>> fine for a few hours before failing with rebalance errors.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> I couldn't understand why this happened, but I saw one strange
> > > >>>>>>> behaviour...
> > > >>>>>>>
> > > >>>>>>> At 16:53 on node1, I saw a "Failed to lock the state directory"
> > > >>>>>>> error; this might have caused the partitions to relocate and hence
> > > >>>>>>> the error.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> I am attaching detailed logs for both nodes; please see if you can
> > > >>>>>>> help.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Some of the logs for quick reference are these.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2
> > > >>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > >>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> > > >>>>>>>     ... 1 more
> > > >>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> > > >>>>>>> Commit cannot be completed since the group has already rebalanced and
> > > >>>>>>> assigned the partitions to another member. This means that the time
> > > >>>>>>> between subsequent calls to poll() was longer than the configured
> > > >>>>>>> max.poll.interval.ms, which typically implies that the poll loop is
> > > >>>>>>> spending too much time message processing. You can address this either
> > > >>>>>>> by increasing the session timeout or by reducing the maximum size of
> > > >>>>>>> batches returned in poll() with max.poll.records.
> > > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> > > >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
> > > >>>>>>>     ... 10 more
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not create task 1_38. Will retry.
> > > >>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> > > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>>
> > > >>>>>>> -Sameer.
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
