Re: Producer Timeout issue in kafka streams task
Hi Pushkar,

In addition to Matthias's and Guozhang's answers and clear explanations, I think there is still one thing you should focus on:

> I could see that 2 of the 3 brokers restarted at the same time.

This is a 3-broker cluster, and suddenly 2 of them went down. You should try to find out why that happened. When it happens, it is not only the producers that become unstable; the consumer side should become unstable too (waiting for the partition leader election, among other things).

The suggestions from Matthias and Guozhang are more about how you can make Kafka Streams more robust when this issue happens. But the root cause is still the "brokers down" issue.

Thank you.
Luke

On Tue, Nov 2, 2021 at 11:19 AM Matthias J. Sax wrote:

> The `Producer#send()` call is actually not covered by the KIP because it
> may result in data loss if we try to handle the timeout directly. --
> Kafka Streams does not have a copy of the data in the producer's send
> buffer and thus we cannot retry the `send()`. -- Instead, it's necessary
> to re-process the input data, which is not done automatically.
>
> -Matthias
>
> [remaining quoted history snipped; see the full messages below]
Re: Producer Timeout issue in kafka streams task
The `Producer#send()` call is actually not covered by the KIP because it may result in data loss if we try to handle the timeout directly. -- Kafka Streams does not have a copy of the data in the producer's send buffer and thus we cannot retry the `send()`. -- Instead, it's necessary to re-process the input data, which is not done automatically.

-Matthias

On 11/1/21 4:34 PM, Guozhang Wang wrote:
> [quoted history snipped; see Guozhang's full message and the original report below]
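For reference, send-side errors like the one in the log are generally routed through the Streams production exception handler (`default.production.exception.handler`, added by KIP-210), which can only choose between failing the stream thread and dropping the record; consistent with Matthias's point, it cannot retry the `send()`. Below is a minimal sketch of such a handler, with a hypothetical class name, that mirrors the default FAIL behavior while logging the failed record:

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    // Hypothetical handler: log the failed record and stop processing, so that
    // no offsets are committed for input whose output was never written.
    public class FailOnSendTimeoutHandler implements ProductionExceptionHandler {

        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            System.err.println("Failed to send record to topic " + record.topic() + ": " + exception);
            // Returning CONTINUE instead would drop the record, i.e., accept data loss.
            return ProductionExceptionHandlerResponse.FAIL;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no-op
        }
    }

It would be registered via `props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, FailOnSendTimeoutHandler.class);`.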
Re: Producer Timeout issue in kafka streams task
As the error message suggests, you can increase `max.block.ms` for this case: if a broker is down, it may take some time for the producer to fail over to a different broker (before the producer can fail over, the brokers must elect a new partition leader, and only afterwards can they inform the producer about the new broker it must use to write into the partition). Increasing `max.block.ms` gives the producer (and thus the brokers) more time to do the fail-over. If the fail-over fails (i.e., times out), the producer raises the exception and Kafka Streams is forced to stop processing, to avoid data loss (because the producer did buffer some data that would be lost due to the error if we committed offsets).

In general, Kafka Streams tries to handle as many broker/client errors as possible (and newer versions handle more cases than older ones). But there are always some cases that cannot be handled by Kafka Streams. Of course, changing client configs (producer/consumer and Kafka Streams) can help to make it more robust. Beyond that, it comes down to monitoring of Kafka Streams: in newer versions you can actually register an uncaught exception handler that allows you to restart failed threads. In older versions of Kafka Streams you can also register a callback, but it only informs you that a thread died; to recover a died thread you would need to `close()` the KafkaStreams instance, create a new one, and `start()` it.

Hope this helps,

-Matthias

On 10/31/21 5:52 AM, Pushkar Deole wrote:
> [original message snipped; see the full message at the bottom of the thread]
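For illustration, a minimal sketch of passing such producer overrides through the Streams configuration via `StreamsConfig.producerPrefix()`; the broker addresses and timeout values are placeholders, not recommendations. Note that the batch expiry in the error above ("... ms has passed since batch creation") is bounded by the producer's `delivery.timeout.ms`, while `max.block.ms` bounds how long `send()` itself may block:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsTimeoutConfig {
        // Builds Streams properties with more headroom for broker fail-over.
        public static Properties buildProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer"); // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"); // placeholder
            // How long send() may block, e.g., waiting for metadata during leader election.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000);
            // Total time a record may sit in the producer before its batch is expired.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300000);
            return props;
        }
    }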
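And a sketch of the uncaught exception handler available in 2.8+ (KIP-671) that Matthias mentions: returning REPLACE_THREAD restarts the died thread, while SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION are the other options. The topology here is a placeholder, and `StreamsTimeoutConfig` refers to the previous sketch:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    public class RestartingStreamsApp {
        public static void main(final String[] args) {
            final Topology topology = new StreamsBuilder().build(); // placeholder: your real topology
            final Properties props = StreamsTimeoutConfig.buildProps();
            final KafkaStreams streams = new KafkaStreams(topology, props);
            // Kafka Streams 2.8+: replace a failed stream thread instead of letting it die.
            streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
            streams.start();
        }
    }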
Re: Producer Timeout issue in kafka streams task
Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) on the Streams client side here: in those old versions, Kafka Streams relies on the embedded producer clients to handle timeouts, which requires users to configure such values correctly.

In newer versions (2.8+) we have made Kafka Streams more robust to server-side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
So I'd suggest you upgrade to those versions and see if those symptoms go away.

Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole wrote:
> [original message snipped; see the full message at the bottom of the thread]
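As an aside on the KIP-572 upgrade path: the internal retries it introduces are bounded by a new Streams config, `task.timeout.ms`, which limits how long a task may keep retrying timed-out broker operations without progress before an error is raised (the default is 5 minutes). A minimal sketch, with an illustrative value:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class Kip572TaskTimeout {
        // Sketch for 2.8+: widen the retry window for timed-out broker calls.
        public static Properties withTaskTimeout(final Properties props) {
            props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 600000); // illustrative: 10 minutes
            return props;
        }
    }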
Producer Timeout issue in kafka streams task
Hi All,

I am getting the below issue in a streams application. The Kafka cluster is a 3-broker cluster (v 2.5.1), and I could see that 2 of the 3 brokers restarted at the same time the below exception occurred in the streams application, so I can relate the exception to those broker restarts. However, what is worrying me is that the streams application did not process any events after the exception. So the questions are:

1. How can I make the streams application resilient to broker issues? E.g., the producer underneath streams should have connected to another broker instance at the time one broker went down, but possibly the 2nd broker went down immediately as well, which is why it timed out.

2. In general, how does streams handle a broker issue, and when does it decide to connect to another broker instance in case one instance seems to be in error?

{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception processing processor thread - analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2 stream - task [0_5] Abort sending since an error caught with a previous record (timestamp 1635596258179) to topic analytics-incoming-feed due to org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for analytics-incoming-feed-4:12 ms has passed since batch creation\nTimeout exception caught when sending record to topic analytics-incoming-feed. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException: task [0_5] Abort sending since an error caught with a previous record (timestamp 1635596258179) to topic analytics-incoming-feed due to org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for analytics-incoming-feed-4:12 ms has passed since batch creation\nTimeout exception caught when sending record to topic analytics-incoming-feed. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. 
You can increase producer parameter `max.block.ms` to increase this timeout.\n\tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for analytics-incoming-feed-4:12 ms has passed since batch creation\n"}