[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kin Siu updated KAFKA-9244: --------------------------- Description: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed though. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} was: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} > Update of old FK reference on RHS should not trigger join result > ---------------------------------------------------------------- > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.4.0 > Reporter: Kin Siu > Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed though. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic<String, String> right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic<String, String> left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic<String, String> outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore<String, String> store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map<String, String> expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map<String, String> expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)