Hello Sachin,

When an instance is stopped, it stops the underlying heartbeat thread as
part of the shutdown process, so the coordinator will realize it is
leaving the group.
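
For example, a minimal sketch of a graceful stop (assuming `streams` is the
running KafkaStreams instance):

// close() stops the stream threads and their underlying clients, so the
// coordinator notices the instance leaving the group
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));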

As for non-graceful stops: say there is a bug in the stream app code that
causes a thread to die. Currently the Streams library captures most of the
exceptions, and we rely on the global error handling for unexpected
exceptions. This is admittedly not ideal, and we are working on
finer-grained error handling to fix such issues.
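
For example, the global hook can be set like this (a minimal sketch; the
logging and restart policy are up to the application):

final KafkaStreams streams = new KafkaStreams(builder, props);
// called for any exception that escapes a stream thread; Streams catches
// most recoverable errors internally, this handler sees the rest
streams.setUncaughtExceptionHandler((thread, throwable) ->
    System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
streams.start();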

Guozhang

On Mon, Apr 24, 2017 at 7:34 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> I had a question about this setting:
> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE)
>
> How would the broker know if a thread has died, or, if we simply stopped
> an instance, that it needs to be booted out of the group?
>
> Thanks
> Sachin
>
>
> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Hi Ian,
> >
> >
> > This is now fixed in 0.10.2.1; the default configuration needed
> > tweaking. If you can't pick that release up (it's currently being voted
> > on), make sure you have these two parameters set as follows in your
> > streams config:
> >
> > final Properties props = new Properties();
> > ...
> > // increase retries to 10 from the default of 0
> > props.put(ProducerConfig.RETRIES_CONFIG, 10);
> > // increase the max poll interval to "infinity" from the default of 300 s
> > props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> >     Integer.toString(Integer.MAX_VALUE));
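> >
> > For context, a minimal sketch of where these settings go (the
> > application id, bootstrap servers, and topology below are placeholders,
> > not from this thread):
> >
> > final Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
> > props.put(ProducerConfig.RETRIES_CONFIG, 10);
> > props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> >     Integer.toString(Integer.MAX_VALUE));
> >
> > final KStreamBuilder builder = new KStreamBuilder();
> > // ... define the topology on the builder ...
> > final KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
> > The retries setting lets the producer ride out transient errors such as
> > NotLeaderForPartitionException during leader elections, instead of
> > surfacing them to the stream thread.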
> >
> > Thanks
> > Eno
> >
> > > On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> > >
> > > Hi All,
> > >
> > > We're running multiple Kafka Streams applications using Kafka client
> > > 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
> > > Additionally, we're running Kafka Connect 0.10.2.0 with the
> > > ElasticSearch connector by Confluent [1].
> > >
> > > When an ISR change occurred on the brokers, all of the streams
> > > applications and the Kafka Connect ES connector threw exceptions and
> > > never recovered.
> > >
> > > We've seen a correlation between Kafka broker ISR changes and stream
> > > applications dying.
> > >
> > > The streams applications log the following and fail to recover:
> > >
> > > 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > > 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
> > > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
> > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> > >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> > >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> > >     ... 2 common frames omitted
> > > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > > 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > > 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
> > > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
> > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> > >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> > >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> > >     ... 2 common frames omitted
> > > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > >
> > > Are we potentially doing something wrong with our streams
> > > configuration/usage? Or does this look like a bug?
> > >
> > > Thanks,
> > > Ian.
> > >
> > > [1] https://github.com/confluentinc/kafka-connect-elasticsearch
> >
> >
>



-- 
-- Guozhang
