Should this timeout be less than the max.poll.interval.ms value? If yes,
then generally speaking, what should the ratio between the two be, or
what is a reasonable range for this timeout value?
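
For reference, this is roughly how the two values look on our side at the
moment (a minimal sketch; 30000 ms is the producer default we appear to be
hitting, and 300000 ms is the consumer default mentioned below):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// producer: how long a produce request may wait before being expired
// (the "30001 ms has passed since last append" in the error below)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// consumer: maximum gap between poll() calls, default 5 minutes
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
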
Thanks
Sachin

On 1 Apr 2017 04:57, "Matthias J. Sax" <matth...@confluent.io> wrote:

Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG

-Matthias

On 3/31/17 11:32 AM, Sachin Mittal wrote:
> Hi,
> So I have added the config ProducerConfig.RETRIES_CONFIG,
> Integer.MAX_VALUE and the NotLeaderForPartitionException is gone.
>
> However, we see a new exception, especially under heavy load:
> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception
> caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
> new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
>
> So any idea why the TimeoutException is happening?
> Is this controlled by ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
>
> If yes, what should the value be set to, given that our consumer
> max.poll.interval.ms is the default 5 minutes?
>
> Is there any other setting that we should try, to avoid such errors
> which cause the stream thread to die?
>
> Thanks
> Sachin
>
>
> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Sachin,
>>
>> Not in this case.
>>
>> Thanks
>> Eno
>>
>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>> OK.
>>> I will try this out.
>>>
>>> Do I need to change anything for
>>> max.in.flight.requests.per.connection?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>>> This server is not the leader for that topic-partition.”, could you
>>>> try setting the number of retries to something large, like this:
>>>>
>>>> Properties props = new Properties();
>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>> ...
>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>
>>>> This will retry the produce requests and should hopefully solve your
>>>> immediate problem.
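>>>>
>>>> (If it helps, a slightly fuller sketch of the same idea -- the
>>>> application id and bootstrap servers below are placeholders, not your
>>>> actual values:)
>>>>
>>>> import java.util.Properties;
>>>> import org.apache.kafka.clients.producer.ProducerConfig;
>>>> import org.apache.kafka.streams.StreamsConfig;
>>>>
>>>> Properties props = new Properties();
>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
>>>> // Keep retrying produce requests on transient errors such as a leader
>>>> // moving to another broker, instead of failing the stream task.
>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);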
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>
>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> We have encountered another series of errors which I would need more
>>>> help in understanding.
>>>>
>>>> In the logs we see a message like this:
>>>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread |
>>>> 85-StreamThread-3-producer]:
>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
>>>> task [0_1] Error sending record to topic
>>>> new-part-advice-key-table-changelog. No more offsets will be recorded
>>>> for this task and the exception will eventually be thrown
>>>>
>>>> then, some milliseconds later:
>>>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>>>> org.apache.kafka.streams.processor.internals.StreamThread -
>>>> stream-thread [StreamThread-3] Failed while executing StreamTask 0_1
>>>> due to flush state:
>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception
>>>> caught when producing
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>>>> server is not the leader for that topic-partition.
>>>>
>>>> finally we get this:
>>>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]:
>>>> com.advice.TestKafkaAdvice - Uncaught exception:
>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in
>>>> process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
>>>> topic=advice-stream, partition=1, offset=48062286
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task
>>>> [0_1] exception caught when producing
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>
>>>> Again, it is not clear why in this case we need to shut down the
>>>> stream thread and eventually the application. Shouldn't we capture
>>>> this error too? (The handler we use to at least log the death is
>>>> sketched in the PS below.)
>>>>
>>>> Thanks
>>>> Sachin
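>>>>
>>>> PS: for completeness, this is roughly how we notice the thread dying
>>>> today (a minimal sketch; the topology and config are elided, and the
>>>> handler can only log/alert -- the thread is already dead by the time
>>>> it runs):
>>>>
>>>> import java.util.Properties;
>>>> import org.apache.kafka.streams.KafkaStreams;
>>>> import org.apache.kafka.streams.kstream.KStreamBuilder;
>>>>
>>>> KStreamBuilder builder = new KStreamBuilder(); // topology elided
>>>> Properties props = new Properties();           // config elided
>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>> // Fires after a stream thread has died with an unhandled exception;
>>>> // this is roughly what produces the "Uncaught exception:" line above.
>>>> streams.setUncaughtExceptionHandler((thread, e) ->
>>>>         System.err.println("Uncaught exception in " + thread.getName() + ": " + e));
>>>> streams.start();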