One instance with 10 threads may cause RocksDB issues.
How much RAM do you have?

Also check CPU wait time. Many RocksDB instances on one machine (the count
depends on the number of partitions) can cause a lot of disk I/O, increasing
wait times, slowing down message processing, and hence triggering frequent
rebalances.

Also, how many partitions does your topic have? In my experience, one thread
per partition is ideal.
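As a minimal sketch of that thread-per-partition setup (the partition count
of 10 is hypothetical, and the config keys are set as plain strings here so
the snippet stands alone without the kafka-streams jar on the classpath):

```java
// Sketch only: in a real app you would pass these Properties to
// new KafkaStreams(builder, props).
import java.util.Properties;

public class StreamsThreading {
    public static void main(String[] args) {
        int partitions = 10; // hypothetical partition count of the input topic

        Properties props = new Properties();
        // One thread per partition: each thread then owns exactly one task,
        // and at most one RocksDB instance per state store.
        props.setProperty("num.stream.threads", Integer.toString(partitions));
        // Smaller poll batches keep the poll() interval short, which helps
        // avoid the CommitFailedException-driven rebalance cycle.
        props.setProperty("max.poll.records", "100");

        System.out.println("num.stream.threads=" + props.getProperty("num.stream.threads"));
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
    }
}
```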

Thanks
Sachin


On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <joao.harti...@gmail.com>
wrote:

> There is one instance with 10 threads.
>
> On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > João,
> >
> > Do you also have multiple instances running in parallel, and how many
> > threads are you running within each instance?
> >
> > Guozhang
> >
> >
> > On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.harti...@gmail.com>
> > wrote:
> >
> > > Eno before I do so I just want to be sure this would not be a
> > > duplicate. I just found the following issues:
> > >
> > > * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
> > > fixed on 0.11.0.0/0.10.2.2 (both not released afaik)
> > > * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in
> > > progress
> > >
> > > On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.there...@gmail.com>
> > > wrote:
> > >
> > > > Hi there,
> > > >
> > > > This might be a bug, would you mind opening a JIRA (copy-pasting
> > > > below is sufficient).
> > > >
> > > > Thanks
> > > > Eno
> > > > > On 7 Jun 2017, at 21:38, João Peixoto <joao.harti...@gmail.com>
> > wrote:
> > > > >
> > > > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > > > >
> > > > > 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > > > >
> > > > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > > > >
> > > > >
> > > > > It has been printing it for hours now, so it does not recover at all.
> > > > > The most worrying thing is that this stream definition does not even
> > > > > use state stores; it literally looks like this:
> > > > >
> > > > > KStreamBuilder builder = new KStreamBuilder();
> > > > > KStream<byte[], Message> kStream = builder.stream(appOptions.getInput().getTopic());
> > > > > kStream.process(() -> processor);
> > > > > new KafkaStreams(builder, streamsConfiguration);
> > > > >
> > > > > The "processor" does its thing and calls "context().commit()" when
> > > > > done. That's it. Looking at the actual machine running the instance,
> > > > > the folders under /tmp/kafka-streams/<stream name>/ only have a
> > > > > .lock file.
> > > > >
> > > > > This seems to have been bootstrapped by the exception:
> > > > >
> > > > > org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > > > > cannot be completed since the group has already rebalanced and
> > > > > assigned the partitions to another member. This means that the time
> > > > > between subsequent calls to poll() was longer than the configured
> > > > > max.poll.interval.ms, which typically implies that the poll loop is
> > > > > spending too much time message processing. You can address this
> > > > > either by increasing the session timeout or by reducing the maximum
> > > > > size of batches returned in poll() with max.poll.records.
> > > > >
> > > > > We are addressing the latter by reducing "max.poll.records" and
> > > > > increasing "commit.interval.ms". Nonetheless, shouldn't Kafka
> > > > > Streams skip state dirs entirely if there are no state stores?
> > > > > Since it doesn't seem to do so automatically, can I configure it
> > > > > somehow to achieve this end?
> > > > >
> > > > > Additionally, what could lead to it not being able to recover?
> > > > >
> > > > > On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > > > wrote:
> > > > >
> > > > >> Great! :)
> > > > >>
> > > > >> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> > > > >>> I see now that my Kafka cluster is very stable, and these errors
> > dont
> > > > >> come
> > > > >>> now.
> > > > >>>
> > > > >>> -Sameer.
> > > > >>>
> > > > >>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <
> > sam.kum.w...@gmail.com
> > > >
> > > > >> wrote:
> > > > >>>
> > > > >>>> Yes, I have upgraded my cluster and client both to version
> 10.2.1
> > > and
> > > > >>>> currently monitoring the situation.
> > > > >>>> Will report back in case I find any errors. Thanks for the help
> > > > though.
> > > > >>>>
> > > > >>>> -Sameer.
> > > > >>>>
> > > > >>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <
> > > > matth...@confluent.io>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Did you see Eno's reply?
> > > > >>>>>
> > > > >>>>> Please try out Streams 0.10.2.1 -- this should be fixed there.
> If
> > > > not,
> > > > >>>>> please report back.
> > > > >>>>>
> > > > >>>>> I would also recommend to subscribe to the list. It's
> > self-service
> > > > >>>>> http://kafka.apache.org/contact
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> > > > >>>>>> My brokers are on version 10.1.0 and my clients are on version
> > > > 10.2.0.
> > > > >>>>>> Also, do a reply to all, I am currently not subscribed to the
> > > list.
> > > > >>>>>>
> > > > >>>>>> -Sameer.
> > > > >>>>>>
> > > > >>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <
> > > > sam.kum.w...@gmail.com>
> > > > >>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi,
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I ran two nodes in my streams compute cluster; they were
> > > > >>>>>>> running fine for a few hours before failing with "failed to
> > > > >>>>>>> rebalance" errors.
> > > > >>>>>>>
> > > > >>>>>>> I couldn't understand why this happened, but I saw one strange
> > > > >>>>>>> behaviour: at 16:53 on node1 I saw a "Failed to lock the state
> > > > >>>>>>> directory" error. This might have caused the partitions to
> > > > >>>>>>> relocate and hence the error.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I am attaching detailed logs for both nodes; please see if
> > > > >>>>>>> you can help.
> > > > >>>>>>>
> > > > >>>>>>> Some of the logs for quick reference:
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2
> > > > >>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > > >>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> > > > >>>>>>>   ... 1 more
> > > > >>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> > > > >>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
> > > > >>>>>>>   ... 10 more
> > > > >>>>>>>
> > > > >>>>>>> 2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not create task 1_38. Will retry.
> > > > >>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> > > > >>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Regards,
> > > > >>>>>>>
> > > > >>>>>>> -Sameer.
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
