Re: Stream to KTable internals

2021-11-19 Thread Matthias J. Sax
We want to make further improvements to stream-table joins. It's just not easy, and it is a larger-scoped project. -Matthias On 11/18/21 12:09 PM, Chad Preisler wrote: I'm wondering if the kafka architects have plans to redesign/enhance this behavior. Having to guess the idle time isn't the most

Re: Stream to KTable internals

2021-11-18 Thread Chad Preisler
I'm wondering if the kafka architects have plans to redesign/enhance this behavior. Having to guess the idle time isn't the most satisfying solution. No matter what time I put in there it seems possible that I will miss a join. Respectfully, Chad On Fri, Nov 5, 2021 at 3:07 PM Matthias J. Sax

Re: Stream to KTable internals

2021-11-05 Thread Matthias J. Sax
The log clearly indicates that you hit enforced processing. We record the metric and log: Cf https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L194-L200 Not sure why the metric does not report it... Hence, the

Re: Stream to KTable internals

2021-11-05 Thread Chad Preisler
It seems like I have 2 options to work around this issue. - Keep the KTable and have another process running that puts the missed join message back on the event topic. - Switch to GlobalKTable. Any other solutions/workarounds are welcome. Thanks, Chad On Thu, Nov 4, 2021 at 11:43 AM

Re: Stream to KTable internals

2021-11-04 Thread Chad Preisler
enforced-processing-total is zero for all missed join occurrences. I logged all the metrics out at the time my stream processed the missed join, so let me know if there are any other metrics that would help. On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler wrote: > I'm not sure. When I ran with

Re: Stream to KTable internals

2021-11-03 Thread Chad Preisler
I'm not sure. When I ran with trace logging turned on I saw a bunch of messages like the ones below. Do those messages indicate "enforced-processing"? It gets logged right after the call to enforcedProcessingSensor.record. Continuing to process although some partitions are empty on the broker.

Re: Stream to KTable internals

2021-11-03 Thread Matthias J. Sax
Can you check if the program ever does "enforced processing", ie, `max.task.idle.ms` passed, and we process despite an empty input buffer. Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring As long as there is input data, we should never do "enforced processing" and the
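For reference, a minimal sketch of how the idle time discussed here can be supplied to a Kafka Streams application. It uses plain `java.util.Properties` with the literal key `max.task.idle.ms` (the same key is exposed as `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG`). The class name `IdleTimeConfig`, the helper `withMaxTaskIdle`, and the 10-second value are illustrative assumptions, not settings recommended in this thread.

```java
import java.util.Properties;

public class IdleTimeConfig {
    // Hypothetical helper: returns properties carrying only the idle-time setting.
    // The value is in milliseconds, so 10 seconds is 10000.
    static Properties withMaxTaskIdle(long idleMs) {
        Properties props = new Properties();
        props.setProperty("max.task.idle.ms", Long.toString(idleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withMaxTaskIdle(10_000L);
        // Print the configured value to confirm what the application would receive.
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

In a real application these properties would be merged with the other required settings (application id, bootstrap servers, serdes) before the `KafkaStreams` instance is constructed.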

Re: Stream to KTable internals

2021-11-03 Thread Chad Preisler
Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no effect on this issue. On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler wrote: > No unfortunately it is not the case. The table record is written about 20 > seconds before the stream record. I’ll crank up the time tomorrow

Re: Stream to KTable internals

2021-11-02 Thread Chad Preisler
No unfortunately it is not the case. The table record is written about 20 seconds before the stream record. I’ll crank up the time tomorrow and see what happens. On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax wrote: > Hard to tell, but as it seems that you can reproduce the issue, it might > be

Re: Stream to KTable internals

2021-11-02 Thread Matthias J. Sax
Hard to tell, but as it seems that you can reproduce the issue, it might be worth a try to increase the idle time further. I guess one corner case for stream-table join that is not resolved yet is when stream and table record have the same timestamp... For this case, the table record might

Re: Stream to KTable internals

2021-11-02 Thread Chad Preisler
Thank you for the information. We are using the Kafka 3.0 client library. We are able to reliably reproduce this issue in our test environment now. I removed my timestamp extractor, and I set the max.task.idle.ms to 2000. I also turned on trace logging for package

Re: Stream to KTable internals

2021-11-01 Thread Matthias J. Sax
Timestamp synchronization is not perfect, and as a matter of fact, we fixed a few gaps in 3.0.0 release. We actually hope, that we closed the last gaps in 3.0.0... *fingers-crossed* :) We are using a timestamp extractor that returns 0. You can do this, and it effectively "disables"

Re: Stream to KTable internals

2021-11-01 Thread Guozhang Wang
Hello Chad, From your earlier comment, you mentioned "In my scenario the records were written to the KTable topic before the record was written to the KStream topic." So I think Matthias and others have excluded this possibility while trying to help investigate. If only the matching records

Re: Stream to KTable internals

2021-10-31 Thread Chad Preisler
Thank you for your response and the links to the presentations. *However, this seems to be orthogonal to your issue?* Yes. From what I see in the code it looks like you have a single consumer subscribed to multiple topics. Please correct me if I'm wrong. *By default, timestamp synchronization

Re: Stream to KTable internals

2021-10-30 Thread Matthias J. Sax
Yes, a StreamThread has one consumer. The number of StreamThreads per instance is configurable via `num.stream.threads`. Partitions are assigned to threads similar to consumers in a plain consumer group. It seems you run with the default of one thread per instance. As you spin up 12 instances,

Re: Stream to KTable internals

2021-10-30 Thread Chad Preisler
Yes, this helped. I have some additional questions. Does StreamThread have one consumer? (Looks like it, but just want to confirm) Is there a separate StreamThread for each topic including the KTable? If a KTable is a StreamThread and there is a StreamTask for that KTable, could my buffer be

Re: Stream to KTable internals

2021-10-30 Thread Chad Preisler
Thank you for the information. This is actually not my issue. In my scenario the records were written to the KTable topic before the record was written to the KStream topic. We only get a handful of these issues per day, and they seem to happen when many transactions are being run through the

Re: Stream to KTable internals

2021-10-30 Thread Luke Chen
Hi Chad, > I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join? --> You can check StreamThread#pollPhase, where the stream thread (main consumer) periodically polls records. And then, it'll process each topology node with these polled records in

Re: Stream to KTable internals

2021-10-30 Thread Gilles Philippart
Hi Chad, this talk around 24:00 clearly explains what you’re seeing https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/

Stream to KTable internals

2021-10-29 Thread Chad Preisler
Hello, I have a stream application that does a KStream to KTable left join. We seem to be occasionally missing joins (KTable side is null). I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join? I've poked around the Kafka code base, but there
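The symptom described above (a left join whose KTable side is null) can be illustrated with a toy model. This is not Kafka Streams code: `LeftJoinModel`, `Rec`, and `process` are hypothetical names, and the model assumes only that a stream record is joined against whatever the table holds at the moment the stream record is processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftJoinModel {
    // Hypothetical record type for this sketch: which input it came from, plus key and value.
    static final class Rec {
        final String side; // "table" or "stream"
        final String key;
        final String value;

        Rec(String side, String key, String value) {
            this.side = side;
            this.key = key;
            this.value = value;
        }
    }

    // Processes records in the given order. Table records update the lookup map;
    // stream records are left-joined against the map's contents at that moment.
    static List<String> process(List<Rec> ordered) {
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Rec r : ordered) {
            if (r.side.equals("table")) {
                table.put(r.key, r.value);
            } else {
                // A missing table entry yields null, as in a left join.
                joined.add(r.key + ":" + r.value + "+" + table.get(r.key));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Rec tableRec = new Rec("table", "k1", "profile");
        Rec streamRec = new Rec("stream", "k1", "event");
        // Table record processed first: the join finds a match.
        System.out.println(process(Arrays.asList(tableRec, streamRec)));
        // Stream record processed first: the table side is null.
        System.out.println(process(Arrays.asList(streamRec, tableRec)));
    }
}
```

In this model the join result depends only on processing order, which is why the order in which the two inputs are consumed matters for this kind of join.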