Thanks again for the quick response, Eno.

We just left the application running in the hope that it would recover. After
~1 hour it is still continuously logging the same exception and is not
managing to continue processing.

On 25 April 2017 at 16:24, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Ian,
>
> Retries are sometimes expected and don't always indicate a problem. We
> should probably adjust the logging so that this warning isn't printed so
> frequently. Are you seeing a crash, or does the app proceed?
>
> Thanks
> Eno
>
> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <i...@ianduffy.ie> wrote:
>
> We upgraded a handful of our streams applications to 0.10.2.1 as suggested
> and are seeing far fewer issues and much smoother performance. They
> withstood ISR changes.
>
> We saw the following when more consumers were added to a consumer group:
>
> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2] o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock the state directory for task 1_21
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
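The "Will retry" warning above is logged from a retry loop inside StreamThread (the trace passes through StreamThread$AbstractTaskCreator.retryWithBackoff). As a rough illustration of how a bounded retry differs from retrying for as long as the state-directory lock stays held, here is a minimal, stdlib-only Java sketch of exponential backoff with an attempt limit. BoundedRetry, retryUntil, its parameters and the simulated lock are all hypothetical; this is not how Kafka Streams implements its internal retry.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Kafka Streams: retries an action with
// exponential backoff and gives up after a fixed number of attempts.
final class BoundedRetry {

    private BoundedRetry() {}

    // Calls attempt until it returns true or maxAttempts is reached. The sleep
    // between attempts starts at initialBackoffMs and doubles each time, capped
    // at maxBackoffMs. Returns the number of attempts needed to succeed.
    static int retryUntil(BooleanSupplier attempt, int maxAttempts,
                          long initialBackoffMs, long maxBackoffMs)
            throws InterruptedException {
        long backoffMs = initialBackoffMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
            if (i < maxAttempts) {
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
        // Surface the failure instead of logging "Will retry" indefinitely.
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated state-directory lock that its previous owner releases
        // just before the third attempt.
        int[] calls = {0};
        int attempts = retryUntil(() -> ++calls[0] >= 3, 5, 10, 100);
        System.out.println("lock acquired after " + attempts + " attempts");
    }
}
```

Capping the attempts means a lock that is never released surfaces as an exception rather than as the same warning repeated for an hour.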
>
>
>
> On 24 April 2017 at 16:02, Eno Thereska <eno.there...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > In KIP-62 a background heartbeat thread was introduced to handle members
> > joining and leaving the group. The session.timeout.ms setting specifies
> > the timeout for that background thread's heartbeats. So if the thread (or
> > the whole instance) dies, its background heartbeat thread stops too, and
> > the member is removed from the group once session.timeout.ms passes
> > without a heartbeat.
> >
> > Eno
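To make the two timeouts concrete, here is a simplified model of when a group member is removed. It is an assumption for illustration only: in reality the heartbeat check against session.timeout.ms is done by the broker's group coordinator, while the poll-interval check against max.poll.interval.ms is done on the client side, and MemberLiveness is a hypothetical name. The 10 s session timeout is the assumed 0.10.2 default; the Integer.MAX_VALUE poll interval is the override Sachin asks about.

```java
// Simplified, hypothetical model of the two KIP-62 liveness checks for a
// consumer group member; not the broker's or the client's actual code.
final class MemberLiveness {

    private MemberLiveness() {}

    // Returns true if the member would be removed from the group at nowMs.
    static boolean shouldEvict(long nowMs, long lastHeartbeatMs, long lastPollMs,
                               long sessionTimeoutMs, long maxPollIntervalMs) {
        // Instance stopped or crashed: the background heartbeat thread is gone
        // too, so no heartbeat arrives within session.timeout.ms.
        boolean heartbeatExpired = nowMs - lastHeartbeatMs > sessionTimeoutMs;
        // Instance alive, but the processing thread has not called poll()
        // within max.poll.interval.ms.
        boolean pollExpired = nowMs - lastPollMs > maxPollIntervalMs;
        return heartbeatExpired || pollExpired;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000;              // assumed 0.10.2 default
        long maxPollIntervalMs = Integer.MAX_VALUE;  // the override in question

        // Instance stopped at t=0, checked at t=15 s: evicted via heartbeats.
        System.out.println(shouldEvict(15_000, 0, 0, sessionTimeoutMs, maxPollIntervalMs)); // prints true
        // Instance alive (heartbeat at t=14 s) but busy since t=0: not evicted,
        // because the poll-interval check is effectively disabled.
        System.out.println(shouldEvict(15_000, 14_000, 0, sessionTimeoutMs, maxPollIntervalMs)); // prints false
    }
}
```

In this model, setting max.poll.interval.ms to Integer.MAX_VALUE only disables the "stuck between polls" check; a stopped instance is still removed through the heartbeat timeout.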
> >
> > > On 24 Apr 2017, at 15:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > I had a question about this setting:
> > > ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
> > >
> > > How would the broker know that a thread has died, or that we simply
> > > stopped an instance, and that it needs to be booted out of the group?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> > >
> > >> Hi Ian,
> > >>
> > >>
> > >> This is now fixed in 0.10.2.1; the default configuration needed
> > >> tweaking. If you can't pick that up (it's currently being voted on),
> > >> make sure you have these two parameters set as follows in your streams
> > >> config:
> > >>
> > >> final Properties props = new Properties();
> > >> // ...
> > >> // Increase to 10 from the default of 0.
> > >> props.put(ProducerConfig.RETRIES_CONFIG, 10);
> > >> // Increase to effectively infinite from the default of 300 s.
> > >> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> > >>     Integer.toString(Integer.MAX_VALUE));
> > >>
> > >> Thanks
> > >> Eno
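For reference, here is a stdlib-only sketch of the same two overrides, written with the literal config keys that ProducerConfig.RETRIES_CONFIG ("retries") and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG ("max.poll.interval.ms") stand for, so that it runs without the Kafka jars. In a real application you would use the constants and pass the Properties to KafkaStreams as usual; StreamsOverrides is a hypothetical name. String values are used because Properties.getProperty returns null for non-String values such as the Integer 10 above (Kafka's own config parsing accepts either).

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical stdlib-only sketch of the two suggested overrides.
final class StreamsOverrides {

    private StreamsOverrides() {}

    static Properties overrides() {
        Properties props = new Properties();
        // Retry transient producer errors (e.g. NotLeaderForPartitionException)
        // instead of failing on the first one; the default discussed was 0.
        props.setProperty("retries", "10");
        // Effectively disable the poll-interval check (default 300000 ms).
        props.setProperty("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = overrides();
        long maxPollMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        // Integer.MAX_VALUE ms is a little over 24 days, hence "infinity".
        System.out.println("max.poll.interval.ms = " + maxPollMs
                + " ms (" + TimeUnit.MILLISECONDS.toDays(maxPollMs) + "+ days)");
    }
}
```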
> > >>
> > >>> On 24 Apr 2017, at 10:38, Ian Duffy <i...@ianduffy.ie> wrote:
> > >>>
> > >>> Hi All,
> > >>>
> > >>> We're running multiple Kafka Streams applications using Kafka client
> > >>> 0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
> > >>> Additionally, we're running Kafka Connect 0.10.2.0 with the
> > >>> Elasticsearch connector by Confluent [1].
> > >>>
> > >>> When an ISR change occurred on the brokers, all of the streams
> > >>> applications and the Kafka Connect ES connector threw exceptions and
> > >>> never recovered.
> > >>>
> > >>> We've seen a correlation between Kafka broker ISR changes and streams
> > >>> applications dying.
> > >>>
> > >>> The streams applications log the following and fail to recover:
> > >>>
> > >>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > >>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
> > >>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > >>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> > >>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> > >>>     ... 2 common frames omitted
> > >>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > >>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > >>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
> > >>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > >>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> > >>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > >>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> > >>>     ... 2 common frames omitted
> > >>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
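Both traces bottom out in NotLeaderForPartitionException, the error a producer gets when it sends to a broker that is no longer the partition leader, for example while leadership moves during an ISR change. The toy model below is an assumption rather than the real producer logic; it only shows why the retries setting matters. With retries=0 the first failed send is final, while a few retries spaced by retry.backoff.ms can outlast a short leader-election window. The 100 ms backoff and 350 ms window are illustrative values, and LeaderChangeModel is a hypothetical name.

```java
// Toy model (an assumption, not the real producer): the partition has no
// reachable leader for the first electionMs milliseconds, and the producer
// retries every retryBackoffMs, up to `retries` extra attempts.
final class LeaderChangeModel {

    private LeaderChangeModel() {}

    // Returns the 1-based attempt that succeeds, or -1 if every attempt fails.
    static int succeedingAttempt(int retries, long retryBackoffMs, long electionMs) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            long sendTimeMs = attempt * retryBackoffMs;  // first send at t = 0
            if (sendTimeMs >= electionMs) {
                return attempt + 1;                      // new leader is available
            }
            // Otherwise this attempt fails with a "not leader" style error.
        }
        return -1;                                       // error surfaces to the app
    }

    public static void main(String[] args) {
        long backoffMs = 100;   // assumed retry.backoff.ms
        long electionMs = 350;  // hypothetical leader-election window
        System.out.println("retries=0:  " + succeedingAttempt(0, backoffMs, electionMs));  // prints -1
        System.out.println("retries=10: " + succeedingAttempt(10, backoffMs, electionMs)); // prints 5
    }
}
```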
> > >>>
> > >>> Are we potentially doing something wrong with our streams
> > >>> configuration/usage? Or does this look like a bug?
> > >>>
> > >>> Thanks,
> > >>> Ian.
> > >>>
> > >>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
> > >>
> > >>
> >
> >
>
