Hi Pushkar,
In addition to Matthias's and Guozhang's answers and clear explanations, I
think there's one more thing you should focus on:

>  I could see that 2 of the 3 brokers restarted at the same time.
It's a 3-broker cluster, and suddenly 2 of them went down. You should try
to find out why that happened.
When that happens, not only will the producers become unstable, the
consumer side will also be unstable (waiting for partition leader election,
among other things).
The suggestions from Matthias and Guozhang are more about how you can make
Kafka Streams more robust when this issue happens, but the root cause is
still the "brokers down" issue.
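
If it helps with the root-cause hunt: just as a sketch (the broker address
is a placeholder, and the topic name is taken from your log), you can use
the AdminClient to check whether partitions of the topic lost their leader
or ISR members around that time:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionLeaderCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                    .describeTopics(Collections.singletonList("analytics-incoming-feed"))
                    .all().get()
                    .get("analytics-incoming-feed");
            // A missing leader or a shrunken ISR here points back at the
            // broker-side problem rather than at the streams application.
            desc.partitions().forEach(p ->
                    System.out.printf("partition %d leader=%s isr=%s%n",
                            p.partition(), p.leader(), p.isr()));
        }
    }
}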

Thank you.
Luke

On Tue, Nov 2, 2021 at 11:19 AM Matthias J. Sax <mj...@apache.org> wrote:

> The `Producer#send()` call is actually not covered by the KIP because it
> may result in data loss if we try to handle the timeout directly. --
> Kafka Streams does not have a copy of the data in the producer's send
> buffer and thus we cannot retry the `send()`. -- Instead, it's necessary
> to re-process the input data which is not done automatically.
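>
> For illustration only (this uses the StreamsUncaughtExceptionHandler added
> in 2.8 via KIP-671, so it is not available on a 2.5.1 client, and
> `topology` and `props` are assumed to be built elsewhere): you can ask
> Streams to replace a failed thread, which makes the replacement resume
> from the last committed offsets and thereby re-process the input:
>
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
>
> // Sketch: when a StreamThread dies (e.g. from a producer TimeoutException
> // wrapped in a StreamsException), start a replacement thread instead of
> // letting the application sit idle.
> KafkaStreams streams = new KafkaStreams(topology, props);
> streams.setUncaughtExceptionHandler(exception ->
>         StreamThreadExceptionResponse.REPLACE_THREAD);
> streams.start();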
>
>
> -Matthias
>
> On 11/1/21 4:34 PM, Guozhang Wang wrote:
> > Hello Pushkar,
> >
> > I'm assuming you have the same Kafka version (2.5.1) at the Streams client
> > side here: in those old versions, Kafka Streams relies on the embedded
> > Producer clients to handle timeouts, which requires users to correctly
> > configure such values.
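> >
> > Just as a sketch (the application id, broker list, and timeout values
> > below are placeholders, not recommendations), those producer timeouts can
> > be passed through the Streams config with the `producer.` prefix:
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.producer.ProducerConfig;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
> > // delivery.timeout.ms defaults to 120000 ms, which matches the "120000 ms
> > // has passed since batch creation" in the TimeoutException below.
> > props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300000);
> > props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000);
> > props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);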
> >
> > In newer versions (2.8+) we have made Kafka Streams more robust to
> > server-side disconnects or soft failures that may cause timeouts:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> > So I'd suggest you upgrade to one of those versions and see if those
> > symptoms go away.
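> >
> > (A rough sketch of what changes after the upgrade: with KIP-572, Streams
> > itself retries timed-out operations and only fails the task once
> > `task.timeout.ms` is exceeded; the default is 5 minutes and the value
> > below is only an example.)
> >
> > import java.time.Duration;
> > import java.util.Properties;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > Properties props = new Properties();
> > // Available from 2.8 on (KIP-572): how long a task may keep retrying
> > // timed-out broker interactions before Streams gives up and fails it.
> > props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, Duration.ofMinutes(10).toMillis());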
> >
> >
> > Guozhang
> >
> > On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
> >
> >> Hi All,
> >>
> >> I am getting the below issue in a streams application. The Kafka cluster
> >> is a 3-broker cluster (v 2.5.1), and I could see that 2 of the 3 brokers
> >> restarted at the same time the below exception occurred in the streams
> >> application, so I can relate the exception to those broker restarts.
> >> However, what is worrying me is that the streams application did not
> >> process any events after the exception. So the questions are:
> >> 1. How can I make the streams application resilient to broker issues?
> >> E.g. the producer underneath streams should have connected to another
> >> broker instance when 1 broker went down, but possibly the 2nd broker went
> >> down immediately and that's why it timed out.
> >> 2. In general, how does streams handle a broker issue, and when does it
> >> decide to connect to another broker instance in case one instance seems
> >> to be in error?
> >>
> >>
> >>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> >> processing processor thread -
> >>
> >>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> >> stream - task [0_5] Abort sending since an error caught with a previous
> >> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:120000 ms has passed since batch
> >> creation\nTimeout exception caught when sending record to topic
> >> analytics-incoming-feed. This might happen if the producer cannot send
> data
> >> to the Kafka cluster and thus, its internal buffer fills up. This can
> also
> >> happen if the broker is slow to respond, if the network connection to
> the
> >> broker was interrupted, or if similar circumstances arise. You can
> increase
> >> producer parameter `max.block.ms` to increase this
> >>
> >>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> >> task [0_5] Abort sending since an error caught with a previous record
> >> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:120000 ms has passed since batch
> >> creation\nTimeout exception caught when sending record to topic
> >> analytics-incoming-feed. This might happen if the producer cannot send
> data
> >> to the Kafka cluster and thus, its internal buffer fills up. This can
> also
> >> happen if the broker is slow to respond, if the network connection to
> the
> >> broker was interrupted, or if similar circumstances arise. You can
> increase
> >> producer parameter `max.block.ms` to increase this timeout.\n\tat
> >>
> >>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
> >>
> >>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
> >>
> >>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
> >>
> >>
> datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
> >>
> >>
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
> >> java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:120000 ms has passed since batch creation\n"}
> >>
> >
> >
>
