raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545788943


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamRightJoin<K, V1, V2, VOut> extends KStreamKStreamJoin<K, 
V1, V2, VOut> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamRightJoin.class);
+
+
+    KStreamKStreamRightJoin(final String otherWindowName,
+            final JoinWindowsInternal windows,
+            final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner,
+            final boolean outer,
+            final Optional<String> outerJoinWindowName,
+            final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+        super(otherWindowName, windows, windows.afterMs, windows.beforeMs, 
joiner, outerJoinWindowName,
+                sharedTimeTrackerSupplier, outer);
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamRightJoinProcessor();
+    }
+
+    private class KStreamKStreamRightJoinProcessor extends 
KStreamKStreamJoinProcessor {
+        @Override
+        public void process(final Record<K, V1> rightRecord) {
+            final long inputRecordTimestamp = rightRecord.timestamp();
+            final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+            final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+            if (outer && rightRecord.key() == null && rightRecord.value() != 
null) {
+                
context().forward(rightRecord.withValue(joiner.apply(rightRecord.key(), 
rightRecord.value(), null)));
+                return;
+            } else if (StreamStreamJoinUtil.skipRecord(rightRecord, LOG, 
droppedRecordsSensor, context())) {
+                return;
+            }
+
+            // Emit all non-joined records which window has closed
+            if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
+                rightOuterJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, rightRecord));
+            }
+
+            boolean needOuterJoin = outer;
+            // TODO: Where and when are the records put in this window store?
+            try (final WindowStoreIterator<V2> iter = 
otherWindowStore.fetch(rightRecord.key(), timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    needOuterJoin = false;
+                    final KeyValue<Long, V2> leftRecord = iter.next();
+                    final long leftRecordTimestamp = leftRecord.key;
+
+                    rightOuterJoinStore.ifPresent(store -> {
+                        // use putIfAbsent to first read and see if there's 
any values for the key,
+                        // if yes delete the key, otherwise do not issue a put;
+                        // we may delete some values with the same key early 
but since we are going
+                        // range over all values of the same key even after 
failure, since the other window-store
+                        // is only cleaned up by stream time, so this is okay 
for at-least-once.
+                        store.putIfAbsent(
+                                
TimestampedKeyAndJoinSide.makeLeftSide(rightRecord.key(), leftRecordTimestamp), 
null);
+                    });
+
+                    final VOut joinResult = joiner.apply(rightRecord.key(), 
rightRecord.value(), leftRecord.value);
+                    context().forward(
+                            rightRecord.withValue(joinResult)
+                                    
.withTimestamp(Math.max(inputRecordTimestamp, leftRecordTimestamp)));
+                }
+
+                if (needOuterJoin) {
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // This condition below allows us to process the 
out-of-order records without the need
+                    // to hold it in the temporary outer store
+                    if (!rightOuterJoinStore.isPresent() || timeTo < 
sharedTimeTracker.streamTime) {
+                        
context().forward(rightRecord.withValue(joiner.apply(rightRecord.key(), 
rightRecord.value(), null)));
+                    } else {
+                        sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
+                        rightOuterJoinStore.ifPresent(store -> {
+                            final TimestampedKeyAndJoinSide<K> key =
+                                    
TimestampedKeyAndJoinSide.makeRightSide(rightRecord.key(), 
inputRecordTimestamp);
+                            final LeftOrRightValue<V2, V1> value = 
LeftOrRightValue.makeRightValue(rightRecord.value());
+                            store.put(key, value);
+                        });
+                    }
+                }
+            }
+        }
+
+        private void emitNonJoinedOuterRecords(
+                final KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V2, V1>> store,
+                final Record<K, V1> record) {
+
+            // calling `store.all()` creates an iterator what is an expensive 
operation on RocksDB;
+            // to reduce runtime cost, we try to avoid paying those cost
+
+            // only try to emit left/outer join results if there _might_ be 
any result records
+            if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
+                return;
+            }
+            // throttle the emit frequency to a (configurable) interval;
+            // we use processing time to decouple from data properties,
+            // as throttling is a non-functional performance optimization
+            if (internalProcessorContext.currentSystemTimeMs() < 
sharedTimeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, 
if we dont set it everytime,
+            // they can get out of sync during a clock drift
+            sharedTimeTracker.nextTimeToEmit = 
internalProcessorContext.currentSystemTimeMs();
+            sharedTimeTracker.advanceNextTimeToEmit();
+
+            // reset to MAX_VALUE in case the store is empty
+            sharedTimeTracker.minTime = Long.MAX_VALUE;
+
+            try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V2, V1>> it = store.all()) {
+                TimestampedKeyAndJoinSide<K> prevKey = null;
+
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
+                while (it.hasNext()) {
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+                        break; // there are no more candidates to emit on 
left-outerJoin-side and right-outerJoin-side
+                    }
+                    final KeyValue<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V2, V1>> next = it.next();
+                    final TimestampedKeyAndJoinSide<K> 
timestampedKeyAndJoinSide = next.key;
+                    final LeftOrRightValue<V2, V1> value = next.value;
+                    final K key = timestampedKeyAndJoinSide.getKey();
+                    final long timestamp = 
timestampedKeyAndJoinSide.getTimestamp();
+                    sharedTimeTracker.minTime = timestamp;
+
+                    // Skip next records if window has not closed
+                    // TODO: The timestampedKeyAndJoinSide contains values 
from left side.
+                    final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+                    if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs
+                            >= sharedTimeTracker.streamTime) {
+                        if (timestampedKeyAndJoinSide.getJoinSide() == 
JoinSide.LEFT) {

Review Comment:
   A side note: This decision should ideally be removed. The reason it is 
still there is that I am initialising the left and right outer stores with the 
same name. See the 'init()' method in the parent class.
   
   In the KStreamKStreamLeftJoin this call was not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to