Hi Sachin,

In the bug fix release for 0.10.2 (and in trunk) we have now set 
max.poll.interval to infinite since from our experience with streams this 
should not be something that users set: 
https://github.com/apache/kafka/pull/2770/files 
<https://github.com/apache/kafka/pull/2770/files>.

We're in the process of documenting that change. For now you can increase the 
request timeout without worrying about max.poll.interval anymore. In fact I'd 
suggest you also increase max.poll.interval as we've done it above.

Thanks
Eno

> On 1 Apr 2017, at 03:28, Sachin Mittal <sjmit...@gmail.com> wrote:
> 
> Should this timeout be less than max poll interval value? if yes than
> generally speaking what should be the ratio between two or range for this
> timeout value .
> 
> Thanks
> Sachin
> 
> On 1 Apr 2017 04:57, "Matthias J. Sax" <matth...@confluent.io> wrote:
> 
> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> 
> 
> -Matthias
> 
> 
> On 3/31/17 11:32 AM, Sachin Mittal wrote:
>> Hi,
>> So I have added the config ProducerConfig.RETRIES_CONFIG,
> Integer.MAX_VALUE
>> and the NotLeaderForPartitionException is gone.
>> 
>> However we see a new exception especially under heavy load:
>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception
>> caught when producing
>>  at
>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:119)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>  at
>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:127)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.
> java:76)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>  at
>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:188)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>  at
>> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.
> java:280)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:787)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>  at
>> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:774)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:749)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>  at
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:671)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:378)
>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
>> new-part-advice-key-table-changelog-1: 30001 ms has passed since last
> append
>> 
>> So any idea as why TimeoutException is happening.
>> Is this controlled by
>> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>> 
>> If yes
>> What should the value be set in this given that out consumer
>> max.poll.interval.ms is defaul 5 minutes.
>> 
>> Is there any other setting that we should try to avoid such errors which
>> causes stream thread to die.
>> 
>> Thanks
>> Sachin
>> 
>> 
>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>> 
>>> Hi Sachin,
>>> 
>>> Not in this case.
>>> 
>>> Thanks
>>> Eno
>>> 
>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>> 
>>>> OK.
>>>> I will try this out.
>>>> 
>>>> Do I need to change anything for
>>>> max.in.flight.requests.per.connection
>>>> 
>>>> Thanks
>>>> Sachin
>>>> 
>>>> 
>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Sachin,
>>>>> 
>>>>> For this particular error, “org.apache.kafka.common.errors.
>>>>> NotLeaderForPartitionException: This server is not the leader for that
>>>>> topic-partition.”, could you try setting the number of retries to
>>> something
>>>>> large like this:
>>>>> 
>>>>> Properties props = new Properties();
>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>>> ...
>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>> 
>>>>> This will retry the produce requests and should hopefully solve your
>>>>> immediate problem.
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>> 
>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>> 
>>>>>   Hi,
>>>>>   We have encountered another case of series of errors which I would
>>> need
>>>>>   more help in understanding.
>>>>> 
>>>>>   In logs we see message like this:
>>>>>   ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread |
>>>>>   85-StreamThread-3-producer]:
>>>>>   org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
>>>>> task
>>>>>   [0_1] Error sending record to topic new-part-advice-key-table-
>>> changelog.
>>>>> No
>>>>>   more offsets will be recorded for this task and the exception will
>>>>>   eventually be thrown
>>>>> 
>>>>>   then some millisecond later
>>>>>   ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>>>>>   org.apache.kafka.streams.processor.internals.StreamThread -
>>>>> stream-thread
>>>>>   [StreamThread-3] Failed while executing StreamTask 0_1 due to flush
>>>>> state:
>>>>>   org.apache.kafka.streams.errors.StreamsException: task [0_1]
>>> exception
>>>>>   caught when producing
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
>>>>> checkForException(RecordCollectorImpl.java:119)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>>>> RecordCollectorImpl.flush(RecordCollectorImpl.java:127)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.StreamTask.flushState(
>>>>> StreamTask.java:422)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.StreamThread$4.apply(
>>>>> StreamThread.java:555)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.StreamThread.
>>>>> performOnAllTasks(StreamThread.java:513)
>>>>>   [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>>>> StreamThread.flushAllState(StreamThread.java:551)
>>>>>   [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.StreamThread.
>>>>> shutdownTasksAndState(StreamThread.java:463)
>>>>>   [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
>>>>> StreamThread.java:408)
>>>>>   [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>>>> StreamThread.run(StreamThread.java:389)
>>>>>   [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>   org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>>>>> server
>>>>>   is not the leader for that topic-partition.
>>>>> 
>>>>>   finally we get this
>>>>>   ERROR 2017-03-25 03:41:45,724 [StreamThread-3]:
>>>>> com.advice.TestKafkaAdvice
>>>>>   - Uncaught exception:
>>>>>   org.apache.kafka.streams.errors.StreamsException: Exception caught
>>> in
>>>>>   process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
>>>>>   topic=advice-stream, partition=1, offset=48062286
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>>>> StreamTask.process(StreamTask.java:216)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>>>> StreamThread.java:651)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>>>> StreamThread.run(StreamThread.java:378)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>   Caused by: org.apache.kafka.streams.errors.StreamsException: task
>>>>> [0_1]
>>>>>   exception caught when producing
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
>>>>> checkForException(RecordCollectorImpl.java:119)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>> RecordCollectorImpl.send(
>>>>> RecordCollectorImpl.java:76)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>       at
>>>>>   org.apache.kafka.streams.processor.internals.
>>> RecordCollectorImpl.send(
>>>>> RecordCollectorImpl.java:64)
>>>>>   ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>> 
>>>>> 
>>>>>   Again it is not clear why in this case we need to shut down the
>>> steams
>>>>>   thread and eventually the application. Shouldn't we capture this
>>> error
>>>>> too?
>>>>> 
>>>>>   Thanks
>>>>>   Sachin
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
>> 

Reply via email to