[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-9244: -------------------------------------- Assignee: Matthias J. Sax > Update of old FK reference on RHS should not trigger join result > ---------------------------------------------------------------- > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.4.0 > Reporter: Kin Siu > Assignee: Matthias J. Sax > Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())", after we > published data on RHS with previous associated foreign key. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic<String, String> right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic<String, String> left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic<String, String> outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore<String, String> store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map<String, String> expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map<String, String> expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)