Hi Sachin,

Not in this case.

Thanks
Eno

> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> OK.
> I will try this out.
>
> Do I need to change anything for
> max.in.flight.requests.per.connection
>
> Thanks
> Sachin
>
>
> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Sachin,
>>
>> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException:
>> This server is not the leader for that topic-partition.”, could you try
>> setting the number of retries to something large like this:
>>
>> Properties props = new Properties();
>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>> ...
>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>
>> This will retry the produce requests and should hopefully solve your
>> immediate problem.
>>
>> Thanks
>> Eno
>>
>>
>> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>
>> Hi,
>> We have encountered another series of errors which I would need
>> more help in understanding.
>>
>> In the logs we see a message like this:
>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>>
>> then some milliseconds later
>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>
>> finally we get this
>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>
>>
>> Again it is not clear why in this case we need to shut down the streams
>> thread and eventually the application. Shouldn't we capture this error
>> too?
>>
>> Thanks
>> Sachin
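For reference, the retry setting suggested in the quoted reply can be sketched without the Kafka client jars on the classpath. This is only an illustration, not code from the thread: it assumes the literal keys "application.id" and "retries" (the string values behind StreamsConfig.APPLICATION_ID_CONFIG and ProducerConfig.RETRIES_CONFIG), and the class name RetryConfigSketch, the method buildProps, and the id "example-app" are hypothetical. Any other settings the application needs (the "..." in the reply) are left out.

```java
import java.util.Properties;

public class RetryConfigSketch {

    // Builds a Properties object carrying the two settings named in the thread.
    // Literal keys are used here in place of the StreamsConfig / ProducerConfig
    // constants so the sketch compiles with the JDK alone (an assumption made
    // for illustration; real code would normally use the constants).
    static Properties buildProps(String applicationId) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        // A very large retry count, so that failed produce requests (such as
        // those hitting NotLeaderForPartitionException) are retried rather
        // than surfaced after the first failure.
        props.put("retries", Integer.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("example-app"); // hypothetical application id
        System.out.println("retries = " + props.get("retries"));
    }
}
```

In a real application the resulting Properties would be passed on to the Kafka Streams setup together with the remaining required configuration, which the thread elides.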