Kin Siu created KAFKA-9244:
------------------------------

             Summary: Update of old FK reference on RHS should not trigger join 
result
                 Key: KAFKA-9244
                 URL: https://issues.apache.org/jira/browse/KAFKA-9244
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.0
            Reporter: Kin Siu


When performing a KTable-KTable foreign-key join, after the LHS record's FK 
reference is changed from FK1 to FK2, a subsequent update to the RHS record 
keyed by FK1 should not produce a join result.

 
{code:java}
    @Test
    public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, 
streamsConfig)) {
            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");

            // Pre-populate the RHS records. This test is all about what 
happens when we change LHS records foreign key reference
            // then populate update on RHS
            right.pipeInput("rhs1", "rhsValue1");
            right.pipeInput("rhs2", "rhsValue2");

            assertThat(
                    outputTopic.readKeyValuesToMap(),
                    is(emptyMap())
            );
            assertThat(
                    asMap(store),
                    is(emptyMap())
            );

            left.pipeInput("lhs1", "lhsValue1|rhs1");

            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Change LHS foreign key reference
            left.pipeInput("lhs1", "lhsValue1|rhs2");
            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Populate RHS update on old LHS foreign key ref
            right.pipeInput("rhs1", "rhsValue1Delta");
            {
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(emptyMap())
                );
                assertThat(
                        asMap(store),
                        is(mkMap(
                                mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                        ))
                );
            }
        }
    }
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to