mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1524044954


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+
+public class KStreamKStreamWindowCloseTest {
+
+    private static final String LEFT = "left";
+    private static final String RIGHT = "right";
+    private final static Properties PROPS = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private static final Consumed<Integer, String> CONSUMED = 
Consumed.with(Serdes.Integer(), Serdes.String());
+    private static final JoinWindows WINDOW = 
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5));
+
+    static List<Arguments> streams() {
+        return Arrays.asList(
+            innerJoin(),
+            leftJoin(),
+            outerJoin()
+        );
+    }
+
+    private static Arguments innerJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, 
CONSUMED).join(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Object, Object> 
processorSupplier = new MockApiProcessorSupplier<>();
+        stream.process(processorSupplier);
+        return Arguments.of(builder.build(PROPS), processorSupplier);
+    }
+
+    private static Arguments leftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, 
CONSUMED).leftJoin(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Object, Object> 
processorSupplier = new MockApiProcessorSupplier<>();
+        stream.process(processorSupplier);
+        return Arguments.of(builder.build(PROPS), processorSupplier);
+    }
+
+    private static Arguments outerJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, 
CONSUMED).outerJoin(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Object, Object> 
supplier = new MockApiProcessorSupplier<>();
+        stream.process(supplier);
+        return Arguments.of(builder.build(PROPS), supplier);
+    }
+
+    @ParameterizedTest
+    @MethodSource("streams")
+    public void recordsArrivingPostWindowCloseShouldBeDropped(
+        final Topology topology,
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier) {
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, PROPS)) {
+            final TestInputTopic<Integer, String> left =
+                driver.createInputTopic(KStreamKStreamWindowCloseTest.LEFT, 
new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+            final TestInputTopic<Integer, String> right =
+                driver.createInputTopic(KStreamKStreamWindowCloseTest.RIGHT, 
new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            left.pipeInput(0, "left", 15);
+            right.pipeInput(-1, "bumpTime", 40);
+            assertRecordDropCount(0.0, processor);
+
+            right.pipeInput(0, "closedAt40", 24);
+            assertRecordDropCount(1.0, processor);
+

Review Comment:
   Thanks. No rush! We still have time until 3.8 comes around :)
   
   For joins it's a little bit more complicated than for aggregations, but in 
the end, we need to accept a record as long as a window is open, and 
window-close time is defined as window-end+grace+1 (for join windows). And for 
joins, the definition of what a window is, is a little bit more complex 
(especially when taking "asymmetric" windows into account).
   
   I guess the bottom line is that every input record conceptually opens a 
join window over the other input stream, and we need to keep this window open 
and accept records from the other side until we reach window-close time. At the 
same time, if an out-of-order record comes in and we put it into an existing 
join window, it might not be allowed to open its own join window any longer, 
because its own join window is already too old (this is the weird case that 
cannot happen for aggregations...)
   
   I did not think about the right/best way to compute this correctly, but 
I am sure you'll figure it out :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to