Let me try to get the debug log when this error happens.

Right now we have three instances, each with 4 threads, consuming from a
12-partition topic, so one thread per partition.
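
For reference, the per-instance thread count is just num.stream.threads; a
rough sketch of that part of the setup (the application id and bootstrap
servers below are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
// 4 threads per instance; with 3 instances and a 12-partition source topic
// this works out to one partition (task) per thread.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);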

The application is running much better than before; it now usually runs for a
week even during peak load.

Sometimes, out of the blue, either RocksDB throws an exception whose message
is a single character (which I believe is a known RocksDB issue fixed in an
upcoming release), or the producer times out while committing a record to a
changelog topic. I had increased the timeout from 30 seconds to 180 seconds,
but it still throws the exception even with that value.
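
My assumption is that the timeout expiring here is the embedded producer's
request.timeout.ms (its default is the same 30 seconds); I raised it through
the streams config roughly like this (props being the same properties object
as above):

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Raise the embedded producer's request timeout from the 30s default to 180s.
// That this is the timeout expiring on changelog commits is my assumption.
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 180000);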

I am not sure whether these are caused by a VM issue or by the network.

But whenever something like this happens, the application goes into a
rebalance and things soon take a turn for the worse: some of the threads go
into a deadlock with the stack trace quoted below, and the application ends up
in a perpetual rebalance state.

The only way to resolve this is to kill all instances with -9 and restart them
one by one.

So as long as we have a steady state of one thread per partition, everything
works fine. I am still working out a way to limit the changelog topic size
through more aggressive compaction (see the sketch below); I will see whether
that makes things better.
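
The overrides I have in mind for the changelog topic are roughly the
following; the concrete values are placeholders I still need to tune, and they
can be supplied when logging is enabled on the state store or applied directly
to the existing changelog topic:

import java.util.HashMap;
import java.util.Map;

// More aggressive compaction settings for the changelog topic
// (placeholder values, not tuned yet).
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("cleanup.policy", "compact");
changelogConfig.put("segment.ms", "600000");             // roll segments every 10 minutes
changelogConfig.put("min.cleanable.dirty.ratio", "0.1"); // let the log cleaner kick in sooner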

I will try to get the logs when this happens next time.

Thanks
Sachin



On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> It's not necessarily a deadlock. Do you have any debug traces from those
> nodes? It would also be useful to know the config (e.g., how many partitions
> you have and how many app instances).
>
> Thanks
> Eno
>
> > On 9 Apr 2017, at 04:45, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi,
> > In my streams applications cluster in one or more instances I see some
> > threads always waiting with the following stack.
> >
> > Every time I check on jstack I see the following trace.
> >
> > Is this some kind of new deadlock that we have failed to identify?
> >
> > Thanks
> > Sachin
> >
> > here is the stack trace:
> > ------------------------------------------------------------------------
> > "StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf runnable [0x00007fb7cb4f6000]
> >    java.lang.Thread.State: RUNNABLE
> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >         - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
> >         - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
> >         - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >         - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)
>
>
