Yes, a StreamThread has one consumer. The number of StreamThreads per instance is configurable via `num.stream.threads`. Partitions are assigned to threads similarly to how they are assigned to consumers in a plain consumer group.

It seems you run with the default of one thread per instance. As you spin up 12 instances, it results in 12 threads for the application. As you have 12 partitions, using more threads won't be useful as no partitions are left for them to process.
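To make the thread-vs-partition arithmetic concrete, here is a small standalone Java sketch. It uses the real config key `num.stream.threads`, but reads it from a plain `java.util.Properties` rather than from Kafka. It places tasks onto threads with a naive round-robin, which is an assumption of the sketch; the real assignor is more elaborate, but it also gives each task to exactly one thread, so the number of busy threads can never exceed the number of tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ThreadCapacity {

    // Total stream threads for one application: instances x num.stream.threads.
    // "num.stream.threads" is the real config key (default 1); Properties is only a stand-in here.
    static int totalThreads(int instances, Properties perInstanceConfig) {
        int threads = Integer.parseInt(perInstanceConfig.getProperty("num.stream.threads", "1"));
        return instances * threads;
    }

    // Naive round-robin placement of tasks onto threads (an assumption of this sketch).
    // Each task goes to exactly one thread, so busy threads never exceed the task count.
    static List<List<Integer>> assign(int tasks, int threads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int task = 0; task < tasks; task++) {
            perThread.get(task % threads).add(task);
        }
        return perThread;
    }

    // Threads that received no task at all.
    static long idleThreads(int tasks, int threads) {
        return assign(tasks, threads).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        Properties props = new Properties(); // default: one thread per instance
        int threads = totalThreads(12, props);
        System.out.println(threads + " threads, idle: " + idleThreads(12, threads));

        props.setProperty("num.stream.threads", "2"); // doubling threads with only 12 partitions
        threads = totalThreads(12, props);
        System.out.println(threads + " threads, idle: " + idleThreads(12, threads));
    }
}
```

With 12 instances and 12 partitions (hence 12 tasks), raising `num.stream.threads` to 2 only adds 12 threads that have nothing to do.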

For a stream-table join, there is one task per "partition pair", which computes the join for those two partitions. So you get 12 tasks, and in your setup each thread processes one task. I.e., each thread's consumer reads data from both input topics.
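A sketch of the "partition pair" idea: task `0_p` owns partition `p` of both the stream topic and the table topic. The topic names `orders` and `customers` are made up, and the `0_` prefix assumes the join lives in subtopology 0; this is an illustration, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.List;

final class JoinTasks {

    // Hypothetical "topic-partition" label, not Kafka's TopicPartition class.
    static String tp(String topic, int partition) {
        return topic + "-" + partition;
    }

    // One task per partition number: task 0_p owns partition p of BOTH inputs,
    // which is why the two join inputs must be co-partitioned.
    static List<List<String>> tasks(String streamTopic, String tableTopic, int partitions) {
        List<List<String>> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            tasks.add(List.of(tp(streamTopic, p), tp(tableTopic, p)));
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<List<String>> tasks = tasks("orders", "customers", 12);
        for (int p = 0; p < tasks.size(); p++) {
            // The thread owning this task reads both partitions with its single consumer.
            System.out.println("task 0_" + p + " -> " + tasks.get(p));
        }
    }
}
```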

Pausing happens on a per-partition basis: for joins there are two buffers per task (one for each input topic partition). It's possible that one partition is paused while the other is processed. However, this seems to be orthogonal to your issue?
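The per-partition pause logic from StreamTask#addRecords (quoted later in this thread) can be simulated without Kafka. The hypothetical `PartitionBuffers` class below keeps one queue per input partition of a task and pauses only the partition whose queue crosses the threshold; the threshold plays the role of the `buffered.records.per.partition` config. The resume rule (resume once the queue is back at or below the threshold) is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PartitionBuffers {

    private final int maxBufferedSize; // stands in for buffered.records.per.partition
    private final Map<String, ArrayDeque<Long>> queues = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    PartitionBuffers(int maxBufferedSize, String... partitions) {
        this.maxBufferedSize = maxBufferedSize;
        for (String p : partitions) {
            queues.put(p, new ArrayDeque<>()); // one buffer per input partition of the task
        }
    }

    // Like the quoted StreamTask#addRecords check: only THIS partition is paused
    // once its own queue grows beyond the threshold.
    void addRecords(String partition, long... timestamps) {
        ArrayDeque<Long> queue = queues.get(partition);
        for (long ts : timestamps) {
            queue.addLast(ts);
        }
        if (queue.size() > maxBufferedSize) {
            paused.add(partition);
        }
    }

    // Processing removes the oldest buffered record; the partition is resumed
    // once its queue is back at or below the threshold (assumed rule).
    Long process(String partition) {
        ArrayDeque<Long> queue = queues.get(partition);
        Long ts = queue.pollFirst();
        if (queue.size() <= maxBufferedSize) {
            paused.remove(partition);
        }
        return ts;
    }

    boolean isPaused(String partition) {
        return paused.contains(partition);
    }

    public static void main(String[] args) {
        PartitionBuffers task = new PartitionBuffers(3, "orders-0", "customers-0");
        task.addRecords("orders-0", 1, 2, 3, 4, 5);
        task.addRecords("customers-0", 1);
        System.out.println("orders-0 paused: " + task.isPaused("orders-0"));
        System.out.println("customers-0 paused: " + task.isPaused("customers-0"));
    }
}
```

Pausing one side only delays fetching for that partition; the buffered records are still processed later, which is why it is unlikely to explain a missing join on its own.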

For a GlobalKTable, you get an additional GlobalThread that only reads the data from the "global topic" to update the GlobalKTable. Semantics of KStream-KTable and KStream-GlobalKTable joins are different: Cf https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
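A rough sketch of the difference in scope: a KTable task only materializes the table partition that matches its own partition, while the GlobalThread loads every partition, so each instance holds a full copy. `partitionFor` is a simplified stand-in (Kafka's default partitioner hashes the serialized key with murmur2, not `String.hashCode()`), and all class and method names are hypothetical. This only illustrates where the data lives; it does not model the temporal-join semantics covered in the talk.

```java
import java.util.HashMap;
import java.util.Map;

final class TableScopes {

    // Simplified stand-in for the default partitioner (the real one uses murmur2
    // on the serialized key bytes, not String.hashCode()).
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // KTable: the task for partition p only materializes the keys of table partition p.
    static Map<String, String> localKTableSlice(Map<String, String> table, int p, int partitions) {
        Map<String, String> slice = new HashMap<>();
        table.forEach((key, value) -> {
            if (partitionFor(key, partitions) == p) {
                slice.put(key, value);
            }
        });
        return slice;
    }

    // GlobalKTable: the global thread reads every partition, so each instance holds all keys.
    static Map<String, String> globalCopy(Map<String, String> table) {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        for (int i = 0; i < 50; i++) {
            table.put("customer-" + i, "profile-" + i);
        }
        int p = partitionFor("customer-7", 12);
        System.out.println("task " + p + " KTable slice size: " + localKTableSlice(table, p, 12).size());
        System.out.println("global copy size: " + globalCopy(table).size());
    }
}
```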

For timestamp synchronization, you may check out the `max.task.idle.ms` config. By default, timestamp synchronization is disabled. Maybe enabling it would help?

You may also check out slides 34-38: https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/

There is one corner case: if two records with the same timestamp come in, it's not defined which one will be processed first.
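To illustrate how timestamp synchronization affects a stream-table join, here is a simplified, hypothetical model of how a task could pick its next record: if both inputs have buffered data, it takes the one with the smaller timestamp; if one side is empty, it waits up to `max.task.idle.ms` before processing the other side anyway. Without waiting, a stream record can be joined before an older table update has been fetched, which for a left join gives a null table side. Ties go to the table side here, an arbitrary choice, since the real order is undefined. The exact meaning of the `max.task.idle.ms` default has changed between Kafka releases, so check the docs for your version.

```java
import java.util.ArrayDeque;
import java.util.List;

final class TimestampSync {

    static final String WAIT = "WAIT";

    // Hypothetical, simplified choice of the next input for one join task.
    // idleMs = how long the task has already waited with one side empty.
    static String next(ArrayDeque<Long> streamQ, ArrayDeque<Long> tableQ, long idleMs, long maxTaskIdleMs) {
        if (streamQ.isEmpty() && tableQ.isEmpty()) {
            return WAIT; // nothing buffered at all
        }
        if (streamQ.isEmpty() || tableQ.isEmpty()) {
            // One side is empty: wait up to maxTaskIdleMs for it, then go ahead anyway.
            if (idleMs < maxTaskIdleMs) {
                return WAIT;
            }
            return streamQ.isEmpty() ? "table" : "stream";
        }
        // Both sides have data: process the smaller timestamp first.
        // Ties go to the table side here; that is arbitrary, the real order is undefined.
        return streamQ.peekFirst() < tableQ.peekFirst() ? "stream" : "table";
    }

    public static void main(String[] args) {
        ArrayDeque<Long> stream = new ArrayDeque<>(List.of(10L)); // stream record at t=10
        ArrayDeque<Long> table = new ArrayDeque<>();              // table update at t=5 not fetched yet
        System.out.println("no idling:  " + next(stream, table, 0, 0));   // stream joins against a missing key
        System.out.println("idle 100ms: " + next(stream, table, 0, 100)); // task waits for the table side
        table.add(5L);
        System.out.println("both ready: " + next(stream, table, 0, 0));   // older table update goes first
    }
}
```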

Hope this helps.


-Matthias


On 10/30/21 6:45 AM, Chad Preisler wrote:
Yes, this helped. I have some additional questions.

Does StreamThread have one consumer? (Looks like it, but just want to
confirm)
Is there a separate StreamThread for each topic including the KTable?
If a KTable is a StreamThread and there is a StreamTask for that KTable,
could my buffer be getting filled up, and the mainConsumer for the KTable
be getting paused? I see this code in StreamTask#addRecords.

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > maxBufferedSize) {
        mainConsumer.pause(singleton(partition));
    }

Is there any specific logging that I can set to debug or trace that would
help me troubleshoot? I'd prefer not to turn debug and/or trace on for
every single class.

Thanks,
Chad





On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:

Hi Chad,
I'm wondering if someone can point me to the Kafka streams internal code
that reads records for the join?
--> You can check StreamThread#pollPhase, where the stream thread (main
consumer) periodically polls records. It then processes the polled records
through each topology node in the stream tasks (StreamTask#process).
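A heavily simplified, hypothetical model of that loop: records from one poll are handed to the task that owns their partition (keyed by partition number for simplicity), then each task processes its buffered records. The real StreamThread interleaves polling and processing with limits and commits, and each record goes through the topology nodes rather than into a list; none of that is modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PollLoopSketch {

    // Hypothetical stand-in for a StreamTask: buffers polled records and processes them one by one.
    static final class Task {
        final ArrayDeque<String> buffer = new ArrayDeque<>();
        final List<String> processed = new ArrayList<>();

        boolean processOne() {
            String record = buffer.pollFirst();
            if (record == null) {
                return false;
            }
            processed.add(record); // the real task would pass it through the topology nodes
            return true;
        }
    }

    // One loop iteration: "poll phase" routes records to the task owning their partition,
    // then every task processes what it has buffered.
    static void runOnce(Map<Integer, List<String>> polledByPartition, Map<Integer, Task> tasks) {
        polledByPartition.forEach((partition, records) -> tasks.get(partition).buffer.addAll(records));
        for (Task task : tasks.values()) {
            while (task.processOne()) {
                // drain this task's buffer
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Task> tasks = new TreeMap<>();
        tasks.put(0, new Task());
        tasks.put(1, new Task());
        runOnce(Map.of(0, List.of("a", "b"), 1, List.of("c")), tasks);
        tasks.forEach((id, task) -> System.out.println("task " + id + " processed " + task.processed));
    }
}
```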

Hope that helps.

Thanks.
Luke


On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
<gilles.philipp...@fundingcircle.com.invalid> wrote:

Hi Chad, this talk around 24:00 clearly explains what you’re seeing

https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/


Gilles

On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com>
wrote:

Hello,

I have a stream application that does a KStream to KTable left join. We
seem to be occasionally missing joins (KTable side is null).

I'm wondering if someone can point me to the Kafka streams internal code
that reads records for the join? I've poked around the Kafka code base, but
there is a lot there. I imagine there is some consumer poll for each side
of the join, and possibly a background thread for reading the KTable topic.

I figure there are several possible causes of this issue, and since nothing
is really jumping out in my code, I was going to start looking at the Kafka
code to see if there is something I can do to fix this.

Thanks,
Chad




