We want to make further improvements to stream-table joins. It's just not easy, and it is a larger-scoped project.

-Matthias

On 11/18/21 12:09 PM, Chad Preisler wrote:
I'm wondering if the kafka architects have plans to redesign/enhance this
behavior. Having to guess the idle time isn't the most satisfying solution.
No matter what time I put in there it seems possible that I will miss a
join.

Respectfully,
Chad

On Fri, Nov 5, 2021 at 3:07 PM Matthias J. Sax <mj...@apache.org> wrote:

The log clearly indicates that you hit enforced processing. We record
the metric and log:

Cf

https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L194-L200

Not sure why the metric does not report it...

Hence, the solution would be to increase `max.task.idle.ms` further to
give Kafka Streams time to fetch the data.

It might help to use DEBUG logging to see for which partitions the consumer
sends fetch requests and which partitions return data, to better
understand the underlying behavior.


-Matthias

On 11/5/21 6:58 AM, Chad Preisler wrote:
It seems like I have 2 options to work around this issue.


     - Keep the KTable and have another process running that puts the
       missed join message back on the event topic.
     - Switch to GlobalKTable.

Any other solutions/workarounds are welcome.

Thanks,
Chad

On Thu, Nov 4, 2021 at 11:43 AM Chad Preisler <chad.preis...@gmail.com>
wrote:

enforced-processing-total is zero for all missed join occurrences. I
logged all the metrics at the time my stream processed the missed join,
so let me know if there are any other metrics that would help.

On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

I'm not sure. When I ran with trace logging turned on I saw a bunch of
messages like the ones below. Do those messages indicate
"enforced-processing"? It gets logged right after the call
to enforcedProcessingSensor.record.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result.
Partitions with local data: [status-5]. Partitions we gave up waiting
for, with their corresponding deadlines: {event-5=1635881287722}.
Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result.
Partitions with local data: [event-5]. Partitions we gave up waiting
for, with their corresponding deadlines: {status-5=1635881272754}.
Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
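
For reading the two log entries above, subtracting the logged deadline from the logged wall-clock time shows how far past the deadline the task was when it gave up waiting. The following is only a small sketch in plain Java; the class and method names are hypothetical, and the regular expressions assume the message format quoted above with a single partition in the deadline map.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: extracts the deadline and the
// wall-clock time from the "Continuing to process..." message quoted above.
final class EnforcedProcessingLog {
    // Matches a single-entry deadline map such as {event-5=1635881287722}.
    private static final Pattern DEADLINE = Pattern.compile("\\{[^=}]+=(\\d+)\\}");
    private static final Pattern WALL_CLOCK =
            Pattern.compile("Current wall-clock time: (\\d+)");

    // Returns wall-clock time minus deadline, in milliseconds.
    static long millisPastDeadline(String message) {
        Matcher deadline = DEADLINE.matcher(message);
        Matcher wallClock = WALL_CLOCK.matcher(message);
        if (!deadline.find() || !wallClock.find()) {
            throw new IllegalArgumentException("unexpected message format");
        }
        return Long.parseLong(wallClock.group(1)) - Long.parseLong(deadline.group(1));
    }

    public static void main(String[] args) {
        String first = "deadlines: {event-5=1635881287722}. "
                + "Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.";
        String second = "deadlines: {status-5=1635881272754}. "
                + "Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.";
        System.out.println(millisPastDeadline(first));   // 28
        System.out.println(millisPastDeadline(second));  // 5244
    }
}
```

So the first entry was logged 28 ms after the deadline for event-5, and the second 5244 ms after the deadline for status-5.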

On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org>
wrote:

Can you check if the program ever does "enforced processing", i.e.,
`max.task.idle.ms` passed and we processed despite an empty input
buffer?

Cf
https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

As long as there is input data, we should never do "enforced
processing"
and the metric should stay at zero.


-Matthias

On 11/3/21 2:41 PM, Chad Preisler wrote:
Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had
no effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <
chad.preis...@gmail.com>
wrote:

No, unfortunately that is not the case. The table record is written
about 20 seconds before the stream record. I'll crank up the time
tomorrow and see what happens.

On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org>
wrote:

Hard to tell, but as it seems that you can reproduce the issue, it might
be worth a try to increase the idle time further.

I guess one corner case for stream-table join that is not resolved yet
is when stream and table record have the same timestamp... For this
case, the table record might not be processed first.

Could you hit this case?


-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:
Thank you for the information. We are using the Kafka 3.0 client
library. We are able to reliably reproduce this issue in our test
environment now. I removed my timestamp extractor, and I set the
max.task.idle.ms to 2000. I also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data to
create a lag of 400 messages. We saw 5 missed joins.

From the stream-thread log messages we saw the event message, our stream
missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread printed
out our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join and
the stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org>
wrote:

Timestamp synchronization is not perfect, and as a matter of fact, we
fixed a few gaps in the 3.0.0 release. We actually hope that we closed
the last gaps in 3.0.0... *fingers-crossed* :)

We are using a timestamp extractor that returns 0.

You can do this, and it effectively "disables" timestamp synchronization
as records on the KTable side don't have a timeline any longer. As a
side effect it also allows you to "bootstrap" the table, as records with
timestamp zero will always be processed first (as they are smaller). Of
course, you also don't have time synchronization for "future" data and
your program becomes non-deterministic if you reprocess old data.
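
The bootstrapping effect of a zero timestamp can be illustrated with a toy model. This is only a sketch under the assumption, as described in this thread, that a task with data buffered for both inputs processes the record with the smaller timestamp next. `ZeroTimestampDemo`, `Rec`, `buffer` and `processingOrder` are hypothetical names, not Kafka Streams classes, and the tie-breaking in the model is arbitrary (ties are undefined in the real system).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model, not Kafka Streams code: one buffer per input, and the head
// record with the smaller timestamp is always processed first.
final class ZeroTimestampDemo {
    static final class Rec {
        final String source;
        final long timestamp;
        Rec(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }
    }

    // Builds a buffer of records from one source with the given timestamps.
    static Deque<Rec> buffer(String source, long... timestamps) {
        Deque<Rec> queue = new ArrayDeque<>();
        for (long ts : timestamps) {
            queue.add(new Rec(source, ts));
        }
        return queue;
    }

    // Returns the order in which the sources are processed.
    static List<String> processingOrder(Deque<Rec> table, Deque<Rec> stream) {
        List<String> order = new ArrayList<>();
        while (!table.isEmpty() || !stream.isEmpty()) {
            Rec next;
            if (stream.isEmpty()) {
                next = table.pollFirst();
            } else if (table.isEmpty()) {
                next = stream.pollFirst();
            } else {
                // Arbitrary choice of this sketch: the table wins ties.
                next = table.peekFirst().timestamp <= stream.peekFirst().timestamp
                        ? table.pollFirst() : stream.pollFirst();
            }
            order.add(next.source);
        }
        return order;
    }

    public static void main(String[] args) {
        // Real timestamps: table and stream records interleave.
        System.out.println(processingOrder(
                buffer("table", 150, 250), buffer("stream", 100, 200)));
        // prints [stream, table, stream, table]

        // Extractor returning 0 for table records: the table is drained first.
        System.out.println(processingOrder(
                buffer("table", 0, 0), buffer("stream", 100, 200)));
        // prints [table, table, stream, stream]
    }
}
```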

this seemed to be the only
way to bootstrap enough records at startup to avoid the missed join.

Using 3.0.0 and enabling timestamp synchronization via the
`max.task.idle.ms` config should allow you to get the correct behavior
without the zero-extractor (of course, your KTable data must have
smaller timestamps than your KStream data).

If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on startup?

To be more precise: timestamp synchronization is _always_ on. The
question is just how strictly it is applied. By default, we do the
weakest form, which is only best effort.

I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records.

Yes, because of the default best-effort approach. That is why you should
increase `max.task.idle.ms` to detect this case and "skip" processing
and let KS do another poll() to get KTable data.
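
The difference between best effort and idling can be sketched with a toy model of a single left join. This is a simplified illustration, not Kafka Streams code: `IdleJoinDemo` and `run` are hypothetical names, the `waitForTable` flag stands in for a sufficiently large `max.task.idle.ms`, and the model assumes the second poll() does return the table record.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of the missed join described in this thread.
final class IdleJoinDemo {
    // Returns the left-join result for the single stream record with key "k1".
    // waitForTable=false models best effort; true models skipping processing
    // while the table-side buffer is empty and polling again.
    static String run(boolean waitForTable) {
        Map<String, String> tableStore = new HashMap<>();
        Deque<String[]> tableBuffer = new ArrayDeque<>();  // {key, value}
        Deque<String> streamBuffer = new ArrayDeque<>();   // key only

        // First poll(): only the stream record is returned, although the
        // table record was written earlier and has the smaller timestamp.
        streamBuffer.add("k1");

        if (waitForTable && tableBuffer.isEmpty()) {
            // Processing is skipped; the next poll() returns the table record.
            tableBuffer.add(new String[] {"k1", "ACTIVE"});
        }

        // Table records have smaller timestamps, so they are applied first.
        while (!tableBuffer.isEmpty()) {
            String[] keyValue = tableBuffer.poll();
            tableStore.put(keyValue[0], keyValue[1]);
        }

        String key = streamBuffer.poll();
        return key + " -> " + tableStore.get(key);  // null means missed join
    }

    public static void main(String[] args) {
        System.out.println(run(false));  // prints k1 -> null
        System.out.println(run(true));   // prints k1 -> ACTIVE
    }
}
```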

2.8 and earlier:

max.task.idle.ms=0 -> best effort (no poll() retry)
max.task.idle.ms>0 -> try to do another poll() until data is there or
                      idle time passed

Note: >0 might still "fail" even if there is data, because consumer
fetch behavior is not predictable.


3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0  -> if there is data broker side, repeat poll()
                       until you get the data
max.task.idle.ms>0  -> even if there is no data broker side, wait until
                       data becomes available or the idle time passed
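
As a rough illustration of the 3.0 list above, the setting can be placed in the properties passed to the Streams application. The sketch below uses only `java.util.Properties` and the literal key `max.task.idle.ms`; the class `IdleConfigSketch` and its methods are hypothetical, the descriptions are a paraphrase of the list above, and other required settings (application id, bootstrap servers) are omitted.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class IdleConfigSketch {
    // Paraphrases the 3.0 semantics listed above for a given value.
    static String describe(long maxTaskIdleMs) {
        if (maxTaskIdleMs == -1) {
            return "best effort (no poll() retry)";
        }
        if (maxTaskIdleMs == 0) {
            return "poll() again while the broker still has data";
        }
        if (maxTaskIdleMs > 0) {
            return "wait up to " + maxTaskIdleMs + " ms, even if the broker has no data yet";
        }
        // Values below -1 are not covered by the list above.
        throw new IllegalArgumentException("unsupported value: " + maxTaskIdleMs);
    }

    // Builds properties containing only the idle-time setting.
    static Properties withIdleTime(long maxTaskIdleMs) {
        Properties props = new Properties();
        // Same key as the constant StreamsConfig.MAX_TASK_IDLE_MS_CONFIG.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        for (long value : new long[] {-1, 0, 2000}) {
            System.out.println(value + ": " + describe(value));
        }
        System.out.println(withIdleTime(2000));
    }
}
```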


Hope this helps.


-Matthias

On 11/1/21 4:29 PM, Guozhang Wang wrote:
Hello Chad,

From your earlier comment, you mentioned "In my scenario the records
were written to the KTable topic before the record was written to the
KStream topic." So I think Matthias and others have excluded this
possibility while trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then you would
miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <
chad.preis...@gmail.com>
wrote:

Thank you for your response and the links to the presentations.


*However, this seems to be orthogonal to your issue?*

Yes. From what I see in the code it looks like you have a single
consumer subscribed to multiple topics. Please correct me if I'm wrong.


*By default, timestamp synchronization is disabled. Maybe enabling it
would help?*

We are using a timestamp extractor that returns 0. We did that because
we were almost always missing joins on startup, and this seemed to be
the only way to bootstrap enough records at startup to avoid the missed
join. We found a post that said doing that would make the KTable act
like the GlobalKTable at startup. So far this works great, we never miss
a join on startup. If I use "timestamp synchronization" do I have to
remove the zero timestamp extractor? If I remove the zero timestamp
extractor will timestamp synchronization take care of the missed join
issue on startup?

I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records. Later when we put the same
record on the topic and the KTable has had a chance to load more records
the join works and everything is good to go. Because of the way our
system works no new status records have been written and so the new
record joins against the correct status.

Do you agree that the poll request is returning the KStream record but
not returning the KTable record and therefore the join is getting
missed? If you don't agree, what do you think is going on? Is there a
way to prove this out?

Thanks,
Chad

On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <
mj...@apache.org>
wrote:

Yes, a StreamThread has one consumer. The number of StreamThreads per
instance is configurable via `num.stream.threads`. Partitions are
assigned to threads similar to how they are assigned to consumers in a
plain consumer group.

It seems you run with the default of one thread per instance. As you
spin up 12 instances, it results in 12 threads for the application. As
you have 12 partitions, using more threads won't be useful as no
partitions are left for them to process.

For a stream-table join, there will be one task per "partition pair"
that computes the join for those partitions. So you get 12 tasks, and
each thread processes one task in your setup. I.e., a thread's consumer
is reading data for both input topics.
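
The arithmetic above can be written out as a short sketch. It assumes a topology with a single join over two co-partitioned topics, so that the number of tasks equals the number of partitions per topic; `TaskAssignmentMath` and its methods are hypothetical names used only for illustration.

```java
// Hypothetical helper: counts tasks and threads for the setup in this thread.
final class TaskAssignmentMath {
    // One task per "partition pair" of the two co-partitioned input topics.
    static int tasks(int partitionsPerTopic) {
        return partitionsPerTopic;
    }

    // Total threads = instances times the value of num.stream.threads.
    static int threads(int instances, int numStreamThreads) {
        return instances * numStreamThreads;
    }

    // Threads that have no task left to process.
    static int idleThreads(int tasks, int threads) {
        return Math.max(0, threads - tasks);
    }

    public static void main(String[] args) {
        int tasks = tasks(12);
        // 12 instances with the default of one thread each.
        System.out.println(idleThreads(tasks, threads(12, 1)));  // prints 0
        // Two threads per instance would leave 12 threads without a task.
        System.out.println(idleThreads(tasks, threads(12, 2)));  // prints 12
    }
}
```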

Pausing happens on a per-partition basis: for joins there are two
buffers per task (one for each input topic partition). It's possible
that one partition is paused while the other is processed. However, this
seems to be orthogonal to your issue?
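
Per-partition pausing can be pictured with a toy model. This is a simplified sketch, not Kafka Streams code: `PartitionPauseModel` is a hypothetical class, the pause check follows the `newQueueSize > maxBufferedSize` condition from StreamTask#addRecords quoted elsewhere in this thread, and the resume rule is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model: tracks buffered record counts per partition and which
// partitions are currently paused.
final class PartitionPauseModel {
    private final int maxBufferedSize;
    private final Map<String, Integer> queueSizes = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    PartitionPauseModel(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Adds records to one partition's buffer and pauses that partition
    // only if its own buffer grew beyond the threshold.
    void addRecords(String partition, int count) {
        int newQueueSize = queueSizes.merge(partition, count, Integer::sum);
        if (newQueueSize > maxBufferedSize) {
            paused.add(partition);
        }
    }

    // Processes one buffered record; resumes the partition once its buffer
    // is back at or below the threshold (assumed rule for this sketch).
    void processOne(String partition) {
        int size = queueSizes.getOrDefault(partition, 0);
        if (size == 0) {
            throw new IllegalStateException("no buffered records for " + partition);
        }
        queueSizes.put(partition, size - 1);
        if (size - 1 <= maxBufferedSize) {
            paused.remove(partition);
        }
    }

    boolean isPaused(String partition) {
        return paused.contains(partition);
    }

    public static void main(String[] args) {
        PartitionPauseModel task = new PartitionPauseModel(2);
        task.addRecords("status-5", 3);
        task.addRecords("event-5", 1);
        System.out.println(task.isPaused("status-5"));  // prints true
        System.out.println(task.isPaused("event-5"));   // prints false
        task.processOne("status-5");
        System.out.println(task.isPaused("status-5"));  // prints false
    }
}
```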

For a GlobalKTable, you get an additional GlobalThread that only reads
the data from the "global topic" to update the GlobalKTable. Semantics
of KStream-KTable and KStream-GlobalKTable joins are different: Cf

https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

For the timestamp synchronization, you may check out the
`max.task.idle.ms` config. By default, timestamp synchronization is
disabled. Maybe enabling it would help?

You may also check out slides 34-38:

https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/

There is one corner case: if two records with the same timestamp come
in, it's not defined which one will be processed first.

Hope this helps.


-Matthias


On 10/30/21 6:45 AM, Chad Preisler wrote:
Yes, this helped. I have some additional questions.

Does StreamThread have one consumer? (Looks like it, but just want to
confirm)
Is there a separate StreamThread for each topic including the KTable?
If a KTable is a StreamThread and there is a StreamTask for that KTable,
could my buffer be getting filled up, and the mainConsumer for the
KTable be getting paused? I see this code in StreamTask#addRecords.

    // if after adding these records, its partition queue's buffered size
    // has been increased beyond the threshold, we can then pause the
    // consumption for this partition
    if (newQueueSize > maxBufferedSize) {
        mainConsumer.pause(singleton(partition));
    }

Is there any specific logging that I can set to debug or trace that
would help me troubleshoot? I'd prefer not to turn debug and/or trace on
for every single class.

Thanks,
Chad





On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com>
wrote:

Hi Chad,
I'm wondering if someone can point me to the Kafka streams
internal
code
that reads records for the join?
--> You can check StreamThread#pollPhase, where the stream thread (main
consumer) periodically polls records. It then processes each topology
node with these polled records in stream tasks (StreamTask#process).

Hope that helps.

Thanks.
Luke


On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
<gilles.philipp...@fundingcircle.com.invalid> wrote:

Hi Chad, this talk around 24:00 clearly explains what you're seeing:

https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/

Gilles

On 30 Oct 2021, at 04:02, Chad Preisler <
chad.preis...@gmail.com>
wrote:

Hello,

I have a stream application that does a KStream to KTable left join. We
seem to be occasionally missing joins (KTable side is null).

I'm wondering if someone can point me to the Kafka streams internal code
that reads records for the join? I've poked around the Kafka code base,
but there is a lot there. I imagine there is some consumer poll for each
side of the join, and possibly a background thread for reading the
KTable topic.

I figure there are several possible causes of this issue, and since
nothing is really jumping out in my code, I was going to start looking
at the Kafka code to see if there is something I can do to fix this.

Thanks,
Chad


--




Funding Circle Limited is authorised and regulated by the Financial
Conduct Authority under firm registration number 722513. Funding Circle
is not covered by the Financial Services Compensation Scheme. Registered
in England (Co. No. 06968588) with registered office at 71 Queen
Victoria Street, London EC4V 4AY.