lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397715022


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -104,20 +103,16 @@ public void init(final ProcessorContext<K, VOut> context) {
             internalProcessorContext = (InternalProcessorContext<K, VOut>) context;
 
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
-            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(),
+                    metrics);

Review Comment:
   ditto



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -609,17 +842,18 @@ public void testOrdering() {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join; 
+            // this should produce the joined record first;
+            // then non-joined record with a closed window
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // --> w2 = { 1:a1 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
-                new KeyValueTimestamp<>(1, "A1+a1", 110L)
+                new KeyValueTimestamp<>(1, "A1+a1", 110L),
+                new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   This is because previously we looked at the outer store first and then joined, but this change makes us join first and then look at the outer store. The timestamps in the outer store and the other store are hard to reason about: if we change the ts of key 0 to 100 and the ts of key 1 to 50, the original test would still produce key 0 first, which has the larger ts... So unless we compare the ts of the join result and the outer result at the same time when we output, we cannot guarantee that the output is ordered by ts.
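   
   To make "compare the ts at output time" concrete, a minimal, illustrative sketch (the `Result` record and `pending` list are hypothetical names, not part of the processor; the two results are the ones from the test above):
   
   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   
   public class EmitOrderSketch {
       record Result(int key, String value, long timestamp) {}
   
       public static void main(String[] args) {
           final List<Result> pending = new ArrayList<>();
           pending.add(new Result(1, "A1+a1", 110L));  // join result, emitted first after this change
           pending.add(new Result(0, "A0+null", 0L));  // expired outer-join result, emitted second
           // Only an explicit comparison at emit time guarantees ascending timestamps,
           // regardless of whether the join or the outer store is consulted first:
           pending.sort(Comparator.comparingLong(Result::timestamp));
           pending.forEach(System.out::println);
       }
   }
   ```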



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -60,24 +61,21 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
 
     private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 
-    KStreamKStreamJoin(final boolean isLeftSide,
-                       final String otherWindowName,
-                       final JoinWindowsInternal windows,
-                       final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
-                       final boolean outer,
-                       final Optional<String> outerJoinWindowName,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+    KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, final JoinWindowsInternal windows,
+            final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner, final boolean outer,
+            final Optional<String> outerJoinWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier) {

Review Comment:
   Change this back? I think for Kafka Streams, the convention is to align continuation lines with the first parameter's indentation.
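   
   For reference, the aligned style is what the removed lines above show:
   
   ```java
   KStreamKStreamJoin(final boolean isLeftSide,
                      final String otherWindowName,
                      final JoinWindowsInternal windows,
                      final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
                      final boolean outer,
                      final Optional<String> outerJoinWindowName,
                      final TimeTrackerSupplier sharedTimeTrackerSupplier) {
   ```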



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -165,40 +155,52 @@ public void process(final Record<K, V1> record) {
                    // problem:
                    //
                    // Say we have a window size of 5 seconds
-                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
-                    //     The record is not processed yet, and is added to the outer-join store
-                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
-                    //     The record is not processed yet, and is added to the outer-join store
-                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
-                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
-                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    // 1. A non-joined record with time T10 is seen in the left-topic
+                    // (maxLeftStreamTime: 10)
+                    // The record is not processed yet, and is added to the outer-join store
+                    // 2. A non-joined record with time T2 is seen in the right-topic
+                    // (maxRightStreamTime: 2)
+                    // The record is not processed yet, and is added to the outer-join store
+                    // 3. A joined record with time T11 is seen in the left-topic
+                    // (maxLeftStreamTime: 11)
+                    // It is time to look at the expired records. T10 and T2 should be emitted, but
+                    // because T2 was late, then it is not fetched by the window store, so it is not
+                    // processed
                    //
                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
                    //
-                    // This condition below allows us to process the out-of-order records without the need
+                    // This condition below allows us to process the out-of-order records without
+                    // the need
                    // to hold it in the temporary outer store
                    if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
                        context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
                    } else {
                        sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
                        outerJoinStore.ifPresent(store -> store.put(
-                            TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
-                            LeftOrRightValue.make(isLeftSide, record.value())));
+                                TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
+                                LeftOrRightValue.make(isLeftSide, record.value())));

Review Comment:
   ditto
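   
   For context on the quoted T10/T2 comment block: a small, self-contained sketch of the `timeTo < streamTime` check (the values come from the comment itself; assuming `joinAfterMs` equals the stated 5-second window):
   
   ```java
   public class LateRecordSketch {
       public static void main(String[] args) {
           final long joinAfterMs = 5_000L;           // "a window size of 5 seconds"
           final long streamTime = 10_000L;           // advanced by the left record at T10
           final long inputRecordTimestamp = 2_000L;  // the late right record at T2
           final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);  // 7_000
           if (timeTo < streamTime) {
               // The record's join window is already closed relative to stream time,
               // so it can be forwarded as a non-joined <value, null> result right away
               // instead of being held in the temporary outer-join store.
               System.out.println("emit T2 as an outer-join result immediately");
           }
       }
   }
   ```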



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -104,20 +103,16 @@ public void init(final ProcessorContext<K, VOut> context) {
             internalProcessorContext = (InternalProcessorContext<K, VOut>) context;
 
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
-            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(),
+                    metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
             sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId());
 
             if (enableSpuriousResultFix) {
                 outerJoinStore = outerJoinWindowName.map(context::getStateStore);
 
-                sharedTimeTracker.setEmitInterval(
-                    StreamsConfig.InternalConfig.getLong(
-                        context.appConfigs(),
-                        EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
-                        1000L
-                    )
-                );
+                sharedTimeTracker.setEmitInterval(StreamsConfig.InternalConfig.getLong(context.appConfigs(),
+                        EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 1000L));

Review Comment:
   ditto
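   
   Side note for anyone stepping through this: the interval read here can be pinned in tests via the same internal config key. A minimal sketch, assuming the usual `Properties`-based test setup (not taken from this PR):
   
   ```java
   import java.util.Properties;
   
   import org.apache.kafka.streams.StreamsConfig;
   
   public class EmitIntervalConfigSketch {
       public static void main(String[] args) {
           final Properties props = new Properties();
           // Overrides the 1000L default read above; 0L makes the processor check
           // for expired outer-join results on every record, which is handy in tests.
           props.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
       }
   }
   ```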



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -134,29 +129,24 @@ public void process(final Record<K, V1> record) {
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
-
-            // Emit all non-joined records which window has closed
-            if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
-                outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
-            }
             try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
 
-                    outerJoinStore.ifPresent(store -> {
-                        // use putIfAbsent to first read and see if there's any values for the key,
-                        // if yes delete the key, otherwise do not issue a put;
-                        // we may delete some values with the same key early but since we are going
-                        // range over all values of the same key even after failure, since the other window-store
-                        // is only cleaned up by stream time, so this is okay for at-least-once.
-                        store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null);
-                    });
-
-                    context().forward(
-                        record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value))
-                               .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                    outerJoinStore.ifPresent(store ->
+                    // Use putIfAbsent to first read and see if there's any values for the key,
+                    // if yes delete the key, otherwise do not issue a put;
+                    // we may delete some values with the same key early but since we are going
+                    // range over all values of the same key even after failure, since the other
+                    // window-store
+                    // is only cleaned up by stream time, so this is okay for at-least-once.
+                        store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp),
+                            null));
+
+                    context().forward(record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value))
+                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));

Review Comment:
   ditto
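   
   One detail worth keeping in mind when reviewing the reordering above: the joined result's timestamp is `Math.max` of the two input timestamps, which is why the test hunk expects `110L` for `A1+a1` (A1 at 100, a1 at 110). A trivial illustration:
   
   ```java
   public class JoinTimestampSketch {
       public static void main(String[] args) {
           final long inputRecordTimestamp = 100L;  // A1 from the test above
           final long otherRecordTimestamp = 110L;  // a1 from the test above
           // Mirrors .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))
           System.out.println(Math.max(inputRecordTimestamp, otherRecordTimestamp));  // 110
       }
   }
   ```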



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -94,7 +92,8 @@ public Processor<K, V1, K, VOut> get() {
     private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
         private WindowStore<K, V2> otherWindowStore;
         private Sensor droppedRecordsSensor;
-        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
+        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional
+                .empty();

Review Comment:
   revert this change? Does it cause checkstyle problem previously?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
