Re: Kafka Consumer fetch-size/rate and Producer queue timeout
Hi Gordon, Any further thoughts on this? I forgot to mention I am using Flink 1.3.2 and our Kafka is 0.10. We are in the process of upgrading Kafka but won’t be in Prod for at least couple of months. Thanks, Ashish > On Nov 8, 2017, at 9:35 PM, Ashish Pokharelwrote: > > Hi Grodon, > > Thanks for your responses. It definitely makes sense. > > I could pull this stack from the logs, entire log itself is pretty big - let > me know if some samples before/after this may help. > > TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator} > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217) > ... 7 more > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > ... 18 more > Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 > record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302) > at >
Re: Kafka Consumer fetch-size/rate and Producer queue timeout
Hi Grodon, Thanks for your responses. It definitely makes sense. I could pull this stack from the logs, entire log itself is pretty big - let me know if some samples before/after this may help. TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator} at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217) ... 7 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) ... 18 more Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) ... 24 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append Also for reference here is my ProducerConfig from logs: INFO
Re: Kafka Consumer fetch-size/rate and Producer queue timeout
Hi Ashish, From your description I do not yet have much of an idea of what may be happening. However, some of your observations seems reasonable. I’ll go through them one by one: I did try to modify request.timeout.ms, linger.ms etc to help with the issue if it were caused by a sudden burst of data or something along those lines. However, what it caused the app to increase back pressure and made the slower and slower until that timeout is reached. If the client is experiencing trouble in writing outstanding records to Kafka, and the timeout is increased, then I think increased back pressure is indeed the expected behavior. I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually start to show higher response-time and lower rates. Drops on fetch-rate and growth on fetch-size in the Flink Kafka consumer should be a natural consequence of backpressure in the job. The fetch loop in the consumer will be blocked temporarily when backpressure is propagated from downstream operators, resulting in longer fetch intervals and larger batches on each fetch (given that events rate are still constant). Therefore, I think the root cause is still along the lines of the producer side. Would you happen to have any logs that maybe shows any useful information on the producer side? I think we might have a better chance of finding out what is going on by digging there. Also, which Flink version & Kafka version are you using? Cheers, Gordon On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com) wrote: All, I am starting to notice a strange behavior in a particular streaming app. I initially thought it was a Producer issue as I was seeing timeout exceptions (records expiring in queue. I did try to modify request.timeout.ms, linger.ms etc to help with the issue if it were caused by a sudden burst of data or something along those lines. However, what it caused the app to increase back pressure and made the slower and slower until that timeout is reached. With lower timeouts, app would actually raise exception and recover faster. I can tell it is not related to connectivity as other apps are running just fine around the same time frame connected to same brokers (we have at least 10 streaming apps connected to same list of brokers) from the same data nodes. We have enabled Graphite Reporter in all of our applications. After deep diving into some of consumer and producer stats, I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually start to show higher response-time and lower rates. Eventually, I noticed connection resets start to occur and connection counts go up momentarily. After which, things get back to normal. Data producer rates remain constant around that timeframe - we have Logstash producer sending data over. We checked both Logstash and Kafka metrics and they seem to be showing same pattern (sort of sin wave) throughout. It seems to point to Kafka issue (perhaps some tuning between Flink App and Kafka) but wanted to check with the experts before I start knocking down Kafka Admin’s doors. Are there anything else I can look into. There are quite a few default stats in Graphite but those were the ones that made most sense. Thanks, Ashish
Re: Kafka Consumer fetch-size/rate and Producer queue timeout
Thanks Fabian. I am seeing thia consistently and can definitely use some help. I have plenty of graphana views I can share if that helps :) Sent from Yahoo Mail on Android On Tue, Nov 7, 2017 at 3:54 AM, Fabian Hueskewrote: Hi Ashish, Gordon (in CC) might be able to help you. Cheers, Fabian 2017-11-05 16:24 GMT+01:00 Ashish Pokharel : All, I am starting to notice a strange behavior in a particular streaming app. I initially thought it was a Producer issue as I was seeing timeout exceptions (records expiring in queue. I did try to modify request.timeout.ms, linger.ms etc to help with the issue if it were caused by a sudden burst of data or something along those lines. However, what it caused the app to increase back pressure and made the slower and slower until that timeout is reached. With lower timeouts, app would actually raise exception and recover faster. I can tell it is not related to connectivity as other apps are running just fine around the same time frame connected to same brokers (we have at least 10 streaming apps connected to same list of brokers) from the same data nodes. We have enabled Graphite Reporter in all of our applications. After deep diving into some of consumer and producer stats, I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually start to show higher response-time and lower rates. Eventually, I noticed connection resets start to occur and connection counts go up momentarily. After which, things get back to normal. Data producer rates remain constant around that timeframe - we have Logstash producer sending data over. We checked both Logstash and Kafka metrics and they seem to be showing same pattern (sort of sin wave) throughout. It seems to point to Kafka issue (perhaps some tuning between Flink App and Kafka) but wanted to check with the experts before I start knocking down Kafka Admin’s doors. Are there anything else I can look into. There are quite a few default stats in Graphite but those were the ones that made most sense. Thanks, Ashish
Kafka Consumer fetch-size/rate and Producer queue timeout
All, I am starting to notice a strange behavior in a particular streaming app. I initially thought it was a Producer issue as I was seeing timeout exceptions (records expiring in queue. I did try to modify request.timeout.ms, linger.ms etc to help with the issue if it were caused by a sudden burst of data or something along those lines. However, what it caused the app to increase back pressure and made the slower and slower until that timeout is reached. With lower timeouts, app would actually raise exception and recover faster. I can tell it is not related to connectivity as other apps are running just fine around the same time frame connected to same brokers (we have at least 10 streaming apps connected to same list of brokers) from the same data nodes. We have enabled Graphite Reporter in all of our applications. After deep diving into some of consumer and producer stats, I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually start to show higher response-time and lower rates. Eventually, I noticed connection resets start to occur and connection counts go up momentarily. After which, things get back to normal. Data producer rates remain constant around that timeframe - we have Logstash producer sending data over. We checked both Logstash and Kafka metrics and they seem to be showing same pattern (sort of sin wave) throughout. It seems to point to Kafka issue (perhaps some tuning between Flink App and Kafka) but wanted to check with the experts before I start knocking down Kafka Admin’s doors. Are there anything else I can look into. There are quite a few default stats in Graphite but those were the ones that made most sense. Thanks, Ashish