Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Luke Chen
Hi Pushkar,
In addition to Matthias's and Guozhang's answers and clear explanations, I
think there's still one thing you should focus on:

>  I could see that 2 of the 3 brokers restarted at the same time.
It's a 3-broker cluster in total, and suddenly 2 of them went down. You
should try to find out why that happened.
When that happens, I think it's not only the producers that become unstable;
the consumer side will be unstable, too (waiting for partition leader
election, among other things).
The suggestions from Matthias and Guozhang are more about how you can make
Kafka more robust when this issue happens. But the root cause is the
"brokers down" issue.

Thank you.
Luke

On Tue, Nov 2, 2021 at 11:19 AM Matthias J. Sax  wrote:

> The `Producer#send()` call is actually not covered by the KIP because it
> may result in data loss if we try to handle the timeout directly. --
> Kafka Streams does not have a copy of the data in the producer's send
> buffer and thus we cannot retry the `send()`. -- Instead, it's necessary
> to re-process the input data which is not done automatically.
>
>
> -Matthias
>
> On 11/1/21 4:34 PM, Guozhang Wang wrote:
> > Hello Pushkar,
> >
> > I'm assuming you have the same Kafka version (2.5.1) at the Streams
> client
> > side here: in those old versions, Kafka Streams relies on the embedded
> > Producer clients to handle timeouts, which requires users to correctly
> > configure such values.
> >
> > In newer versions (2.8+), we have made Kafka Streams more robust to
> > server-side disconnects or soft failures that may cause timeouts:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> .
> > So I'd suggest you upgrade to those versions, and see if those symptoms
> go
> > away.
> >
> >
> > Guozhang
> >
> > On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole 
> wrote:
> >
> >> Hi All,
> >>
> >> I am getting the below issue in a streams application. The Kafka cluster
> >> is a 3-broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers
> >> restarted at the same time when the below exception occurred in the
> >> streams application, so I can relate the exception to those broker
> >> restarts. However, what is worrying me is that the streams application
> >> did not process any events after the exception. So the questions are:
> >> 1. How can I make the streams application resilient to broker issues?
> >> E.g. the producer underneath streams should have connected to another
> >> broker instance when 1 broker went down, but possibly the 2nd broker
> >> went down immediately and that's why it timed out.
> >> 2. In general, how does streams handle a broker issue, and when does it
> >> decide to connect to another broker instance in case one instance seems
> >> to be in error?
> >>
> >>
> >>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> >> processing processor thread -
> >>
> >>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> >> stream - task [0_5] Abort sending since an error caught with a previous
> >> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:12 ms has passed since batch
> >> creation\nTimeout exception caught when sending record to topic
> >> analytics-incoming-feed. This might happen if the producer cannot send
> data
> >> to the Kafka cluster and thus, its internal buffer fills up. This can
> also
> >> happen if the broker is slow to respond, if the network connection to
> the
> >> broker was interrupted, or if similar circumstances arise. You can
> increase
> >> producer parameter `max.block.ms` to increase this
> >>
> >>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> >> task [0_5] Abort sending since an error caught with a previous record
> >> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s)
> for
> >> analytics-incoming-feed-4:12 ms has passed since batch
> >> creation\nTimeout exception caught when sending record to topic
> >> analytics-incoming-feed. This might happen if the producer cannot send
> data
> >> to the Kafka cluster and thus, its internal buffer fills up. This can
> also
> >> happen if the broker is slow to respond, if the network connection to
> the
> >> broker was interrupted, or if similar circumstances arise. You can
> increase
> >> producer parameter `max.block.ms` to increase this timeout.\n\tat
> >>
> >>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
> >>
> >>
> org.

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Matthias J. Sax
The `Producer#send()` call is actually not covered by the KIP because it 
may result in data loss if we try to handle the timeout directly. -- 
Kafka Streams does not have a copy of the data in the producer's send 
buffer and thus we cannot retry the `send()`. -- Instead, it's necessary 
to re-process the input data which is not done automatically.



-Matthias

On 11/1/21 4:34 PM, Guozhang Wang wrote:

Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) at the Streams client
side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to correctly
configure such values.

In newer versions (2.8+), we have made Kafka Streams more robust to
server-side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to those versions, and see if those symptoms go
away.


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole  wrote:


Hi All,

I am getting the below issue in a streams application. The Kafka cluster is
a 3-broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers
restarted at the same time when the below exception occurred in the streams
application, so I can relate the exception to those broker restarts.
However, what is worrying me is that the streams application did not
process any events after the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.
the producer underneath streams should have connected to another broker
instance when 1 broker went down, but possibly the 2nd broker went down
immediately and that's why it timed out.
2. In general, how does streams handle a broker issue, and when does it
decide to connect to another broker instance in case one instance seems to
be in error?


{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -

analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this

timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat

datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat

org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat

org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat

org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat

org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat

org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
org.apache.kafka.common.err

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Matthias J. Sax
As the error message suggests, you can increase `max.block.ms` for this
case: If a broker is down, it may take some time for the producer to
fail over to a different broker (before the producer can fail over, the
broker must elect a new partition leader, and only afterward can it inform
the producer about the new broker that must be used to write into the
partition). Increasing `max.block.ms` gives the producer (and thus the
brokers) more time to do the fail-over.
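
A minimal sketch of what that could look like on the Streams side, assuming
the producer settings are passed through `StreamsConfig.producerPrefix()`;
the application id, bootstrap servers, and timeout values below are only
placeholders, not recommendations:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTimeoutConfigSketch {

    // Gives the embedded producer more time to ride out a broker fail-over.
    public static Properties build() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");               // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // placeholder
        // max.block.ms: how long send()/metadata fetches may block while a new
        // partition leader is elected and propagated to the client.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120_000);
        // delivery.timeout.ms: upper bound for a batch (including retries) before
        // it is expired with the "Expiring N record(s)" error seen in the log.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300_000);
        return props;
    }
}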


If the fail-over fails (ie, times out), the producer raises the 
exception and KS is forced to stop processing, to avoid data loss 
(because the producer did buffer some data that would be lost due to the 
error if we commit offsets).


In general, Kafka Streams tries to handle as many broker/client errors
as possible (and newer versions handle more cases than older versions).
But there are always some cases that cannot be handled by Kafka Streams.
Of course, changing client configs (producer/consumer and Kafka Streams)
can help to make it more robust.


Thus, it comes down to monitoring of Kafka Streams:

For Kafka Streams, using newer versions you can actually register an
uncaught exception handler that allows you to restart failed threads. In
older versions of Kafka Streams, you can also register a callback, but
it only informs you that a thread died. In older versions you would need
to `close()` KafkaStreams, create a new instance, and `start()` it to
recover a dead thread.
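
A rough sketch of the newer-style handler (assuming Kafka Streams 2.8+;
the class name and logging are illustrative, and the handler must be set
before calling `streams.start()`):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ThreadRecoverySketch {

    // Ask Streams to replace a crashed StreamThread instead of letting the
    // whole client die; SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION are the
    // other possible responses.
    public static void installHandler(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("StreamThread died: " + exception);
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}

In older versions the equivalent is the `Thread.UncaughtExceptionHandler`
callback plus closing the `KafkaStreams` instance and starting a new one, as
described above.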



Hope this helps,

-Matthias


On 10/31/21 5:52 AM, Pushkar Deole wrote:

Hi All,

I am getting the below issue in a streams application. The Kafka cluster is
a 3-broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers
restarted at the same time when the below exception occurred in the streams
application, so I can relate the exception to those broker restarts.
However, what is worrying me is that the streams application did not
process any events after the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.
the producer underneath streams should have connected to another broker
instance when 1 broker went down, but possibly the 2nd broker went down
immediately and that's why it timed out.
2. In general, how does streams handle a broker issue, and when does it
decide to connect to another broker instance in case one instance seems to
be in error?

{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this
timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
org.apache.kafka.clients.produce

Re: Stream to KTable internals

2021-11-01 Thread Matthias J. Sax
Timestamp synchronization is not perfect, and as a matter of fact, we
fixed a few gaps in the 3.0.0 release. We actually hope that we closed the
last gaps in 3.0.0... *fingers-crossed* :)


We are using a timestamp extractor that returns 0. 


You can do this, and it effectively "disables" timestamp synchronization 
as records on the KTable side don't have a timeline any longer. As a 
side effect it also allows you to "bootstrap" the table, as records with 
timestamp zero will always be processed first (as they are smaller). Of 
course, you also don't have time synchronization for "future" data and 
your program becomes non-deterministic if you reprocess old data.
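
For reference, such an extractor is only a few lines; a sketch (the class
name is made up, and it would be registered via `default.timestamp.extractor`
or per-source via `Consumed.with(...)`):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Returns 0 for every record, so the table-side records always sort first
// and timestamp synchronization is effectively disabled for that input.
public class ZeroTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        return 0L;
    }
}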



This seemed to be the only
way to bootstrap enough records at startup to avoid the missed join.


Using 3.0.0 and enabling timestamp synchronization via the
`max.task.idle.ms` config should allow you to get the correct behavior
without the zero-extractor (of course, your KTable data must have
smaller timestamps than your KStream data).



If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on startup?


To be more precise: timestamp synchronization is _always_ on. The
question is just how strictly it is applied. By default, we do the weakest
form, which is only best effort.



I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records.


Yes, because of the default best-effort approach. That is why you should
increase `max.task.idle.ms` to detect this case and "skip" processing
and let KS do another poll() to get the KTable data.


2.8 and earlier:

max.task.idle.ms=0 -> best effort (no poll() retry)
max.task.idle.ms>0 -> try to do another poll() until data is there or the
idle time has passed


Note: >0 might still "fail" even if there is data, because consumer 
fetch behavior is not predictable.



3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0 -> if there is data broker side, repeat poll()
until you get the data
max.task.idle.ms>0 -> even if there is no data broker side, wait until
data becomes available or the idle time has passed
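
As a concrete (purely illustrative) way of turning on the stricter form:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class TaskIdleConfigSketch {

    // Adds max.task.idle.ms to an existing Streams config. Per the semantics
    // above (3.0): 0 waits whenever data is known to exist broker side, while
    // a value > 0 (an arbitrary 1000 ms here) additionally waits for data
    // that has not reached the broker yet.
    public static Properties withTaskIdle(final Properties props) {
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);
        return props;
    }
}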



Hope this helps.


-Matthias

On 11/1/21 4:29 PM, Guozhang Wang wrote:

Hello Chad,

 From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then you would
miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler 
wrote:


Thank you for your response and the links to the presentations.


*However, this seems to be orthogonal to your issue?*

Yes. From what I see in the code it looks like you have a single consumer
subscribed to multiple topics. Please correct me if I'm wrong.


*By default, timestamp synchronization is disabled. Maybe enabling it would
help?*

We are using a timestamp extractor that returns 0. We did that because we
were almost always missing joins on startup, and this seemed to be the only
way to bootstrap enough records at startup to avoid the missed join. We
found a post that said doing that would make the KTable act like the
GlobalKTable at startup. So far this works great, we never miss a join on a
startup. If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on startup?

I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records. Later when we put the same
record on the topic and the KTable has had a chance to load more records
the join works and everything is good to go. Because of the way our system
works no new status records have been written and so the new record joins
against the correct status.

Do you agree that the poll request is returning the KStream record but not
returning the KTable record and therefore the join is getting missed? If
you don't agree, what do you think is going on? Is there a way to prove
this out?

Thanks,
Chad

On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax  wrote:


Yes, a StreamThread has one consumer. The number of StreamThreads per
instance is configurable via `num.stream.threads`. Partitions are
assigned to threads similarly to consumers in a plain consumer group.

It seems you run with the default of one thread per instance. As you
spin up 12 instances, it results in 12 threads for the application. As
you have 12 partitions, using more threads won't be 

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Guozhang Wang
Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) at the Streams client
side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to correctly
configure such values.

In newer versions (2.8+), we have made Kafka Streams more robust to
server-side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to those versions, and see if those symptoms go
away.


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole  wrote:

> Hi All,
>
> I am getting the below issue in a streams application. The Kafka cluster is
> a 3-broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers
> restarted at the same time when the below exception occurred in the streams
> application, so I can relate the exception to those broker restarts.
> However, what is worrying me is that the streams application did not
> process any events after the exception. So the questions are:
> 1. How can I make the streams application resilient to broker issues? E.g.
> the producer underneath streams should have connected to another broker
> instance when 1 broker went down, but possibly the 2nd broker went down
> immediately and that's why it timed out.
> 2. In general, how does streams handle a broker issue, and when does it
> decide to connect to another broker instance in case one instance seems to
> be in error?
>
>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> processing processor thread -
>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> stream - task [0_5] Abort sending since an error caught with a previous
> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this
>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> task [0_5] Abort sending since an error caught with a previous record
> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this timeout.\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
>
> datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
>
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch creation\n"}
>


-- 
-- Guozhang


Re: Stream to KTable internals

2021-11-01 Thread Guozhang Wang
Hello Chad,

From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then you would
miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler 
wrote:

> Thank you for your response and the links to the presentations.
>
>
> *However, this seems to be orthogonal to your issue?*
>
> Yes. From what I see in the code it looks like you have a single consumer
> subscribed to multiple topics. Please correct me if I'm wrong.
>
>
> *By default, timestamp synchronization is disabled. Maybe enabling it would
> help?*
>
> We are using a timestamp extractor that returns 0. We did that because we
> were almost always missing joins on startup, and this seemed to be the only
> way to bootstrap enough records at startup to avoid the missed join. We
> found a post that said doing that would make the KTable act like the
> GlobalKTable at startup. So far this works great, we never miss a join on a
> startup. If I use "timestamp synchronization" do I have to remove the zero
> timestamp extractor? If I remove the zero timestamp extractor will
> timestamp synchronization take care of the missed join issue on startup?
>
> I'm guessing the issue here is that occasionally the poll request is not
> returning the matching record for the KTable side of the join before the
> task goes off and starts processing records. Later when we put the same
> record on the topic and the KTable has had a chance to load more records
> the join works and everything is good to go. Because of the way our system
> works no new status records have been written and so the new record joins
> against the correct status.
>
> Do you agree that the poll request is returning the KStream record but not
> returning the KTable record and therefore the join is getting missed? If
> you don't agree, what do you think is going on? Is there a way to prove
> this out?
>
> Thanks,
> Chad
>
> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax  wrote:
>
> > Yes, a StreamThread has one consumer. The number of StreamThreads per
> > instance is configurable via `num.stream.threads`. Partitions are
> > assigned to threads similarly to consumers in a plain consumer group.
> >
> > It seems you run with the default of one thread per instance. As you
> > spin up 12 instances, it results in 12 threads for the application. As
> > you have 12 partitions, using more threads won't be useful as no
> > partitions are left for them to process.
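
As an aside, a sketch of that knob (the thread count is a placeholder;
`StreamsConfig.NUM_STREAM_THREADS_CONFIG` is the constant for
`num.stream.threads`):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ThreadCountSketch {

    // One StreamThread per instance is the default; with 12 instances and
    // 12 partitions, extra threads would have no partitions left to process.
    public static Properties withThreads(final Properties props, final int threads) {
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        return props;
    }
}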
> >
> > For a stream-table join, there will be one task per "partition pair"
> > that computes the join for those partitions. So you get 12 tasks, and
> > each thread processes one task in your setup. I.e., a thread's consumer
> > is reading data for both input topics.
> >
> > Pausing happens on a per-partition basis: for joins there are two buffers
> > per task (one for each input topic partition). It's possible that one
> > partition is paused while the other is processed. However, this seems to
> > be orthogonal to your issue?
> >
> > For a GlobalKTable, you get an additional GlobalThread that only reads
> > the data from the "global topic" to update the GlobalKTable. Semantics
> > of KStream-KTable and KStream-GlobalKTable joins are different: Cf
> >
> >
> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >
> > For the timestamp synchronization, you may check out the `max.task.idle.ms`
> > config. By default, timestamp synchronization is disabled. Maybe
> > enabling it would help?
> >
> > You may also check out slides 34-38:
> >
> >
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >
> > There is one corner case: if two records with the same timestamp come
> > in, it's not defined which one will be processed first.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> >
> > On 10/30/21 6:45 AM, Chad Preisler wrote:
> > > Yes, this helped. I have some additional questions.
> > >
> > > Does StreamThread have one consumer? (Looks like it, but just want to
> > > confirm)
> > > Is there a separate StreamThread for each topic including the KTable?
> > > If a KTable is a StreamThread and there is a  StreamTask for that
> KTable,
> > > could my buffer be getting filled up, and the mainConsumer for the
> KTable
> > > be getting paused? I see this code in StreamTask#addRecords.
> > >
> > > // if after adding these records, its partition queue's buffered size
> > > // has been increased beyond the threshold, we can then pause the
> > > // consumption for this partition
> > > if (newQueueSize > maxBufferedSize) {
> > >     mainConsumer.pause(singleton(partition));
> > > }
> > >
> > > Is there any spe

Re: [EXTERNAL] Re: Security vulnerabilities in kafka:2.13-2.6.0/2.7.0 docker image

2021-11-01 Thread Colin McCabe
It seems like your image does not show up on the mailing list.

best,
Colin

On Wed, Sep 1, 2021, at 06:26, Ashish Patil wrote:
> Hi Team
>  
> I tried upgrading it to 2.13_2.8.0 but it still has these vulnerabilities.
>  
> 
>  
> What is your suggestion on this?
>  
> Thanks
> Ashish
>  
> *From:* Jake Murphy Smith  
> *Sent:* 01 September 2021 09:31
> *To:* Ashish Patil 
> *Subject:* RE: [EXTERNAL] Re: Security vulnerabilities in 
> kafka:2.13-2.6.0/2.7.0 docker image
>  
>  
>  
> *From:* Luke Chen  
> *Sent:* 01 September 2021 04:11
> *To:* Kafka Users 
> *Cc:* d...@kafka.apache.org; Jake Murphy Smith 
> *Subject:* [EXTERNAL] Re: Security vulnerabilities in kafka:2.13-2.6.0/2.7.0 
> docker image
>  
> *ATTENTION:* This email originated from outside of GM.
> 
>  
> 
> Hi Ashish,
> I suggest that you upgrade to V2.8.
> I checked 2 of the CVEs, and they are fixed (or not used, like libfetch) in V2.8.
> If you still found the CVEs existed in V2.8, please raise it.
>  
> Thank you.
> Luke
>  
>  
>  
>  
> On Wed, Sep 1, 2021 at 4:07 AM Ashish Patil  wrote:
>> Hi Team
>> 
>> I wanted to use the 2.6.0 docker image for Kafka but it has lots of security 
>> vulnerabilities.
>> Please find the below list of security vulnerabilities
>> CVE-2021-36159
>> CVE-2020-25649 
>> CVE-2021-22926
>> CVE-2021-22922
>> CVE-2021-22924
>> CVE-2021-22922
>> CVE-2021-22924
>> CVE-2021-31535
>> CVE-2019-17571 
>> 
>> I did raise this issue here 
>> https://github.com/wurstmeister/kafka-docker/issues/681 but it looks like 
>> the issue is within the Kafka binary.
>> 
>>  
>> 
>> Do we have any plan to fix this in the coming version or any suggestions 
>> around this?
>> 
>> Thanks
>> Ashish