guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609989025



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       The tricky part, as demonstrated in the current PR, is that if we do the expiration first, we may expire records that match the current processing record; those need to be excluded from deletion/emission before the join.
   
   I think it is still possible to simplify the current logic without naive buffering. Because:
   
   1) The current processing record's timestamp T is no larger than the updated max stream time T';
   2) The smallest timestamp of any record that can match the current processing record is (T - window-size);
   3) The largest timestamp of any expired record is (T' - window-size - grace-period), where grace-period >= 0.
   
   In other words, if the current processing record's timestamp T is smaller than T' (i.e. it is a late record and hence did not advance the stream time), then all records within [T - window-size, T' - window-size - grace-period] (assuming T - window-size < T' - window-size - grace-period) would have already been expired and emitted, and hence won't be found and matched. If the current processing record's timestamp T == T' (i.e. it is not a late record), then T - window-size >= T' - window-size - grace-period always holds, which means every joined record's timestamp is later than the expired timestamps.
   
   That means, if we do the expiration first based on (T' - window-size - grace-period), the newly expired records' timestamps will all be smaller than the timestamp of any record joined against that processing record afterwards. Hence it is safe to blindly expire them all without the `except` logic.
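   The bounds above can be sketched in a few lines. This is a minimal, hypothetical illustration only; the class and method names (`ExpiryOrderingSketch`, `expiryUpperBound`, `joinLowerBound`) are invented for this sketch and are not part of the PR:

```java
// Sketch of the timestamp-bound argument: with T == T' and grace-period >= 0,
// the smallest joinable timestamp is never below the largest expirable one.
public class ExpiryOrderingSketch {

    // Largest timestamp an expired record can have, given stream time T'.
    static long expiryUpperBound(final long maxStreamTime, final long windowSize, final long gracePeriod) {
        return maxStreamTime - windowSize - gracePeriod;
    }

    // Smallest timestamp of a record that can still join with input timestamp T.
    static long joinLowerBound(final long inputTimestamp, final long windowSize) {
        return inputTimestamp - windowSize;
    }

    public static void main(final String[] args) {
        final long windowSize = 10L;
        final long gracePeriod = 5L;

        // Non-late record: T == T'. Since grace-period >= 0, the join lower
        // bound is always >= the expiry upper bound, so expiring first cannot
        // drop a record that the current input could still join with.
        final long t = 100L;
        if (joinLowerBound(t, windowSize) < expiryUpperBound(t, windowSize, gracePeriod)) {
            throw new AssertionError("a joinable record could have been expired");
        }

        // Late record: T < T'. Records in [T - windowSize, T' - windowSize - gracePeriod]
        // were already expired when stream time advanced to T', so they cannot
        // be found in the store now; expiring first is still safe.
        System.out.println("joinLowerBound=" + joinLowerBound(t, windowSize)
                + ", expiryUpperBound=" + expiryUpperBound(t, windowSize, gracePeriod));
    }
}
```

   With windowSize = 10 and gracePeriod = 5, a non-late record at T = T' = 100 gives a join lower bound of 90 and an expiry upper bound of 85, matching point 3) above.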



