[ https://issues.apache.org/jira/browse/KAFKA-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-13721: --------------------------------------- Assignee: Matthias J. Sax > Left-join still emit spurious results in stream-stream joins in some cases > -------------------------------------------------------------------------- > > Key: KAFKA-13721 > URL: https://issues.apache.org/jira/browse/KAFKA-13721 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.1.0 > Reporter: Nollet > Assignee: Matthias J. Sax > Priority: Major > > Stream-stream joins seem to still emit spurious results for some window > configurations. > From my tests, it happened when setting before to 0 and having a grace period > smaller than the window duration. More precisely, it seems to happen when > before and the window duration are set such that (window duration) is greater than (grace period + before). > h2. how to reproduce > {code:java} > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.streams.StreamsBuilder; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.TestInputTopic; > import org.apache.kafka.streams.TestOutputTopic; > import org.apache.kafka.streams.Topology; > import org.apache.kafka.streams.TopologyTestDriver; > import org.apache.kafka.streams.kstream.JoinWindows; > import org.apache.kafka.streams.kstream.KStream; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.time.Duration; > import java.time.Instant; > import java.util.Properties; > public class SpuriousLeftJoinTest { > static final Duration WINDOW_DURATION = Duration.ofMinutes(10); > static final Duration GRACE = Duration.ofMinutes(6); > static final Duration BEFORE = Duration.ZERO; > static final String LEFT_TOPIC_NAME = "LEFT_TOPIC"; > static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC"; > static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC"; > private static TopologyTestDriver testDriver; > private static TestInputTopic<String, Integer> inputTopicLeft; > private static TestInputTopic<String, Integer> 
inputTopicRight; > private static TestOutputTopic<String, Integer> outputTopic; > public static Topology createTopology() { > StreamsBuilder builder = new StreamsBuilder(); > KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME); > KStream<String, Integer> rightStream = > builder.stream(RIGHT_TOPIC_NAME); > // return 1 if left join matched, otherwise 0 > KStream<String, Integer> joined = leftStream.leftJoin( > rightStream, > (value1, value2) -> { > if(value2 == null){ > return 0; > } > return 1; > }, > JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE) > .before(BEFORE) > ); > joined.to(OUTPUT_TOPIC_NAME); > return builder.build(); > } > @Before > public void setup() { > Topology topology = createTopology(); > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.StringSerde.class); > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > Serdes.IntegerSerde.class); > testDriver = new TopologyTestDriver(topology, props); > inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, > Serdes.String().serializer(), Serdes.Integer().serializer()); > inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, > Serdes.String().serializer(), Serdes.Integer().serializer()); > outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, > Serdes.String().deserializer(), Serdes.Integer().deserializer()); > } > @After > public void tearDown() { > testDriver.close(); > } > @Test > public void shouldEmitOnlyOneMessageForKey1(){ > Instant now = Instant.now(); > inputTopicLeft.pipeInput("key1", 12, now); > inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION)); > // send later record to increase stream time & close the window > inputTopicLeft.pipeInput("other_key", 1212122, > now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10)); > while (! 
outputTopic.isEmpty()){ > System.out.println(outputTopic.readKeyValue()); > } > } > } > {code} > The stdout of the code above is > {noformat} > KeyValue(key1, 0) > KeyValue(key1, 1) > {noformat} > However, it should be > {noformat} > KeyValue(key1, 1) > {noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)