Hi Sachin,

Not in this case; you can leave max.in.flight.requests.per.connection as it is.
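
For reference, here is a minimal, self-contained sketch of the retries setting quoted below. It uses plain string keys instead of the StreamsConfig and ProducerConfig constants so that it compiles without the Kafka jars; "application.id" and "retries" are the values those constants resolve to. The class name RetryConfigSketch, the helper buildProps and the application id are hypothetical, and max.in.flight.requests.per.connection is deliberately not set.

```java
import java.util.Properties;

class RetryConfigSketch {

    // Builds the client properties from the quoted snippet using plain string keys.
    public static Properties buildProps(String applicationId) {
        Properties props = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", applicationId);
        // Same key as ProducerConfig.RETRIES_CONFIG: keep retrying failed produce requests.
        props.put("retries", Integer.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        // "my-streams-app" is a placeholder application id, not taken from the thread.
        Properties props = buildProps("my-streams-app");
        System.out.println("application.id = " + props.get("application.id"));
        System.out.println("retries = " + props.get("retries"));
    }
}
```

In a real application the same Properties object would also carry the remaining settings elided in the quoted snippet before being passed to the streams client.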

Thanks
Eno

> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> 
> OK.
> I will try this out.
> 
> Do I need to change anything for
> max.in.flight.requests.per.connection?
> 
> Thanks
> Sachin
> 
> 
> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Sachin,
>> 
>> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server is not the leader for that topic-partition.”, could you try setting the number of retries
>> to something large like this:
>> 
>> Properties props = new Properties();
>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>> ...
>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>> 
>> This will retry the produce requests and should hopefully solve your
>> immediate problem.
>> 
>> Thanks
>> Eno
>> 
>> 
>> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>> 
>>    Hi,
>>    We have encountered another series of errors that I need more help
>>    understanding.
>> 
>>    In the logs we see a message like this:
>>    ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>> 
>>    then some milliseconds later
>>    ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>>    org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>> 
>>    finally we get this
>>    ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>>    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>>        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>    Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> 
>> 
>>    Again, it is not clear why in this case we need to shut down the streams
>>    thread and eventually the application. Shouldn't we capture this error too?
>> 
>>    Thanks
>>    Sachin
>> 
>> 
>> 
>> 
