Hello Chad,

From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.

If a single consumer poll call returns only the matching records from the
KStream side but not the corresponding records from the KTable side, then
you would miss that join result.
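To make the race concrete, here is a toy stand-in (plain Java, not actual
Kafka Streams internals; the keys and values are made up): if the table
side's record has not been consumed yet when the stream record is
processed, a left join sees a null table value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy simulation of the stream-table join race. The HashMap stands in for
// the KTable's state store; it is empty until the table record is "polled".
public class JoinRaceSketch {
    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        String streamKey = "order-1";

        // Stream record processed before the matching table record arrives:
        String joined = table.get(streamKey); // null -> missed join
        System.out.println("first pass: " + joined);

        // Table record arrives on a later poll; a retried stream record
        // for the same key now joins against the loaded state:
        table.put(streamKey, "status=READY");
        System.out.println("second pass: " + table.get(streamKey));
    }
}
```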

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com>
wrote:

> Thank you for your response and the links to the presentations.
>
>
> *However, this seems to be orthogonal to your issue?*
>
> Yes. From what I see in the code it looks like you have a single consumer
> subscribed to multiple topics. Please correct me if I'm wrong.
>
>
> *By default, timestamp synchronization is disabled. Maybe enabling it
> would help?*
>
> We are using a timestamp extractor that returns 0. We did that because we
> were almost always missing joins on startup, and this seemed to be the only
> way to bootstrap enough records at startup to avoid the missed joins. We
> found a post that said doing that would make the KTable act like a
> GlobalKTable at startup. So far this works great; we never miss a join on
> startup. If I use timestamp synchronization, do I have to remove the zero
> timestamp extractor? If I remove the zero timestamp extractor, will
> timestamp synchronization take care of the missed join issue on startup?
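> The extractor described above might look something like this (a sketch;
> the class name is ours, and it assumes a Kafka Streams version where
> TimestampExtractor's second parameter is the partition time):
>
> ```java
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> // Returns 0 for every record, so KTable records always sort "earliest"
> // relative to stream records -- the trick that makes the table bootstrap
> // like a GlobalKTable on startup.
> public class ZeroTimestampExtractor implements TimestampExtractor {
>     @Override
>     public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
>         return 0L;
>     }
> }
> ```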
>
> I'm guessing the issue here is that occasionally the poll request is not
> returning the matching record for the KTable side of the join before the
> task goes off and starts processing records. Later, when we put the same
> record on the topic after the KTable has had a chance to load more records,
> the join works and everything is good to go. Because of the way our system
> works, no new status records have been written, so the new record joins
> against the correct status.
>
> Do you agree that the poll request is returning the KStream record but not
> returning the KTable record and therefore the join is getting missed? If
> you don't agree, what do you think is going on? Is there a way to prove
> this out?
>
> Thanks,
> Chad
>
> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Yes, a StreamThread has one consumer. The number of StreamThreads per
> > instance is configurable via `num.stream.threads`. Partitions are
> > assigned to threads much like they are assigned to consumers in a plain
> > consumer group.
> >
> > It seems you run with the default of one thread per instance. As you
> > spin up 12 instances, it results in 12 threads for the application. As
> > you have 12 partitions, using more threads won't be useful as no
> > partitions are left for them to process.
> >
> > For stream-table joins, there will be one task per "partition pair"
> > that computes the join for those partitions. So you get 12 tasks, and
> > each thread processes one task in your setup. I.e., each thread's
> > consumer reads data from both input topics.
> >
> > Pausing happens on a per-partition basis: for joins there are two
> > buffers per task (one for each input topic partition). It's possible
> > that one partition is paused while the other is processed. However,
> > this seems to be orthogonal to your issue?
> >
> > For a GlobalKTable, you get an additional GlobalThread that only reads
> > the data from the "global topic" to update the GlobalKTable. Semantics
> > of KStream-KTable and KStream-GlobalKTable joins are different; cf.
> >
> > https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >
> > For the timestamp synchronization, you may checkout `max.task.idle.ms`
> > config. By default, timestamp synchronization is disabled. Maybe
> > enabling it would help?
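> > A minimal sketch of what enabling it might look like (the value is
> > illustrative, not a recommendation):
> >
> > ```properties
> > # Allow each task to idle up to 1s waiting for data on all of its input
> > # partitions before processing, enabling timestamp synchronization.
> > max.task.idle.ms=1000
> > ```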
> >
> > You may also check out slides 34-38:
> >
> > https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >
> > There is one corner case: if two records with the same timestamp come
> > in, it's not defined which one will be processed first.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> >
> > On 10/30/21 6:45 AM, Chad Preisler wrote:
> > > Yes, this helped. I have some additional questions.
> > >
> > > Does StreamThread have one consumer? (Looks like it, but just want to
> > > confirm)
> > > Is there a separate StreamThread for each topic including the KTable?
> > > If a KTable is a StreamThread and there is a StreamTask for that
> > > KTable, could my buffer be getting filled up, and the mainConsumer
> > > for the KTable be getting paused? I see this code in
> > > StreamTask#addRecords.
> > >
> > > // if after adding these records, its partition queue's buffered size
> > > // has been increased beyond the threshold, we can then pause the
> > > // consumption for this partition
> > > if (newQueueSize > maxBufferedSize) {
> > >     mainConsumer.pause(singleton(partition));
> > > }
> > >
> > > Is there any specific logging that I can set to debug or trace that
> > > would help me troubleshoot? I'd prefer not to turn debug and/or trace
> > > on for every single class.
> > >
> > > Thanks,
> > > Chad
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > >> Hi Chad,
> > >>> I'm wondering if someone can point me to the Kafka streams internal
> > >>> code that reads records for the join?
> > >> --> You can check StreamThread#pollPhase, where the stream thread
> > >> (main consumer) periodically polls records. It then processes each
> > >> topology node with these polled records in stream tasks
> > >> (StreamTask#process).
> > >>
> > >> Hope that helps.
> > >>
> > >> Thanks.
> > >> Luke
> > >>
> > >>
> > >> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> > >> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> > >>
> > >>> Hi Chad, this talk around 24:00 clearly explains what you’re seeing:
> > >>>
> > >>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> > >>>
> > >>> Gilles
> > >>>
> > >>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com>
> > >> wrote:
> > >>>>
> > >>>> Hello,
> > >>>>
> > >>>> I have a stream application that does a KStream-to-KTable left
> > >>>> join. We seem to be occasionally missing joins (the KTable side is
> > >>>> null).
> > >>>>
> > >>>> I'm wondering if someone can point me to the Kafka streams internal
> > >>>> code that reads records for the join? I've poked around the Kafka
> > >>>> code base, but there is a lot there. I imagine there is some
> > >>>> consumer poll for each side of the join, and possibly a background
> > >>>> thread for reading the KTable topic.
> > >>>>
> > >>>> I figure there are several possible causes of this issue, and since
> > >>>> nothing is really jumping out in my code, I was going to start
> > >>>> looking at the Kafka code to see if there is something I can do to
> > >>>> fix this.
> > >>>>
> > >>>> Thanks,
> > >>>> Chad
> > >>>
> > >>>
> > >>
> > >
> >
>


-- 
-- Guozhang
