Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

> Thanks for the information. I ran the code using Kafka locally. After
> submitting some records inside and outside of the time window and grace,
> the join performed as expected when running locally.

That gives some hope :)

> However, they never get into the join.

How do you know this?


Did you check the metric for dropped records? Maybe the records are
considered malformed and dropped? Are you using the same records in
production and in your local test?

> Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as
long as data is flowing, it should not impact the result you see.

> Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any
impact.

> If I increase the log level for the streams API would that
> shed some light on what is happening?


I don't think it would help much. The code in question is
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
does not do any logging except a WARN for the already mentioned "dropping
malformed" records, which are also recorded via JMX:



WARN: "Skipping record due to null key or value. "
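One way to check whether that WARN could explain the missing results is to scan a sample of the production input offline for null keys or values. Below is a minimal plain-Java sketch, assuming the join skips any record whose key or value is null (as the WARN text suggests; the exact rule may vary between Kafka versions). `InputRecord` and `NullKeyOrValueCheck` are hypothetical names standing in for however you dump records from the input topics.

```java
import java.util.ArrayList;
import java.util.List;

class NullKeyOrValueCheck {
    // Hypothetical stand-in for a record dumped from one of the join's input topics.
    record InputRecord(String key, String value, long timestamp) { }

    // Assumption: the stream-stream join drops a record (logging
    // "Skipping record due to null key or value.") if its key or value is null.
    static boolean wouldBeSkipped(InputRecord r) {
        return r.key() == null || r.value() == null;
    }

    // Collects the records from the sample that the join would drop.
    static List<InputRecord> skipped(List<InputRecord> sample) {
        List<InputRecord> result = new ArrayList<>();
        for (InputRecord r : sample) {
            if (wouldBeSkipped(r)) {
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<InputRecord> sample = List.of(
                new InputRecord("order-1", "{\"id\":1}", 1_000L),
                new InputRecord(null, "{\"id\":2}", 2_000L),
                new InputRecord("order-3", null, 3_000L));
        // Prints "2 of 3 records would be skipped"
        System.out.println(skipped(sample).size() + " of " + sample.size() + " records would be skipped");
    }
}
```

If the records that never produce an outer-join result show up in the skipped list, that would also line up with the dropped-records metric.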



If you can identify a specific record from the input which should produce
an output but does not, maybe you can feed it into your local test
environment and try to reproduce the issue?



-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:
> Matthias,
>
> Thanks for the information. I ran the code using Kafka locally. After
> submitting some records inside and outside of the time window and grace,
> the join performed as expected when running locally.
>
> I'm not sure why the join is not working as expected when running against
> our actual brokers. We are peeking at the records for the streams and we
> are seeing the records get pulled. However, they never get into the join.
> It's been over 24 hours since the expected records were created, and there
> has been plenty of traffic to advance the stream time. Only records that
> have both a left and right side match are getting processed by the join.
>
> Are there any settings for the stream client that would affect the join?
>
> Are there any settings on the broker side that would affect the join?
>
> The outer join is just one part of the topology. Compared to running it
> locally, there is a lot more data going through the app when running on our
> actual servers. If I increase the log level for the streams API, would that
> shed some light on what is happening? Does anyone know if there are
> specific packages that I should increase the log level for? Any specific
> log message I can home in on to tell me what is going on?
>
> Basically, I'm looking for some pointers on where I can start looking.
>
> Thanks,
> Chad
>
> On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax wrote:
>
> > > I expect the join to execute after the 25 minutes with one side of the
> > > join containing a record and the other being null
> >
> > Given that you also have a grace period of 5 minutes, the result will
> > only be emitted after the grace period has passed and the window is closed
> > (not when the window end time is reached).
> >
> > > One has a naming convention of "KSTREAM_OUTERSHARED". I see a record
> > > there, but I'm not sure how to decode that message to see what is in it.
> > > What is the purpose of those messages?
> >
> > It's an internal store that holds all records that may need to be
> > emitted as left/right join results, i.e., if there is no inner join result.
> > The format used is internal:
> >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
> >
> > Also note: time is based on event-time, i.e., if the input stream stops
> > sending new records, "stream-time" stops advancing and the result
> > might not be emitted because the window does not get closed.
> >
> > (Last, there is also an internal wall-clock delay of one second before
> > results are emitted, for performance reasons...)
> >
> > HTH.
> >
> > -Matthias
> >
> > On 4/30/24 6:51 AM, Chad Preisler wrote:
> > > Hello,
> > >
> > > I have a KStream to KStream outer join with a time difference of 25 minutes
> > > and 5 minutes of grace. When I get a record for one side of the join, but
> > > don't get a record on the other side of the join, I expect the join to
> > > execute after the 25 minutes with one side of the join containing a record
> > > and the other being null. Is that correct? If it is correct, it's not
> > > working for me.
> > >
> > > I was poking around on the broker and saw some internal topics. I see the
> > > key I expected to execute the join on some of those topics. One has a
> > > naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
> > > not sure how to decode that message to see what is in it. What is the
> > > purpose of those messages? If I decode the message, will it help me see
> > > when the join should have been executed?
> > >
> > > I also see the key on a topic with the naming convention
> > > "KSTREAM_OUTERTHIS".
> > >
> > > Are there any other topics that I should be looking at to troubleshoot
> > > this issue?
> > >
> > > Thanks,
> > > Chad



Re: outerjoin not joining after window

2024-04-30 Thread Chad Preisler
Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and there
has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally, there is a lot more data going through the app when running on our
actual servers. If I increase the log level for the streams API, would that
shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can home in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax wrote:

> > I expect the join to execute after the 25 minutes with one side of the
> > join containing a record and the other being null
>
> Given that you also have a grace period of 5 minutes, the result will
> only be emitted after the grace period has passed and the window is closed
> (not when the window end time is reached).
>
> > One has a naming convention of "KSTREAM_OUTERSHARED". I see a record
> > there, but I'm not sure how to decode that message to see what is in it.
> > What is the purpose of those messages?
>
> It's an internal store that holds all records that may need to be
> emitted as left/right join results, i.e., if there is no inner join result.
> The format used is internal:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
>
> Also note: time is based on event-time, i.e., if the input stream stops
> sending new records, "stream-time" stops advancing and the result
> might not be emitted because the window does not get closed.
>
> (Last, there is also an internal wall-clock delay of one second before
> results are emitted, for performance reasons...)
>
> HTH.
>
> -Matthias
>
> On 4/30/24 6:51 AM, Chad Preisler wrote:
> > Hello,
> >
> > I have a KStream to KStream outer join with a time difference of 25 minutes
> > and 5 minutes of grace. When I get a record for one side of the join, but
> > don't get a record on the other side of the join, I expect the join to
> > execute after the 25 minutes with one side of the join containing a record
> > and the other being null. Is that correct? If it is correct, it's not
> > working for me.
> >
> > I was poking around on the broker and saw some internal topics. I see the
> > key I expected to execute the join on some of those topics. One has a
> > naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
> > not sure how to decode that message to see what is in it. What is the
> > purpose of those messages? If I decode the message, will it help me see
> > when the join should have been executed?
> >
> > I also see the key on a topic with the naming convention
> > "KSTREAM_OUTERTHIS".
> >
> > Are there any other topics that I should be looking at to troubleshoot
> > this issue?
> >
> > Thanks,
> > Chad


Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

> I expect the join to execute after the 25 minutes with one side of the
> join containing a record and the other being null

Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace period has passed and the window is closed
(not when the window end time is reached).
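A simplified model of that timing, in plain Java (not the actual KStreamKStreamJoin code): an unmatched record with event timestamp t only becomes eligible for the outer-join result once stream-time has moved past t + time difference + grace. The 25-minute difference and 5-minute grace come from the original question; the class name is hypothetical, and treating the boundary as strictly "past" is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

class OuterJoinWindowClose {
    // Simplified model (assumption): the window for an unmatched record ends at
    // recordTime + timeDifference, and it is closed once grace has also elapsed.
    static Instant windowCloseTime(Instant recordTime, Duration timeDifference, Duration grace) {
        return recordTime.plus(timeDifference).plus(grace);
    }

    // streamTime is the largest event timestamp observed so far, not wall-clock time.
    static boolean mayEmitOuterResult(Instant recordTime, Instant streamTime,
                                      Duration timeDifference, Duration grace) {
        return streamTime.isAfter(windowCloseTime(recordTime, timeDifference, grace));
    }

    public static void main(String[] args) {
        Duration timeDifference = Duration.ofMinutes(25);
        Duration grace = Duration.ofMinutes(5);
        Instant leftRecord = Instant.parse("2024-04-30T12:00:00Z");

        // Window end (12:25) has passed, but grace has not: prints false.
        System.out.println(mayEmitOuterResult(leftRecord,
                Instant.parse("2024-04-30T12:26:00Z"), timeDifference, grace));
        // Stream-time is past 12:30, so the record may be emitted with a null other side: prints true.
        System.out.println(mayEmitOuterResult(leftRecord,
                Instant.parse("2024-04-30T12:31:00Z"), timeDifference, grace));
    }
}
```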



> One has a naming convention of "KSTREAM_OUTERSHARED". I see a record
> there, but I'm not sure how to decode that message to see what is in it.
> What is the purpose of those messages?

It's an internal store that holds all records that may need to be
emitted as left/right join results, i.e., if there is no inner join result.
The format used is internal:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
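If you want to peek at those records anyway, here is a sketch of decoding a single value, based on my reading of the linked LeftOrRightValueSerde.java: one leading byte marks the side (1 = left, otherwise right), followed by the value serialized with that input stream's value serde. Because the format is internal it may differ between versions, and the record you read from the topic may wrap this payload in additional framing that this sketch does not handle. `LeftOrRightValueDecoder`, `Decoded`, and `decode` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class LeftOrRightValueDecoder {
    enum JoinSide { LEFT, RIGHT }

    // Which side of the join the buffered value came from, plus the user value bytes.
    record Decoded(JoinSide side, byte[] rawValue) { }

    // Assumed layout (internal, may change): byte 0 = side flag (1 = left,
    // anything else = right); remaining bytes = value serialized with the
    // corresponding input stream's value serde.
    static Decoded decode(byte[] data) {
        if (data == null || data.length < 1) {
            throw new IllegalArgumentException("expected at least one byte for the side flag");
        }
        JoinSide side = data[0] == 1 ? JoinSide.LEFT : JoinSide.RIGHT;
        return new Decoded(side, Arrays.copyOfRange(data, 1, data.length));
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] payload = new byte[value.length + 1];
        payload[0] = 1; // left side
        System.arraycopy(value, 0, payload, 1, value.length);

        Decoded d = decode(payload);
        // Assumes the left stream used a UTF-8 String value serde; prints "LEFT hello".
        System.out.println(d.side() + " " + new String(d.rawValue(), StandardCharsets.UTF_8));
    }
}
```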


Also note: time is based on event-time, i.e., if the input stream stops
sending new records, "stream-time" stops advancing and the result
might not be emitted because the window does not get closed.
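To make the stream-time point concrete, here is a toy model in plain Java (hypothetical names, not Kafka's implementation): stream-time is the largest event timestamp seen so far, it only advances when a new record arrives, and buffered unmatched records are only flushed at that point. One instance models a single task; Kafka Streams tracks stream-time per task, so in this model a busy partition does not close windows for a key that sits on a quiet partition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of ONE task's outer-join buffer (hypothetical names, not Kafka's code).
class ToyOuterJoinTask {
    record Pending(String key, long timestamp) { }

    private final long closeAfterMs; // time difference + grace
    private final List<Pending> unmatched = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    ToyOuterJoinTask(long timeDifferenceMs, long graceMs) {
        this.closeAfterMs = timeDifferenceMs + graceMs;
    }

    // Processes one input record; hasMatch says whether it found a join partner.
    // Returns the keys emitted as outer-join results (other side null).
    List<String> process(String key, long timestamp, boolean hasMatch) {
        // Stream-time only moves forward, and only when a record arrives.
        streamTime = Math.max(streamTime, timestamp);
        if (!hasMatch) {
            unmatched.add(new Pending(key, timestamp));
        }
        List<String> emitted = new ArrayList<>();
        for (Iterator<Pending> it = unmatched.iterator(); it.hasNext(); ) {
            Pending p = it.next();
            if (streamTime > p.timestamp() + closeAfterMs) { // window plus grace has passed
                emitted.add(p.key());
                it.remove();
            }
        }
        return emitted;
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        ToyOuterJoinTask quietTask = new ToyOuterJoinTask(25 * min, 5 * min);
        ToyOuterJoinTask busyTask = new ToyOuterJoinTask(25 * min, 5 * min);

        quietTask.process("k1", 0L, false);             // unmatched record on a quiet partition
        busyTask.process("other", 24 * 60 * min, true); // a day later, but on another task
        System.out.println(quietTask.streamTime());     // 0: the busy task does not help
        System.out.println(quietTask.process("k2", 10 * min, false)); // []
        System.out.println(quietTask.process("k3", 31 * min, false)); // [k1]
    }
}
```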


(Last, there is also an internal wall-clock delay of one second before
results are emitted, for performance reasons...)
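That wall-clock delay behaves like a throttle on how often buffered results are checked. A minimal sketch of such a throttle, assuming a fixed interval (1000 ms here) and an injectable clock so it can be tested; `EmitThrottle` and `tryEmit` are hypothetical names, and this illustrates the idea rather than Kafka's actual code or config.

```java
import java.util.function.LongSupplier;

// Hypothetical throttle: permits an (expensive) scan for closed windows at most
// once per interval of wall-clock time, independent of event-time.
class EmitThrottle {
    private final long intervalMs;
    private final LongSupplier wallClockMs;
    private long nextAllowedMs = Long.MIN_VALUE; // first call is always allowed

    EmitThrottle(long intervalMs, LongSupplier wallClockMs) {
        this.intervalMs = intervalMs;
        this.wallClockMs = wallClockMs;
    }

    // Returns true if the caller may scan for and emit outer-join results now.
    boolean tryEmit() {
        long now = wallClockMs.getAsLong();
        if (now < nextAllowedMs) {
            return false;
        }
        nextAllowedMs = now + intervalMs;
        return true;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        EmitThrottle throttle = new EmitThrottle(1_000L, () -> fakeNow[0]);
        System.out.println(throttle.tryEmit()); // true: first attempt
        fakeNow[0] = 500L;
        System.out.println(throttle.tryEmit()); // false: still within the 1 s interval
        fakeNow[0] = 1_000L;
        System.out.println(throttle.tryEmit()); // true: interval elapsed
    }
}
```

With a throttle like this, a closed window can still wait for both the next wall-clock slot and the next incoming record before its result shows up.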


HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:
> Hello,
>
> I have a KStream to KStream outer join with a time difference of 25 minutes
> and 5 minutes of grace. When I get a record for one side of the join, but
> don't get a record on the other side of the join, I expect the join to
> execute after the 25 minutes with one side of the join containing a record
> and the other being null. Is that correct? If it is correct, it's not
> working for me.
>
> I was poking around on the broker and saw some internal topics. I see the
> key I expected to execute the join on some of those topics. One has a
> naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
> not sure how to decode that message to see what is in it. What is the
> purpose of those messages? If I decode the message, will it help me see
> when the join should have been executed?
>
> I also see the key on a topic with the naming convention
> "KSTREAM_OUTERTHIS".
>
> Are there any other topics that I should be looking at to troubleshoot
> this issue?
>
> Thanks,
> Chad



outerjoin not joining after window

2024-04-30 Thread Chad Preisler
Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes
and 5 minutes of grace. When I get a record for one side of the join, but
don't get a record on the other side of the join, I expect the join to
execute after the 25 minutes with one side of the join containing a record
and the other being null. Is that correct? If it is correct, it's not
working for me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message, will it help me see when
the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this
issue?

Thanks,
Chad