[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-20 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616762075



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {

Review comment:
   I did not find how to get the configs from KStreamImplJoin. I tracked the configs being passed from the StreamTask down to the processor when processing the record, so I ended up checking the flag at that point. I could refactor the code to pass the configs when constructing the joins, but that would require more changes in different places, and I'm not sure whether it would make things incompatible. Any ideas?
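   For reference, a minimal sketch of the approach described above: reading the flag from the configs that the context exposes to the processor at init time. The constant is the internal flag introduced by this PR, and defaulting to `false` (fix enabled) when the flag is unset is an assumption based on the later discussion.
   ```
   import java.util.Map;

   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.processor.ProcessorContext;

   // Sketch only: resolve the internal "disable fix" flag from the application
   // configs available to the processor, treating an absent value as "fix enabled".
   final class OuterJoinFixFlag {

       static boolean fixDisabled(final ProcessorContext context) {
           final Map<String, Object> configs = context.appConfigs();
           final Object value = configs.get(
               StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
           if (value == null) {
               return false;  // flag not set: keep the fix enabled
           }
           return value instanceof Boolean
               ? (Boolean) value
               : Boolean.parseBoolean(value.toString());
       }
   }
   ```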




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-20 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616759894



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKStreamOuterJoinTest {

Review comment:
   Most of them were already there. I just updated them to add scenario 3). A sketch of the kind of scenario these tests cover follows the list below.
   ```
   KStreamKStreamLeftJoinTest
   - testLeftJoinDuplicates()
   - testLeftJoinDuplicatesWithFixDisabled()
   - testOrdering()
   
   KStreamKStreamOuterJoinTest
   - testOuterJoinDuplicates()
   - testOuterJoinDuplicatesWithFixDisabled()
   - testOrdering()
   
   ```
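   As an illustration of the scenario these tests exercise, here is a minimal, hypothetical sketch (topic names, serdes, window sizes, and config values are assumptions, not the actual test code): under the fix, a non-joined left record must not produce an eager `<key, leftValue+null>` result; the outer result is only emitted once the join window plus grace period has closed.
   ```
   import java.time.Duration;
   import java.util.Properties;

   import org.apache.kafka.common.serialization.IntegerSerializer;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.serialization.StringSerializer;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.TestInputTopic;
   import org.apache.kafka.streams.TopologyTestDriver;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.JoinWindows;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.Produced;
   import org.apache.kafka.streams.kstream.StreamJoined;

   public class OuterJoinNoSpuriousResultsSketch {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();
           final KStream<Integer, String> left =
               builder.stream("left", Consumed.with(Serdes.Integer(), Serdes.String()));
           final KStream<Integer, String> right =
               builder.stream("right", Consumed.with(Serdes.Integer(), Serdes.String()));

           left.outerJoin(
                   right,
                   (v1, v2) -> v1 + "+" + v2,
                   JoinWindows.of(Duration.ofMillis(100)).grace(Duration.ZERO),
                   StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()))
               .to("out", Produced.with(Serdes.Integer(), Serdes.String()));

           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "outer-join-sketch");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

           try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
               final TestInputTopic<Integer, String> leftTopic =
                   driver.createInputTopic("left", new IntegerSerializer(), new StringSerializer());

               // With the fix enabled, this record alone produces no output: its window is still open.
               leftTopic.pipeInput(1, "A", 0L);

               // Advancing stream time past 0 + window size (100 ms) + grace (0 ms) lets the
               // processor emit the non-joined result <1, "A+null"> exactly once.
               leftTopic.pipeInput(2, "B", 500L);
           }
       }
   }
   ```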








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-20 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616758960



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +83,44 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private final Predicate>> 
recordWindowHasClosed =

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +83,44 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);

Review comment:
   Done








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614313362



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+// the put() call will advance the stream time, which causes 
records out of the retention
+// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+emitExpiredNonJoinedOuterRecords();
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(isLeftJoin, key),
+LeftOrRightValue.make(isLeftJoin, value),
+inputRecordTimestamp));
+}
+}
+}
+}
+
+private void emitExpiredNonJoinedOuterRecords() {
+outerJoinWindowStore.ifPresent(store ->
+emitExpiredNonJoinedOuterRecords(store, 
recordWindowHasClosed));
+}
+
+private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final 
long timestamp) {
+outerJoinWindowStore.ifPresent(store -> {
+final KeyAndJoinSide keyAndJoinSide = 
KeyAndJoinSide.make(!isLeftJoin, key);
+
+// Emit all expired records except the just found non-joined 
key. We need
+// to emit all expired records before calling put(), otherwise 
the internal
+// stream time will advance and may cause records out of the 
retention period to
+// be deleted.
+emitExpiredNonJoinedOuterRecords(store,
+r

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312924



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName,

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);
+
+if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+}
 }
 
+private boolean internalOuterJoinFixDisabled(final Map 
configs) {
+final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+if (value == null) {
+return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);
+
+if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+}
 }
 
+private boolean internalOuterJoinFixDisabled(final Map 
configs) {
+final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+if (value == null) {
+return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;
+}
+
+if (value instanceof Boolean) {
+return (Boolean) value;
+} else {

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312224



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName,
+   
   final JoinWindows windows,
+   
   final 
StreamJoinedInternal streamJoinedInternal) {
+final StoreBuilder, LeftOrRightValue>> builder = new TimeOrderedWindowStoreBuilder, 
LeftOrRightValue>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+Time.SYSTEM

Review comment:
   It would require more changes just to allow that. The `KStreamImplJoin` constructor, which we could overload to pass a `Time` mock object, is only used by the `KStreamImplJoinImpl` class. The tests use the `StreamsBuilder` to create the joins, and it does not accept a `Time` object.
   
   Also, the `Stores` class, which is called by `KStreamImplJoin`, does not mock it either. Maybe because of the same code changes that would be required just for that?








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614304330



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;

Review comment:
   Interesting. Is this a current bug with the old join semantics?








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614303123



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+// the put() call will advance the stream time, which causes 
records out of the retention
+// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+emitExpiredNonJoinedOuterRecords();
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(isLeftJoin, key),
+LeftOrRightValue.make(isLeftJoin, value),
+inputRecordTimestamp));
+}
+}
+}
+}
+
+private void emitExpiredNonJoinedOuterRecords() {
+outerJoinWindowStore.ifPresent(store ->
+emitExpiredNonJoinedOuterRecords(store, 
recordWindowHasClosed));
+}
+
+private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final 
long timestamp) {
+outerJoinWindowStore.ifPresent(store -> {
+final KeyAndJoinSide keyAndJoinSide = 
KeyAndJoinSide.make(!isLeftJoin, key);
+
+// Emit all expired records except the just found non-joined 
key. We need
+// to emit all expired records before calling put(), otherwise 
the internal
+// stream time will advance and may cause records out of the 
retention period to
+// be deleted.
+emitExpiredNonJoinedOuterRecords(store,
+r

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614114447



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);
+
+if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+}
 }
 
+private boolean internalOuterJoinFixDisabled(final Map 
configs) {
+final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+if (value == null) {
+return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
   That constant name is confusing. I just removed the `*_DEFAULT` constant and now return `false` when the config is not set. The idea is that the join fix is enabled (disabled = false) whenever the config is not set.








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614105673



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+
+// Get the suffix index of the joinThisGeneratedName to build the 
outer join store name.
+final String outerJoinStoreGeneratedName = 
KStreamImpl.OUTERSHARED_NAME
++ joinThisGeneratedName.substring(
+rightOuter
+? KStreamImpl.OUTERTHIS_NAME.length()
+: KStreamImpl.JOINTHIS_NAME.length());

Review comment:
   I initially generated a name with a new index for the shared store. However, it seems this was going to cause incompatibilities in the topology because the new index kept increasing. Instead, I now just take the index from one of the current join stores. Why doesn't that make sense? Is there another way to get an index? Or do I really need to append an index at the end of the shared store name?
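   To make that naming scheme concrete, a small sketch (the constant values are assumptions based on KStreamImpl's `KSTREAM-...-` name prefixes and the new `OUTERSHARED` prefix this PR adds):
   ```
   public class OuterJoinStoreNamingSketch {
       public static void main(final String[] args) {
           // Assumed constant values, for illustration only.
           final String jointhisName = "KSTREAM-JOINTHIS-";       // KStreamImpl.JOINTHIS_NAME
           final String outersharedName = "KSTREAM-OUTERSHARED-"; // KStreamImpl.OUTERSHARED_NAME

           // Name already generated for the join-this processor, e.g. by the builder.
           final String joinThisGeneratedName = "KSTREAM-JOINTHIS-0000000004";

           // Reuse the numeric suffix of the existing processor name instead of drawing
           // a new index, so existing topologies keep the same numbering.
           final String outerJoinStoreGeneratedName =
               outersharedName + joinThisGeneratedName.substring(jointhisName.length());

           System.out.println(outerJoinStoreGeneratedName); // KSTREAM-OUTERSHARED-0000000004
       }
   }
   ```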








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614098535



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
   There is only one store shared between the left and right joins, so I can use a single name.








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915606



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+store.put(otherJoinKey, null, 
otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to 
check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {

Review comment:
   Done








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915696



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+store.put(otherJoinKey, null, 
otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to 
check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+return thisJoin
+? ValueOrOtherValue.makeValue(value)
+: ValueOrOtherValue.makeOtherValue(value);
+}
+
+@SuppressWarnings("unchecked")
+private void maybeEmitOuterExpiryRecords(final 
WindowStore, ValueOrOtherValue> store, final long 
maxStreamTime) {
+try (final KeyValueIterator>, 
ValueOrOtherValue> it = store.all()) {
+while (it.hasNext()) {
+final KeyValue>, 
ValueOrOtherValue> e = it.next();
+
+// Skip next records if the oldest record has not expired 
yet
+if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+break;
+}
+
+final K key = e.key.key().getKey();
+
+// Emit the record by joining with a null value. But the 
order varies depending whether
+// this join is using a reverse joiner or not. Also 
wheth

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611882455



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {

Review comment:
   This needs to be supported in Window stores. I might need to write a KIP 
for this.








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611878043



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
ValueOrOtherValue>> outerJoinWindowStoreBuilder(final String storeName,
+   
final JoinWindows windows,
+   
final 
StreamJoinedInternal streamJoinedInternal) {
+final StoreBuilder, 
ValueOrOtherValue>> builder = new 
TimeOrderedWindowStoreBuilder, ValueOrOtherValue>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+Time.SYSTEM
+);
+if (streamJoinedInternal.loggingEnabled()) {
+builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+} else {
+builder.withLoggingDisabled();
+}
+
+return builder;
+}
+
+// This method has same code as Store.persistentWindowStore(). But 
TimeOrderedWindowStore is
+// a non-public API, so we need to keep duplicate code until it becomes 
public.
+private static WindowBytesStoreSupplier 
persistentTimeOrderedWindowStore(final String storeName,
+ 
final Duration retentionPeriod,
+ 
final Duration windowSize) {
+Objects.requireNonNull(storeName, "name cannot be null");
+final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+if (retentionMs < 0L) {
+throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+}
+if (windowSizeMs < 0L) {
+throw new IllegalArgumentException("windowSize cannot be 
negative");
+}
+if (segmentInterval < 1L) {
+throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
+}
+if (windowSizeMs > retentionMs) {
+throw new IllegalArgumentException("The retention period of the 
window store "
++ storeName + " must be no smaller than its window size. Got 
size=["
++ windowSizeMs + "], retention=[" + retentionMs + "]");
+}
+
+return new RocksDbWindowBytesStoreSupplier(
+storeName,
+retentionMs,
+segmentInterval,
+windowSizeMs,
+false,

Review comment:
   I had issues with duplicates and forgot to investigate them. I just did another round of investigation, but I still get issues. The problem is that I cannot delete any key when duplicates are used. This happens in any window store, not just the time-ordered window store.
   
   The problem I found is:
   
   1. Add two duplicates with key = 0 and time = 0
   ```
   # this adds a key with seqNum = 0
   put(0, "A0", 0)
   # this adds a key with seqNum = 1
   put(0, "A0-0", 0)
   ```
   2. Delete key = 0 and time = 0
   ```
   # this attempts to delete with seqNum = 2, which does not exist
   put(0, null, 0)
   ```
   
   Initially I didn't think supporting duplicates was necessary, but I just wrote a test case with the old semantics and duplicates are processed, so I need to support them. Do you know if deleting duplicates has been unsupported all along? Or am I missing some API or workaround?








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610893784



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -161,7 +181,7 @@ public void process(final K key, final V1 value) {
 //
 // the condition below allows us to process the late 
record without the need
 // to hold it in the temporary outer store
-if (timeTo < maxStreamTime) {
+if (internalOuterJoinFixDisabled || timeTo < 
maxStreamTime) {

Review comment:
   Done








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891690



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+store.put(otherJoinKey, null, 
otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to 
check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+return thisJoin
+? ValueOrOtherValue.makeValue(value)
+: ValueOrOtherValue.makeOtherValue(value);
+}
+
+@SuppressWarnings("unchecked")
+private void maybeEmitOuterExpiryRecords(final 
WindowStore, ValueOrOtherValue> store, final long 
maxStreamTime) {
+try (final KeyValueIterator>, 
ValueOrOtherValue> it = store.all()) {
+while (it.hasNext()) {
+final KeyValue>, 
ValueOrOtherValue> e = it.next();
+
+// Skip next records if the oldest record has not expired 
yet
+if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+break;
+}
+
+final K key = e.key.key().getKey();
+
+// Emit the record by joining with a null value. But the 
order varies depending whether
+// this join is using a reverse joiner or not. Also 
wheth

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891590



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -92,6 +113,10 @@ public void process(final K key, final V1 value) {
 return;
 }
 
+// maxObservedStreamTime is updated and shared between left and 
right sides, so we can
+// process a non-join record immediately if it is late
+final long maxStreamTime = maxObservedStreamTime.updateAndGet(time 
-> Math.max(time, context().timestamp()));

Review comment:
   Done








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891253



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -38,20 +45,32 @@
 private final String otherWindowName;
 private final long joinBeforeMs;
 private final long joinAfterMs;
+private final long joinGraceMs;
 
 private final ValueJoinerWithKey joiner;
 private final boolean outer;
+private final Optional outerJoinWindowName;
+private final AtomicLong maxObservedStreamTime;
+private final boolean thisJoin;

Review comment:
   Are you suggesting using two boolean variables?








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891116



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +132,40 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
ValueOrOtherValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+final String outerJoinStoreGeneratedName = 
builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+final String outerJoinStoreName = userProvidedBaseStoreName == 
null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + 
outerJoinSuffix;
+
+outerJoinWindowStore = 
Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, 
streamJoinedInternal));
+}
+
+// Time shared between joins to keep track of the maximum stream time

Review comment:
   Done








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890964



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
ValueOrOtherValue>> outerJoinWindowStoreBuilder(final String storeName,
+   
final JoinWindows windows,
+   
final 
StreamJoinedInternal streamJoinedInternal) {
+final StoreBuilder, 
ValueOrOtherValue>> builder = new 
TimeOrderedWindowStoreBuilder, ValueOrOtherValue>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),

Review comment:
   Are you talking about the `ValueOrOtherValueSerde` -> `LeftOrRightValueSerde` rename? If so, then done.








[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890414



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide {
+private final K key;
+private final boolean thisJoin;

Review comment:
   I changed it to `leftJoin`. But it seems you suggested adding two boolean variables, one for left and another for right?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide {
+private final K key;
+private final boolean thisJoin;
+
+private KeyAndJoinSide(final boolean thisJoin, final K key) {
+this.key = Objects.requireNonNull(key, "key is null");
+this.thisJoin = thisJoin;
+}
+
+/**
+ * Create a new {@link KeyAndJoinSide} instance if the provide {@code key} 
is not {@code null}.
+ *
+ * @param thisJoin True if the key is part of the left topic (reference as 
thisJoin in {@code KStreamImplJoin})
+ * @param key  the key
+ * @param   the type of the key
+ * @return a new {@link KeyAndJoinSide} instance if the provide {@code 
key} is not {@code null}
+ */
+public static  KeyAndJoinSide make(final boolean thisJoin, final K 
key) {
+return new KeyAndJoinSide<>(thisJoin, key);
+}
+
+public boolean isThisJoin() {
+return thisJoin;
+}
+
+public K getKey() {
+return key;
+}
+
+@Override
+public String toString() {
+return "<" + thisJoin + "," + key + ">";

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License 

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-09 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610754025



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+store.put(otherJoinKey, null, 
otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to 
check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+return thisJoin
+? ValueOrOtherValue.makeValue(value)
+: ValueOrOtherValue.makeOtherValue(value);
+}
+
+@SuppressWarnings("unchecked")
+private void maybeEmitOuterExpiryRecords(final 
WindowStore, ValueOrOtherValue> store, final long 
maxStreamTime) {
+try (final KeyValueIterator>, 
ValueOrOtherValue> it = store.all()) {
+while (it.hasNext()) {
+final KeyValue>, 
ValueOrOtherValue> e = it.next();
+
+// Skip next records if the oldest record has not expired 
yet
+if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+break;
+}
+
+final K key = e.key.key().getKey();
+
+// Emit the record by joining with a null value. But the 
order varies depending whether
+// this join is using a reverse joiner or not. Also 
wheth

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-08 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610044912



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -38,20 +48,36 @@
 private final String otherWindowName;
 private final long joinBeforeMs;
 private final long joinAfterMs;
+private final long joinGraceMs;
 
 private final ValueJoinerWithKey joiner;
 private final boolean outer;
+private final Optional outerJoinWindowName;
+private final boolean thisJoin;
+
+// Observed time is AtomicLong because this time is shared between the 
left and side processor nodes. However,
+// this time is not updated in parallel, so we can call get() several 
times without worry about getting different
+// times.
+private final AtomicLong maxObservedStreamTime;

Review comment:
   I think `context.currentStreamTimeMs()` should work. I wasn't aware I could get the new stream time from it. I don't see any problems, as I only need the stream time to expire the records.
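   A rough sketch of that suggestion, assuming the processor context exposes `currentStreamTimeMs()` (KIP-622); the window and grace values here are illustrative placeholders rather than the join's real fields:
   ```
   import org.apache.kafka.streams.processor.AbstractProcessor;

   // Sketch only: decide whether a record's join window has closed using the
   // stream time tracked by the context, instead of a shared AtomicLong.
   final class StreamTimeExpirySketch extends AbstractProcessor<Integer, String> {
       private static final long JOIN_AFTER_MS = 100L; // illustrative window bound
       private static final long JOIN_GRACE_MS = 0L;   // illustrative grace period

       @Override
       public void process(final Integer key, final String value) {
           final long inputRecordTimestamp = context().timestamp();
           final long streamTime = context().currentStreamTimeMs();

           // The window has closed once stream time has passed its upper bound plus
           // the grace period; only then may a left/outer result be emitted.
           if (inputRecordTimestamp + JOIN_AFTER_MS + JOIN_GRACE_MS < streamTime) {
               context().forward(key, value + "+null");
           }
       }
   }
   ```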



