Kin Siu created KAFKA-9244: ------------------------------ Summary: Update of old FK reference on RHS should not trigger join result Key: KAFKA-9244 URL: https://issues.apache.org/jira/browse/KAFKA-9244 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.4.0 Reporter: Kin Siu
In a KTable-KTable foreign key join, after the LHS record's FK reference has been changed from FK1 -> FK2, a subsequent update to the RHS record keyed by FK1 (the old reference) should not produce a join result. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map<String, String> expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} -- 
This message was sent by Atlassian Jira (v8.3.4#803005)