Re: Producer Timeout issue in kafka streams task

2021-11-02 Thread Luke Chen
Hi Pushkar,
In addition to Matthias's and Guozhang's answers and clear explanations, I
think there's one more thing you should focus on:

>  I could see that 2 of the 3 brokers restarted at the same time.
It's a 3-broker cluster, and suddenly 2 of them are down. You should try
to find out why that happened.
When that happens, not only will the producers become unstable, but the
consumer side will also be unstable (waiting for the partition leader
election, among other things).
The suggestions from Matthias and Guozhang are more about how you can
make Kafka Streams more robust when this issue happens. But the root
cause is the "brokers down" issue.

Thank you.
Luke

On Tue, Nov 2, 2021 at 11:19 AM Matthias J. Sax  wrote:

> The `Producer#send()` call is actually not covered by the KIP because it
> may result in data loss if we try to handle the timeout directly. --
> Kafka Streams does not have a copy of the data in the producer's send
> buffer and thus we cannot retry the `send()`. -- Instead, it's necessary
> to re-process the input data which is not done automatically.
>
>
> -Matthias
>
> On 11/1/21 4:34 PM, Guozhang Wang wrote:
> > Hello Pushkar,
> >
> > I'm assuming you have the same Kafka version (2.5.1) at the Streams
> client
> > side here: in those old versions, Kafka Streams relies on the embedded
> > Producer clients to handle timeouts, which requires users to correctly
> > configure such values.
> >
> > In newer version (2.8+) We have made Kafka Streams more robust to Server
> > side disconnects or soft failures that may cause timeouts:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> .
> > So I'd suggest you upgrade to those versions, and see if those symptoms
> go
> > away.
> >
> >
> > Guozhang
> >
> > On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole 
> wrote:
> >
> >> Hi All,
> >>
> >> I am getting below issue in streams application. Kafka cluster is a 3
> >> broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers
> restarted
> >> at the same time when below exception occurred in streams application
> so I
> >> can relate below exception to those brokers restarts. However, what is
> >> worrying me is the streams application did not process any events after
> >> below exception. So the question is:
> >> 1. how can i make the streams application resilient to broker issues
> e.g.
> >> the producer underneath streams should have connected to another broker
> >> instance at the time 1 broker went down, but possible the 2nd broker
> went
> >> down immediately that's why it timed out
> >> 2. In general how does streams handle broker issue and when does it
> decide
> >> to connect to another broker instance in case one instance seems to be
> in
> >> error?
> >>
> >>
> >>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> >> processing processor thread -
> >>
> >>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> >> stream - task [0_5] Abort sending since an error caught with a previous
> >> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:12 ms has passed since batch
> >> creation\nTimeout exception caught when sending record to topic
> >> analytics-incoming-feed. This might happen if the producer cannot send
> data
> >> to the Kafka cluster and thus, its internal buffer fills up. This can
> also
> >> happen if the broker is slow to respond, if the network connection to
> the
> >> broker was interrupted, or if similar circumstances arise. You can
> increase
> >> producer parameter `max.block.ms` to increase this
> >>
> >>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> >> task [0_5] Abort sending since an error caught with a previous record
> >> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:12 ms has passed since batch
> >> creation\nTimeout exception caught when sending record to topic
> >> analytics-incoming-feed. This might happen if the producer cannot send
> data
> >> to the Kafka cluster and thus, its internal buffer fills up. This can
> also
> >> happen if the broker is slow to respond, if the network connection to
> the
> >> broker was interrupted, or if similar circumstances arise. You can
> increase
> >> producer parameter `max.block.ms` to increase this timeout.\n\tat
> >>
> >>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
> >>
> >>
> 

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Matthias J. Sax
The `Producer#send()` call is actually not covered by the KIP because it
may result in data loss if we try to handle the timeout directly. --
Kafka Streams does not have a copy of the data in the producer's send
buffer and thus we cannot retry the `send()`. -- Instead, it's necessary
to re-process the input data, which is not done automatically.



-Matthias

On 11/1/21 4:34 PM, Guozhang Wang wrote:

Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) at the Streams client
side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to correctly
configure such values.

In newer version (2.8+) We have made Kafka Streams more robust to Server
side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to those versions, and see if those symptoms go
away.


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole  wrote:


Hi All,

I am getting below issue in streams application. Kafka cluster is a 3
broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
at the same time when below exception occurred in streams application so I
can relate below exception to those brokers restarts. However, what is
worrying me is the streams application did not process any events after
below exception. So the question is:
1. how can i make the streams application resilient to broker issues e.g.
the producer underneath streams should have connected to another broker
instance at the time 1 broker went down, but possible the 2nd broker went
down immediately that's why it timed out
2. In general how does streams handle broker issue and when does it decide
to connect to another broker instance in case one instance seems to be in
error?


{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -

analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this

timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat

datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat

org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat

org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat

org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat

org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat

org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Matthias J. Sax
As the error message suggests, you can increase `max.block.ms` for this
case: If a broker is down, it may take some time for the producer to
fail over to a different broker (before the producer can fail over, the
cluster must elect a new partition leader, and only afterwards can it
inform the producer about the new broker that must be used to write
into the partition). Increasing `max.block.ms` gives the producer (and
thus the brokers) more time to do the fail-over.
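
For example, the embedded producer can be configured through the Streams
config using the `producer.` prefix. This is a minimal sketch, not a
tuning recommendation; the application id, bootstrap servers, and timeout
values are placeholders, and the `delivery.timeout.ms` override is an
extra, related knob for the "ms has passed since batch creation" expiry:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class ProducerTimeoutConfig {
        public static Properties build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            // Allow the producer to block longer while waiting for metadata or
            // buffer space during a broker fail-over (default is 60000 ms).
            props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000);
            // Bound how long a batch may wait in the send buffer before it is
            // expired; illustrative value only.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 180000);
            return props;
        }
    }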


If the fail-over fails (i.e., times out), the producer raises the
exception and Kafka Streams is forced to stop processing to avoid data
loss (because the producer buffered some data that would be lost if we
committed offsets despite the error).


In general, Kafka Streams tries to handle as many broker/client errors
as possible (and newer versions handle more cases than older ones).
But there are always some cases that cannot be handled by Kafka Streams.
Of course, changing client configs (producer/consumer and Kafka Streams)
can help to make it more robust.


Thus, it comes down to monitoring Kafka Streams:

In newer versions of Kafka Streams, you can register an uncaught
exception handler that allows you to restart failed threads. In older
versions, you can also register a callback, but it only informs you that
a thread died; to recover, you would need to `close()` the KafkaStreams
instance, create a new one, and `start()` it.
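
A minimal sketch of the newer (2.8+) handler, which comes from KIP-671;
`topology` and `props` are assumed to exist, and replacing the thread is
just one of the possible responses:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    public class RestartOnFailure {
        public static KafkaStreams start(Topology topology, Properties props) {
            KafkaStreams streams = new KafkaStreams(topology, props);
            // Replace the failed StreamThread instead of letting the whole
            // instance die; SHUTDOWN_CLIENT / SHUTDOWN_APPLICATION are the
            // other possible responses.
            streams.setUncaughtExceptionHandler(exception ->
                StreamThreadExceptionResponse.REPLACE_THREAD);
            streams.start();
            return streams;
        }
    }

In older versions, the `Thread.UncaughtExceptionHandler` overload of
`setUncaughtExceptionHandler` only lets you log or alert; the restart
logic has to live outside the handler.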



Hope this helps,

-Matthias


On 10/31/21 5:52 AM, Pushkar Deole wrote:

Hi All,

I am getting below issue in streams application. Kafka cluster is a 3
broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
at the same time when below exception occurred in streams application so I
can relate below exception to those brokers restarts. However, what is
worrying me is the streams application did not process any events after
below exception. So the question is:
1. how can i make the streams application resilient to broker issues e.g.
the producer underneath streams should have connected to another broker
instance at the time 1 broker went down, but possible the 2nd broker went
down immediately that's why it timed out
2. In general how does streams handle broker issue and when does it decide
to connect to another broker instance in case one instance seems to be in
error?

{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this
timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Guozhang Wang
Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) at the Streams client
side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to correctly
configure such values.

In newer versions (2.8+), we have made Kafka Streams more robust to
server-side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to those versions, and see if those symptoms go
away.
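
After upgrading, the main new knob from KIP-572 is `task.timeout.ms`
(default 300000 ms), which bounds how long a task keeps retrying through
client TimeoutExceptions before Streams finally gives up. A minimal
sketch; the application id and bootstrap servers are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class Kip572Config {
        public static Properties build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            // KIP-572 (2.8+ only): retry window for a task that hits client
            // TimeoutExceptions; default is 300000 ms (5 minutes).
            props.put("task.timeout.ms", 300000);
            return props;
        }
    }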


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole  wrote:

> Hi All,
>
> I am getting below issue in streams application. Kafka cluster is a 3
> broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
> at the same time when below exception occurred in streams application so I
> can relate below exception to those brokers restarts. However, what is
> worrying me is the streams application did not process any events after
> below exception. So the question is:
> 1. how can i make the streams application resilient to broker issues e.g.
> the producer underneath streams should have connected to another broker
> instance at the time 1 broker went down, but possible the 2nd broker went
> down immediately that's why it timed out
> 2. In general how does streams handle broker issue and when does it decide
> to connect to another broker instance in case one instance seems to be in
> error?
>
>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> processing processor thread -
>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> stream - task [0_5] Abort sending since an error caught with a previous
> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this
>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> task [0_5] Abort sending since an error caught with a previous record
> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this timeout.\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
>
> datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
>
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch creation\n"}
>


-- 
-- Guozhang


Producer Timeout issue in kafka streams task

2021-10-31 Thread Pushkar Deole
Hi All,

I am getting the below issue in a streams application. The Kafka cluster is a
3-broker cluster (v 2.5.1), and I could see that 2 of the 3 brokers restarted
at the same time when the below exception occurred in the streams application,
so I can relate the exception to those broker restarts. However, what is
worrying me is that the streams application did not process any events after
the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.,
the producer underneath streams should have connected to another broker
instance at the time 1 broker went down, but possibly the 2nd broker went
down immediately and that's why it timed out.
2. In general, how does streams handle broker issues, and when does it decide
to connect to another broker instance in case one instance seems to be in
error?

{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this
timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch creation\n"}