guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609989025
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
return;
}
-        // maxObservedStreamTime is updated and shared between left and right sides, so we can
-        // process a non-join record immediately if it is late
-        final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
         boolean needOuterJoin = outer;
+        boolean joinFound = false;
         final long inputRecordTimestamp = context().timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
+        // maxObservedStreamTime is updated and shared between left and right join sides
+        maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
         try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
             while (iter.hasNext()) {
                 needOuterJoin = false;
+                joinFound = true;
                 final KeyValue<Long, V2> otherRecord = iter.next();
                 final long otherRecordTimestamp = otherRecord.key;
+
+                // Emit expired records before the joined record to keep time ordering
Review comment:
The tricky part, as demonstrated in the current PR, is that if we do the
expiration first, we may expire records that match the current processing
record; those would need to be skipped from deletion/emission before the join.
I think it is still possible to simplify the current logic without naive
buffering, because:
1) The current processing record's timestamp T is no larger than the updated
max stream time T';
2) The current processing record's matching record's smallest timestamp
would be (T - window-size);
3) The expired records' largest timestamp would be (T' - window-size -
grace-period), where grace-period >= 0.
In other words, if the current processing record's timestamp T is smaller
than T' (i.e. it is a late record and hence did not advance the stream time),
then all records within [T - window-size, T' - window-size - grace-period]
(assuming T - window-size < T' - window-size - grace-period) would have
already been expired and emitted, and hence won't be found and matched; if
the current processing record's timestamp T == T' (i.e. it is not a late
record), then T - window-size >= T' - window-size - grace-period always
holds, which means that all joined records' timestamps are later than the
expired timestamps.
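The non-late case (T == T') can be checked with simple arithmetic. A
standalone sketch follows; the names `windowSize`, `gracePeriod`,
`earliestMatch`, and `latestExpired` are illustrative, not the PR's actual
fields:

```java
// Standalone arithmetic check of the non-late case (T == T').
public class ExpirationOrderCheck {
    public static void main(String[] args) {
        final long windowSize = 10L;
        final long gracePeriod = 5L;   // grace-period >= 0 by contract

        final long T = 100L;           // current record's timestamp
        final long Tprime = 100L;      // updated max observed stream time

        // Smallest timestamp the current record can join with:
        final long earliestMatch = T - windowSize;                    // 90
        // Largest timestamp that may be expired on this update:
        final long latestExpired = Tprime - windowSize - gracePeriod; // 85

        // T == T' and gracePeriod >= 0 imply earliestMatch >= latestExpired,
        // so no joinable record can be among the newly expired ones.
        if (earliestMatch < latestExpired) {
            throw new AssertionError("a joinable record could be expired first");
        }
        System.out.println("earliestMatch=" + earliestMatch
            + ", latestExpired=" + latestExpired);
        // prints earliestMatch=90, latestExpired=85
    }
}
```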
That means, if we do the expiration first based on (T' - window-size -
grace-period), the newly expired records' timestamps should all be smaller
than the timestamps of any joined records generated later for that processing
record. Hence it is safe to blindly expire them all without the `except`
logic.
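The resulting expire-then-join order could be sketched as below. This is a
self-contained simulation, not the PR's code: a `TreeMap` stands in for the
store of pending non-joined records, and `windowSize`, `gracePeriod`, and the
sample timestamps are all invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: expire all pending records up to T' - window-size - grace-period
// BEFORE joining the current record. Per the argument above, nothing expired
// here can also match the current record, so no "except" bookkeeping is needed.
public class ExpireBeforeJoinSketch {
    public static void main(String[] args) {
        final long windowSize = 10L;
        final long gracePeriod = 0L;

        // Pending non-joined records from the other side, keyed by timestamp.
        final TreeMap<Long, String> pending = new TreeMap<>();
        pending.put(80L, "a"); // old enough to expire
        pending.put(95L, "b"); // still joinable

        final long inputRecordTimestamp = 100L;                              // T
        final long maxObservedStreamTime = Math.max(95L, inputRecordTimestamp); // T' = 100

        // 1) Expire first: emit everything at or before T' - window-size - grace-period.
        final long expiryBound = maxObservedStreamTime - windowSize - gracePeriod; // 90
        final NavigableMap<Long, String> toExpire = pending.headMap(expiryBound, true);
        final List<String> expired = new ArrayList<>(toExpire.values());
        toExpire.clear(); // view-backed clear removes them from `pending`

        // 2) Then join: fetch candidates within [T - window-size, T + window-size].
        final List<String> joined = new ArrayList<>(
            pending.subMap(inputRecordTimestamp - windowSize, true,
                           inputRecordTimestamp + windowSize, true).values());

        System.out.println("expired=" + expired + " joined=" + joined);
        // prints expired=[a] joined=[b]
    }
}
```

Note the expired record "a" (timestamp 80) falls at or below the expiry bound
of 90, while "b" (timestamp 95) survives and joins with the record at 100, so
the expired output always precedes the joined output in time.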
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]