Hello Chad,

From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.
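The possibility in question is a poll that hands the task the KStream record before the matching KTable record has been fetched. It can be illustrated with a toy, single-threaded model of one join task. Everything in it (`ToyJoinTask`, `Rec`, `poll`, `processAll`) is hypothetical and is not Kafka Streams code. It only mirrors the behavior described in this thread: one buffer per input partition, with the buffered record that has the smallest timestamp processed next. It also assumes the zero timestamp extractor is applied to the KTable source only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, single-threaded model of ONE stream-table join task (hypothetical, not Kafka Streams code).
// Each input partition has its own buffer; the task repeatedly processes the
// buffered head record with the smallest timestamp.
public class ToyJoinTask {
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final Deque<Rec> streamBuffer = new ArrayDeque<>();
    private final Deque<Rec> tableBuffer = new ArrayDeque<>();
    private final Map<String, String> tableStore = new HashMap<>(); // current KTable contents
    final List<String> joinResults = new ArrayList<>();

    // Hypothetical stand-in for the records one consumer poll hands to this task.
    void poll(List<Rec> streamRecs, List<Rec> tableRecs) {
        streamBuffer.addAll(streamRecs);
        tableBuffer.addAll(tableRecs);
    }

    // Processes everything buffered so far, lowest head timestamp first.
    void processAll() {
        while (!streamBuffer.isEmpty() || !tableBuffer.isEmpty()) {
            // Ties go to the table side here; per the thread, the real tie order is undefined.
            boolean takeTable = streamBuffer.isEmpty()
                    || (!tableBuffer.isEmpty() && tableBuffer.peek().ts <= streamBuffer.peek().ts);
            if (takeTable) {
                Rec r = tableBuffer.poll();
                tableStore.put(r.key, r.value);
            } else {
                Rec r = streamBuffer.poll();
                // Left join: the table side is null if the table record is not loaded yet.
                joinResults.add(r.value + "|" + tableStore.get(r.key));
            }
        }
    }

    public static void main(String[] args) {
        // A: the table record already exists on its topic, but the first poll returns
        // only the stream record, so it is processed before the table record arrives.
        ToyJoinTask a = new ToyJoinTask();
        a.poll(List.of(new Rec("k1", "order", 100)), List.of());
        a.processAll();
        a.poll(List.of(), List.of(new Rec("k1", "ACTIVE", 0)));
        a.processAll();
        System.out.println(a.joinResults); // [order|null]

        // B: both records arrive in the same poll. The table record's timestamp is 0
        // (mimicking a zero timestamp extractor), so it is applied first.
        ToyJoinTask b = new ToyJoinTask();
        b.poll(List.of(new Rec("k1", "order", 100)), List.of(new Rec("k1", "ACTIVE", 0)));
        b.processAll();
        System.out.println(b.joinResults); // [order|ACTIVE]
    }
}
```

Scenario A is the case where writing the KTable record first does not help on its own. What matters is whether that record is already buffered in the task when the KStream record is processed. Scenario B shows why forcing table timestamps to 0 can help, but only for table records that are already buffered.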

If only the matching records from the KStream are returned by a single
consumer poll call, but not the corresponding records from the KTable, then
you would miss this matched join result.

Guozhang

On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com> wrote:

> Thank you for your response and the links to the presentations.
>
> > *However, this seems to be orthogonal to your issue?*
>
> Yes. From what I see in the code it looks like you have a single consumer
> subscribed to multiple topics. Please correct me if I'm wrong.
>
> > *By default, timestamp synchronization is disabled. Maybe enabling it
> > would help?*
>
> We are using a timestamp extractor that returns 0. We did that because we
> were almost always missing joins on startup, and this seemed to be the only
> way to bootstrap enough records at startup to avoid the missed join. We
> found a post that said doing that would make the KTable act like the
> GlobalKTable at startup. So far this works great; we never miss a join on
> startup. If I use "timestamp synchronization", do I have to remove the zero
> timestamp extractor? If I remove the zero timestamp extractor, will
> timestamp synchronization take care of the missed join issue on startup?
>
> I'm guessing the issue here is that occasionally the poll request is not
> returning the matching record for the KTable side of the join before the
> task goes off and starts processing records. Later, when we put the same
> record on the topic and the KTable has had a chance to load more records,
> the join works and everything is good to go. Because of the way our system
> works, no new status records have been written, so the new record joins
> against the correct status.
>
> Do you agree that the poll request is returning the KStream record but not
> returning the KTable record and therefore the join is getting missed? If
> you don't agree, what do you think is going on? Is there a way to prove
> this out?
>
> Thanks,
> Chad
>
> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Yes, a StreamThread has one consumer. The number of StreamThreads per
> > instance is configurable via `num.stream.threads`. Partitions are
> > assigned to threads similar to how they are assigned to consumers in a
> > plain consumer group.
> >
> > It seems you run with the default of one thread per instance. As you
> > spin up 12 instances, this results in 12 threads for the application. As
> > you have 12 partitions, using more threads won't be useful, as no
> > partitions are left for them to process.
> >
> > For a stream-table join, there will be one task per "partition pair"
> > that computes the join for those partitions. So you get 12 tasks, and
> > each thread processes one task in your setup. That is, each thread's
> > consumer reads data for both input topics.
> >
> > Pausing happens on a per-partition basis: for joins there are two
> > buffers per task (one for each input topic partition). It's possible
> > that one partition is paused while the other is processed. However, this
> > seems to be orthogonal to your issue?
> >
> > For a GlobalKTable, you get an additional GlobalThread that only reads
> > the data from the "global topic" to update the GlobalKTable. The
> > semantics of KStream-KTable and KStream-GlobalKTable joins are
> > different; cf.
> >
> > https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >
> > For timestamp synchronization, you may check out the `max.task.idle.ms`
> > config. By default, timestamp synchronization is disabled. Maybe
> > enabling it would help?
> >
> > You may also check out slides 34-38:
> >
> > https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >
> > There is one corner case: if two records with the same timestamp come
> > in, it's not defined which one will be processed first.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> > On 10/30/21 6:45 AM, Chad Preisler wrote:
> > > Yes, this helped. I have some additional questions.
> > >
> > > Does StreamThread have one consumer? (Looks like it, but I just want to
> > > confirm.)
> > > Is there a separate StreamThread for each topic, including the KTable?
> > > If a KTable is a StreamThread and there is a StreamTask for that KTable,
> > > could my buffer be getting filled up, and the mainConsumer for the
> > > KTable be getting paused? I see this code in StreamTask#addRecords:
> > >
> > > // if after adding these records, its partition queue's buffered size has
> > > // been increased beyond the threshold, we can then pause the
> > > // consumption for this partition
> > > if (newQueueSize > maxBufferedSize) {
> > >     mainConsumer.pause(singleton(partition));
> > > }
> > >
> > > Is there any specific logging that I can set to debug or trace that
> > > would help me troubleshoot? I'd prefer not to turn debug and/or trace on
> > > for every single class.
> > >
> > > Thanks,
> > > Chad
> > >
> > > On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > >> Hi Chad,
> > >>> I'm wondering if someone can point me to the Kafka streams internal
> > >>> code that reads records for the join?
> > >> --> You can check StreamThread#pollPhase, where the stream thread (main
> > >> consumer) periodically polls records. Then it processes each topology
> > >> node with these polled records in stream tasks (StreamTask#process).
> > >>
> > >> Hope that helps.
> > >>
> > >> Thanks.
> > >> Luke
> > >>
> > >> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> > >> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> > >>
> > >>> Hi Chad, this talk around 24:00 clearly explains what you’re seeing
> > >>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> > >>>
> > >>> Gilles
> > >>>
> > >>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com> wrote:
> > >>>>
> > >>>> Hello,
> > >>>>
> > >>>> I have a stream application that does a KStream to KTable left join. We
> > >>>> seem to be occasionally missing joins (KTable side is null).
> > >>>>
> > >>>> I'm wondering if someone can point me to the Kafka streams internal code
> > >>>> that reads records for the join? I've poked around the Kafka code base, but
> > >>>> there is a lot there. I imagine there is some consumer poll for each side
> > >>>> of the join, and possibly a background thread for reading the KTable topic.
> > >>>>
> > >>>> I figure there are several possible causes of this issue, and since nothing
> > >>>> is really jumping out in my code, I was going to start looking at the Kafka
> > >>>> code to see if there is something I can do to fix this.
> > >>>>
> > >>>> Thanks,
> > >>>> Chad

--
-- Guozhang
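Matthias's pointer to `max.task.idle.ms` in this thread concerns the check a task makes before processing when one of its input partitions has nothing buffered. The following is a simplified, hypothetical model of that check. `ToyIdleCheck` and its methods are made up, and only record counts are tracked. The real logic in Kafka Streams differs between versions (newer releases refine this check), so treat this as a sketch of the idea and consult the `max.task.idle.ms` documentation for the release in use.

```java
// Hypothetical, simplified model of the readiness check behind max.task.idle.ms
// (not Kafka Streams code). If one input partition of a join task has nothing
// buffered, the task waits until it has been in that state for maxTaskIdleMs of
// wall-clock time, then processes the other side anyway.
public class ToyIdleCheck {
    private final long maxTaskIdleMs;
    private int streamBuffered; // records buffered for the KStream partition
    private int tableBuffered;  // records buffered for the KTable partition
    private long idleStartMs = -1; // when a buffer was first seen empty; -1 = not idling

    ToyIdleCheck(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    void addStream() { streamBuffered++; }

    void addTable() { tableBuffered++; }

    boolean readyToProcess(long nowMs) {
        if (streamBuffered == 0 && tableBuffered == 0) {
            return false; // nothing to process at all
        }
        if (streamBuffered > 0 && tableBuffered > 0) {
            idleStartMs = -1; // both sides buffered: records can be picked by timestamp
            return true;
        }
        // Exactly one side is empty: start (or continue) the idle wait.
        if (idleStartMs < 0) {
            idleStartMs = nowMs;
        }
        return nowMs - idleStartMs >= maxTaskIdleMs;
    }

    public static void main(String[] args) {
        ToyIdleCheck noWait = new ToyIdleCheck(0);
        noWait.addStream();
        System.out.println(noWait.readyToProcess(1_000)); // true: processes immediately

        ToyIdleCheck wait = new ToyIdleCheck(500);
        wait.addStream();
        System.out.println(wait.readyToProcess(1_000)); // false: table side empty, waiting
        wait.addTable();                                // table record arrives in a later poll
        System.out.println(wait.readyToProcess(1_200)); // true: both sides buffered
    }
}
```

With a value of 0 the stream record is processed at once, which corresponds to the "disabled by default" behavior described in the thread. With a positive value the task gives the KTable partition up to that long to deliver data before processing the KStream record without it. That bound is why such a setting can reduce, but not rule out, missed joins when the table side lags by more than the wait.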