Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-2041579829

Hi @mjsax

It's now rebased. Plus, I addressed your reply below.

> I find this test a little bit hard to follow...
>
> We start to open a window `[5;15]` with the first input record. This window would close at `31` so why do we bump to `40`, and test dropping with `ts=24` -- both are totally unrelated to the left input record's window.
>
> Also, it would be good to test that we can join out-of-order records successfully as long as the window is open, and that we don't drop pre-maturely before we hit the close time, thus I would suggest something like this:
>
> ```
> // prepare
> - left input at 15 -> open window
> // positive test
> - bump time to 30 (different key) -> window still open
> - right input at 5 -> joins (no need to test with 4, because 4 won't join because of window size anyway...)
> - right input at 25 -> joins
> // negative test
> - bump time to 31 (different key) -> window closes
> - right input at 5 -> dropped (recorded with metric)
> - right input at 25 -> does not join any longer; window closed (for this case we don't drop and don't record metric)
> // test sharp lower drop bound
> - right input at 6 -> does not join any longer; window closed (for this case we don't drop though and don't record metric)
> // cont. with additional sanity check:
> - left input at 16 -> joins with both right input at 6 and right input at 25 -- to verify both records did not get dropped
> ```

I would argue that the second 'right input at 25' should still join / emit a record because the other window is still active and 'ts=15' is within its bounds. Other than that, the expected behavior of the test cases in KStreamKStreamWindowCloseTest is in line with what you suggested.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-2035119703 Hi @mjsax , thanks for the flag. Yes I'll push the necessary changes by the end of the week (Sunday). I hope that's ok.
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-2032098989 Just merged https://github.com/apache/kafka/pull/15510 -- can we move forward with this PR (maybe by rebasing it to see if any tests break?)?
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1524044954 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; + +public class KStreamKStreamWindowCloseTest { + +private static final String LEFT = "left"; +private static final String RIGHT = "right"; +private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private static final Consumed CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); +private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)); + +static List streams() { +return Arrays.asList( +innerJoin(), +leftJoin(), +outerJoin() +); +} + +private static Arguments innerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, 
CONSUMED).join( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments leftJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).leftJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments outerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).outerJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream.process(supplier); +return Arguments.of(builder.build(PROPS), supplier); +} + +@ParameterizedTest +@MethodSource("streams") +public void recordsArrivingPostWindowCloseShouldBeDropped( +final Topology topology, +final MockApiProcessorSupplier supplier) { +
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1522184075 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; + +public class KStreamKStreamWindowCloseTest { + +private static final String LEFT = "left"; +private static final String RIGHT = "right"; +private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private static final Consumed CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); +private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)); + +static List streams() { +return Arrays.asList( +innerJoin(), +leftJoin(), +outerJoin() +); +} + +private static Arguments innerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, 
CONSUMED).join( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments leftJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).leftJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments outerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).outerJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream.process(supplier); +return Arguments.of(builder.build(PROPS), supplier); +} + +@ParameterizedTest +@MethodSource("streams") +public void recordsArrivingPostWindowCloseShouldBeDropped( +final Topology topology, +final MockApiProcessorSupplier suppl
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1520671475 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; + +public class KStreamKStreamWindowCloseTest { + +private static final String LEFT = "left"; +private static final String RIGHT = "right"; +private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private static final Consumed CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); +private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)); + +static List streams() { +return Arrays.asList( +innerJoin(), +leftJoin(), +outerJoin() +); +} + +private static Arguments innerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, 
CONSUMED).join( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments leftJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).leftJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments outerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).outerJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream.process(supplier); +return Arguments.of(builder.build(PROPS), supplier); +} + +@ParameterizedTest +@MethodSource("streams") +public void recordsArrivingPostWindowCloseShouldBeDropped( +final Topology topology, +final MockApiProcessorSupplier supplier) { +
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1520627804
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java: ##
@@ -25,36 +25,46 @@
 public final class StreamStreamJoinUtil {
-    private StreamStreamJoinUtil(){
+    private StreamStreamJoinUtil() {
     }
     public static boolean skipRecord(
         final Record record, final Logger logger, final Sensor droppedRecordsSensor,
-        final ProcessorContext context) {
+        final ProcessorContext context
+    ) {
         // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, partition, and offset not known."
-                );
-            }
-            droppedRecordsSensor.record();
+            logSkip("null key or value", logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static void logSkip(
+        final String reason,
+        final Logger logger,
+        final Sensor droppedRecordsSensor,
+        final ProcessorContext context
+    ) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata recordMetadata = context.recordMetadata().get();
+            logger.warn(
+                "Skipping record. reason=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review Comment:
```suggestion
"Skipping record. Reason=[{}] topic=[{}] partition=[{}] offset=[{}]",
```
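For readers following the refactoring in this hunk, here is a minimal, self-contained sketch of the pattern: one helper that formats the skip message with a `reason` and counts the drop, reused by the null-key/null-value check. Everything in it is a stand-in for illustration. `Meta`, the `dropped` counter, and returning the message as a `String` are assumptions, not Kafka's `RecordMetadata`, `Sensor`, or `Logger`. The wording of the "metadata not known" branch is also assumed, since the quoted diff is cut off before that branch.

```java
import java.util.Optional;

// Hypothetical stand-ins only; this does not use or mirror Kafka's real classes.
final class SkipLogSketch {

    /** Minimal substitute for record metadata (topic, partition, offset). */
    record Meta(String topic, int partition, long offset) { }

    /** Counts skipped records, playing the role of the dropped-records sensor. */
    static int dropped = 0;

    /** Builds the warning text for a skipped record and counts the drop. */
    static String logSkip(final String reason, final Optional<Meta> meta) {
        dropped++;
        return meta
            .map(m -> String.format(
                "Skipping record. reason=[%s] topic=[%s] partition=[%d] offset=[%d]",
                reason, m.topic(), m.partition(), m.offset()))
            // Assumed wording for the branch that the quoted diff does not show.
            .orElse(String.format(
                "Skipping record. reason=[%s] Topic, partition, and offset not known.", reason));
    }

    /** Skips records with a null key or null value, delegating to the shared helper. */
    static boolean skipRecord(final Object key, final Object value, final Optional<Meta> meta) {
        if (key == null || value == null) {
            System.out.println(logSkip("null key or value", meta));
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        skipRecord(null, "v", Optional.of(new Meta("left", 0, 42L)));
        skipRecord("k", null, Optional.empty());
        skipRecord("k", "v", Optional.empty());
        System.out.println("dropped=" + dropped);
    }
}
```

Passing the reason as a parameter lets other callers, for example a late-record check, reuse the same message format and drop accounting.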
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1520626385
## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
Oh. This is the `null`-key case. My bad. For a `null` key we know it won't join in the future, so there is no reason to artificially delay the output. Thanks for pointing it out.
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1518848428
## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
I just realized that partition and offset are not available in this context... So, with regard to the second bullet point, probably the only distinguishing feature would be the value itself? I start to wonder, is it really worth it to add this complexity? We know all three events should eventually be sent downstream anyway. Leaving it as is, we avoid using additional storage and extending `TimestampedKeyAndJoinSideSerde` with another generic field.
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1986999670

> So I would have expected that some tests need an update with either advancing time pro-actively, or by expecting certain results later in the test, because windows are closed later?

Indeed, good point. I think the problem lies with https://github.com/florin-akermann/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L247 where `outerJoinLeftBreak && outerJoinRightBreak` is always false. Hence, this loop never exits early? I think this behavior has been introduced as part of https://github.com/apache/kafka/pull/14426?
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1518671144
## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
I wonder how to best solve this. Multiple null-key records would collide in the 'outerJoinStore' as they potentially all could have the same key of type 'TimestampedKeyAndJoinSide'. E.g. for a left stream

| Key  | value | ts |
|------|-------|----|
| null | a     | 1  |
| null | b     | 1  |
| null | c     | 1  |

We probably would only get to see 'c' even though we would like to see 'a', 'b' and 'c'?

Off the top of my head I see two options to handle this.
- Maintain an additional store just for null-key records where such records wouldn't collide.
- Adjust the 'outerJoinStore' key type `TimestampedKeyAndJoinSide` (e.g. by adding offset and partition as optional fields; this way null-key records could be distinguished and for keyed records the old behavior can be kept).

Personally I prefer the latter.
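The collision described in this comment can be reproduced with a plain map. The following is a minimal sketch under stated assumptions, not Kafka code: `StoreKey` is a hypothetical stand-in for a key made of timestamp, record key, and join side, and `ExtendedStoreKey` is a hypothetical version of the second option that adds partition and offset. A `LinkedHashMap` stands in for the store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins only; these are not Kafka's store or key classes.
final class NullKeyCollisionSketch {

    /** Store key made of timestamp, record key, and join side. */
    record StoreKey(long timestamp, Integer key, boolean leftSide) { }

    /** Same key, extended with partition and offset so equal-timestamp null-key records differ. */
    record ExtendedStoreKey(long timestamp, Integer key, boolean leftSide, int partition, long offset) { }

    /** Stores 'a', 'b', 'c' with a null key and ts=1; each put overwrites the previous value. */
    static Map<StoreKey, String> plainStore() {
        final Map<StoreKey, String> store = new LinkedHashMap<>();
        for (final String value : new String[] {"a", "b", "c"}) {
            store.put(new StoreKey(1L, null, true), value);
        }
        return store;
    }

    /** Stores the same three values, using the array index as a stand-in offset. */
    static Map<ExtendedStoreKey, String> extendedStore() {
        final Map<ExtendedStoreKey, String> store = new LinkedHashMap<>();
        final String[] values = {"a", "b", "c"};
        for (int offset = 0; offset < values.length; offset++) {
            store.put(new ExtendedStoreKey(1L, null, true, 0, offset), values[offset]);
        }
        return store;
    }

    public static void main(final String[] args) {
        System.out.println("plain store values:    " + plainStore().values());
        System.out.println("extended store values: " + extendedStore().values());
    }
}
```

With the plain key only the last value survives, while the extended key keeps all three. The sketch assumes partition and offset are available when the key is built.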
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667989
## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
Yes, good point. As is, the KStreamKStreamJoin forwards null-key records directly and not only after the window closes.
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667829 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() { } } +@Test +public void recordsArrivingPostWindowCloseShouldBeDropped() { +final StreamsBuilder builder = new StreamsBuilder(); + +final KStream joined = builder.stream(topic1, consumed).join( +builder.stream(topic2, consumed), +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)), +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +joined.process(supplier); + + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +final TestInputTopic left = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic right = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = supplier.theCapturedProcessor(); + +left.pipeInput(0, "left", 15); +right.pipeInput(-1, "bumpTime", 40); +assertRecordDropCount(0.0, processor); + +right.pipeInput(0, "closesAt39", 24);

Review Comment: Thanks, ok, I adjusted the 'hint' in the value accordingly. I don't think we have an off-by-one issue here: the window is `[14;34 + 5]`, so the record is considered 'too late' at t=40? In other words, for this test case it was purely a misleading 'hint'?

On a different note, I deleted the test case in `KStreamKStreamJoinTest` and refer to `KStreamKStreamWindowCloseTest`.
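The `[14;34 + 5]` arithmetic above can be checked with a small standalone sketch. It assumes the simplified form of the `isActiveWindow` check discussed in this PR (`timeTo + joinGraceMs >= streamTime`); the class and parameter names are illustrative and not part of Kafka.

```java
class WindowCloseSketch {
    // Assumed rule, taken from the check discussed in the PR: a record's window is
    // still active while timeTo + grace >= streamTime.
    static boolean isActiveWindow(final long recordTs, final long joinAfterMs,
                                  final long graceMs, final long streamTime) {
        final long timeTo = Math.max(0L, recordTs + joinAfterMs);
        return timeTo + graceMs >= streamTime;
    }

    public static void main(final String[] args) {
        final long joinAfterMs = 10L; // JoinWindows time difference used in the test
        final long graceMs = 5L;      // grace period used in the test
        final long recordTs = 24L;    // the right-side record under test

        // timeTo = 24 + 10 = 34, so the window closes after stream time 34 + 5 = 39.
        System.out.println("active at streamTime=39: " + isActiveWindow(recordTs, joinAfterMs, graceMs, 39L));
        System.out.println("active at streamTime=40: " + isActiveWindow(recordTs, joinAfterMs, graceMs, 40L));
    }
}
```

With stream time bumped to 40, the record at ts=24 falls one millisecond past its close time of 39, which matches the 'too late at t=40' reading.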
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1515212230 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java: ## @@ -25,36 +25,46 @@ public final class StreamStreamJoinUtil { -private StreamStreamJoinUtil(){ +private StreamStreamJoinUtil() { } public static boolean skipRecord( final Record record, final Logger logger, final Sensor droppedRecordsSensor, -final ProcessorContext context) { +final ProcessorContext context +) { // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record // // we also ignore the record if value is null, because in a key-value data model a null-value indicates // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored if (record.key() == null || record.value() == null) { -if (context.recordMetadata().isPresent()) { -final RecordMetadata recordMetadata = context.recordMetadata().get(); -logger.warn( -"Skipping record due to null key or value. " -+ "topic=[{}] partition=[{}] offset=[{}]", -recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() -); -} else { -logger.warn( -"Skipping record due to null key or value. Topic, partition, and offset not known." -); -} -droppedRecordsSensor.record(); +logSkip("null key or value", logger, droppedRecordsSensor, context); return true; } else { return false; } } + +public static void logSkip( +final String reason, +final Logger logger, +final Sensor droppedRecordsSensor, +final ProcessorContext context +) { +if (context.recordMetadata().isPresent()) { +final RecordMetadata recordMetadata = context.recordMetadata().get(); +logger.warn( +"Skipping record. 
reason=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review Comment: Nit: Just comparing to `AbstractKStreamTimeWindowAggregateProcessor#logSkippedRecordForExpiredWindow`, it seems we could add more information -- should we try to merge both "skip reasons" as your PR proposes, or have two different outputs, one for the null-key case, and a different one for the "expired" case, similar to windowed aggregation?

## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ## @@ -71,11 +71,21 @@ void afterEach() { @Test void testRelaxedLeftStreamStreamJoin() { leftStream -.leftJoin(rightStream, JOINER, WINDOW) +.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS) .to(OUT); initTopology(); -left.pipeInput(null, "leftValue", 1); -assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList()); +left.pipeInput(null, "leftValue1", 1); +left.pipeInput(null, "leftValue2", 90); +left.pipeInput(null, "lateArrival-Dropped", 19); +left.pipeInput(null, "lateArrivalWithinGrace", 20); +assertEquals( +Arrays.asList( +new KeyValue<>(null, "leftValue1|null"), +new KeyValue<>(null, "leftValue2|null"), +new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment: Why is this record already in the output? We should not drop it, but it seems we cannot emit it right away either, because we need to wait until the window closes, so we would need to pipe one more record with ts=91 to flush out this result?
## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() { } } +@Test +public void recordsArrivingPostWindowCloseShouldBeDropped() { +final StreamsBuilder builder = new StreamsBuilder(); + +final KStream joined = builder.stream(topic1, consumed).join( +builder.stream(topic2, consumed), +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)), +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +joined.process(supplier); + + +try (final TopologyTestDriver driver = new Topology
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1981795443 @mjsax thanks, it is rebased. The tests didn't need any adjustments due to the idea quoted below. > I realized that if we only want to assert that late records get dropped and not look at the join result then we could even reuse the same test case for all three involved operators (inner, left, outer) as shown in `KStreamKStreamWindowCloseTest`. ... > If you agree then I would remove the `.recordsArrivingPostWindowCloseShouldBeDropped()` from `KStreamKStreamJoinTest, KStreamKStreamLeftJoinTest and KStreamKStreamOuterJoinTest`. I have removed the extensions to KStreamKStreamJoinTest, KStreamKStreamLeftJoinTest and KStreamKStreamOuterJoinTest again. Do you agree with this approach? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1979900538 @florin-akermann -- I finally merged https://github.com/apache/kafka/pull/14426 -- can you rebase this PR and fixup tests so we can move forward with this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1951981784 > Replied in-line -- in the end I don't have a strong opinion -- if you think it's better to use grace=MAX (instead of just "large enough 150) and add a new tests method just for grace, I am also happy with it. @mjsax Thank you for the feedback. I opted for 'large enough' and new test methods just for grace. I realized that if we only want to assert that late records get dropped and not look at the join result then we could even reuse the same test case for all three involved operators (inner, left, outer) as shown in `KStreamKStreamWindowCloseTest`. If you agree then I would remove the `.recordsArrivingPostWindowCloseShouldBeDropped()` from `KStreamKStreamJoinTest, KStreamKStreamLeftJoinTest and KStreamKStreamOuterJoinTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493075382 ## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ## @@ -71,11 +71,21 @@ void afterEach() { @Test void testRelaxedLeftStreamStreamJoin() { leftStream -.leftJoin(rightStream, JOINER, WINDOW) +.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS) .to(OUT); initTopology(); -left.pipeInput(null, "leftValue", 1); -assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList()); +left.pipeInput(null, "leftValue1", 1); +left.pipeInput(null, "leftValue2", 90); +left.pipeInput(null, "lateArrival-Dropped", 19); +left.pipeInput(null, "lateArrivalWithinGrace", 20); +assertEquals( +Arrays.asList( +new KeyValue<>(null, "leftValue1|null"), +new KeyValue<>(null, "leftValue2|null"), +new KeyValue<>(null, "lateArrivalWithinGrace|null") +), +out.readKeyValuesToList() +); } @Test

Review Comment: Yes, I will check. Similarly, I realized `KStreamKStreamSelfJoin` should probably also drop 'too late' records? I guess this would also be a separate Jira ticket & PR?
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1949473525 > Thanks for the fix! Overall LGTM. Couple of comments. @mjsax Thank you for all the good points. I agree with all of them. However, first I would like to align on whether we really want to extend the window bound tests to assert the correct grace period behavior. See my reply on 'Why + 1'. Personally, I would say I just write separate TestClasses/Cases to assert the grace period behavior (including sensor checks). Plus, as you suggested, the window bound tests will just be updated with large grace periods (E.g. Long.MAX - some constant). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493064669 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104);

Review Comment: On second thought, note my comment below on 'Why + 1'.
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493061626 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1363,6 +1365,16 @@ public void testWindowing() { new KeyValueTimestamp<>(2, "L2+l2", 2002L), new KeyValueTimestamp<>(3, "L3+l3", 2003L) ); + +//push two items with timestamp at grace edge; this should produce one join item, M0 is 'too late' +final long currentStreamTime = 2104; +final long lowerBound = currentStreamTime - timeDifference.toMillis() - grace.toMillis(); +inputTopic1.pipeInput(0, "M0", lowerBound - 1); +inputTopic1.pipeInput(1, "M1", lowerBound + 1);

Review Comment: I can see now that the naming is misleading. The lower bound is with regard to the grace period: 1900. However, the lower bound of the window `1:l1` is 1901, so the +1 was there to make sure the record is still within the window.

In general, I am starting to wonder whether it wouldn't make more sense to test these two concerns (grace & windowing) separately. E.g. with grace 150, `M0` is just a test case to assert that late records get dropped, and `M1` is just another window bound test. With grace 104 we get the 'grace bound' and the 'l0 lower window bound' to overlap, but that might be confusing.

In other words, as you said, 'this test aims to test window bounds'. So maybe I should move the grace period tests into a separate test class?
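For reference, the grace-related lower bound of 1900 follows from the values in the diff (stream time 2104, time difference 100, grace 104). The sketch below recomputes it and checks which timestamps would be dropped, assuming the drop rule `recordTs + timeDifference + grace < streamTime` implied by the `isActiveWindow` check in this PR; the class and method names are hypothetical.

```java
class GraceLowerBoundSketch {
    // Same formula as in the test diff: streamTime - timeDifference - grace.
    static long lowerBound(final long streamTime, final long timeDifferenceMs, final long graceMs) {
        return streamTime - timeDifferenceMs - graceMs;
    }

    // Assumed drop rule: the record's window (plus grace) ended before the current stream time.
    static boolean isDropped(final long recordTs, final long streamTime,
                             final long timeDifferenceMs, final long graceMs) {
        return recordTs + timeDifferenceMs + graceMs < streamTime;
    }

    public static void main(final String[] args) {
        final long streamTime = 2104L;
        final long timeDifferenceMs = 100L;
        final long graceMs = 104L;

        final long bound = lowerBound(streamTime, timeDifferenceMs, graceMs);
        System.out.println("lowerBound = " + bound);
        for (final long ts : new long[] {bound - 1, bound, bound + 1}) {
            System.out.println("ts=" + ts + " dropped=" + isDropped(ts, streamTime, timeDifferenceMs, graceMs));
        }
    }
}
```

Under that rule the bound itself is inclusive for the grace check, so `lowerBound - 1` is the first timestamp that gets dropped; whether a record at exactly `lowerBound` also produces a join result depends on the windows of the records on the other side.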
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1493006495 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -816,10 +816,12 @@ public void testWindowing() { stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); +final Duration timeDifference = ofMillis(100L); +final Duration grace = ofMillis(104); Review Comment: Good idea with the sensor. Just for me to understand, is 150 an arbitrarily chosen value or how did you come up with it? Wouldn't it make sense to set it to at least (max_timestamp in test case - min_timestamp in test case) e.g. 1104 - 899? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1492958285 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -191,6 +191,10 @@ public void process(final Record record) { } } +private boolean isActiveWindow(final long timeFrom, final long timeTo) { +return sharedTimeTracker.streamTime >= timeFrom && timeTo + joinGraceMs >= sharedTimeTracker.streamTime;

Review Comment: Yes, indeed, it is redundant to check `sharedTimeTracker.streamTime >= timeFrom`. Adjusted.
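One way to convince oneself that the first half of the condition is redundant is a brute-force comparison over small values. The sketch below assumes non-negative timestamps, a non-negative `joinBeforeMs`, and that advancing stream time means taking the maximum of the previous stream time and the record timestamp; all names are illustrative and the class is not part of Kafka.

```java
class RedundantCheckSketch {
    // The original two-part condition from the diff.
    static boolean fullCheck(final long streamTime, final long timeFrom,
                             final long timeTo, final long graceMs) {
        return streamTime >= timeFrom && timeTo + graceMs >= streamTime;
    }

    // The simplified condition after dropping the first half.
    static boolean simplifiedCheck(final long streamTime, final long timeTo, final long graceMs) {
        return timeTo + graceMs >= streamTime;
    }

    // Exhaustively compares both checks for all values in [0, max].
    static boolean checksAgree(final long max) {
        for (long ts = 0; ts <= max; ts++) {
            for (long before = 0; before <= max; before++) {
                for (long after = 0; after <= max; after++) {
                    for (long grace = 0; grace <= max; grace++) {
                        for (long priorStreamTime = 0; priorStreamTime <= max; priorStreamTime++) {
                            final long timeFrom = Math.max(0L, ts - before);
                            final long timeTo = Math.max(0L, ts + after);
                            // Assumption: stream time is advanced before the check runs.
                            final long streamTime = Math.max(priorStreamTime, ts);
                            if (fullCheck(streamTime, timeFrom, timeTo, grace)
                                    != simplifiedCheck(streamTime, timeTo, grace)) {
                                return false;
                            }
                        }
                    }
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println("checks agree: " + checksAgree(8L));
    }
}
```

Under these assumptions `streamTime >= recordTs >= timeFrom` always holds once stream time has been advanced, so the first half can never be the deciding factor.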
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1491795591 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -191,6 +191,10 @@ public void process(final Record record) { } } +private boolean isActiveWindow(final long timeFrom, final long timeTo) { +return sharedTimeTracker.streamTime >= timeFrom && timeTo + joinGraceMs >= sharedTimeTracker.streamTime;

Review Comment: Do we need to check `sharedTimeTracker.streamTime >= timeFrom`? If I am not mistaken, `timeTo` is always larger than `timeFrom`, and `graceMs` is always non-negative. Thus, if `timeTo + joinGraceMs >= sharedTimeTracker.streamTime;` it implies that `sharedTimeTracker.streamTime >= timeFrom` is `true`? And if `timeTo + joinGraceMs >= sharedTimeTracker.streamTime;` is false, the whole condition is false independent of `sharedTimeTracker.streamTime >= timeFrom`?

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -128,7 +128,7 @@ public void process(final Record record) { final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); - +if (!isActiveWindow(timeFrom, timeTo)) return;

Review Comment: nit: code style -- we never use blocks without `{ }` -- this should be

```
if (!isActiveWindow(timeFrom, timeTo)) {
    return;
}
```

Also, I think we should record this and call `StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())` similar to key and/or value being `null`.
## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1363,6 +1365,16 @@ public void testWindowing() { new KeyValueTimestamp<>(2, "L2+l2", 2002L), new KeyValueTimestamp<>(3, "L3+l3", 2003L) ); + +//push two items with timestamp at grace edge; this should produce one join item, M0 is 'too late' +final long currentStreamTime = 2104; +final long lowerBound = currentStreamTime - timeDifference.toMillis() - grace.toMillis(); +inputTopic1.pipeInput(0, "M0", lowerBound - 1); +inputTopic1.pipeInput(1, "M1", lowerBound + 1);

Review Comment: Why `+ 1` -- `lowerBound` by itself should already be inclusive and produce a join result?

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1363,6 +1365,16 @@ public void testWindowing() { new KeyValueTimestamp<>(2, "L2+l2", 2002L), new KeyValueTimestamp<>(3, "L3+l3", 2003L) ); + +//push two items with timestamp at grace edge; this should produce one join item, M0 is 'too late'

Review Comment: We should add the "drop record sensor check == 0" before we intentionally push "late" data that should be dropped, and we should also check the sensor at the very end to verify that it recorded the dropped records.
## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ## @@ -71,11 +71,21 @@ void afterEach() { @Test void testRelaxedLeftStreamStreamJoin() { leftStream -.leftJoin(rightStream, JOINER, WINDOW) +.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS) .to(OUT); initTopology(); -left.pipeInput(null, "leftValue", 1); -assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList()); +left.pipeInput(null, "leftValue1", 1); +left.pipeInput(null, "leftValue2", 90); +left.pipeInput(null, "lateArrival-Dropped", 19); +left.pipeInput(null, "lateArrivalWithinGrace", 20); +assertEquals( +Arrays.asList( +new KeyValue<>(null, "leftValue1|null"), +new KeyValue<>(null, "leftValue2|null"), +new KeyValue<>(null, "lateArrivalWithinGrace|null") +), +out.readKeyValuesToList() +); } @Test Review Comment: Not related to this PR, but the new stream-table join with versioned state stores also has a grace-period -- can we double check the KIP if we decided to drop stream-records for left-join when they arrive after grace-period and maybe extend this test below (ie `testRelaxedLeftStreamTableJoin`) accordingly (in a follow up PR)? ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ## @@ -1651,7 +1663,7 @@ public void testAsymmetricWindowingBefore() { joined = stream1.join(