[ 
https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kin Siu updated KAFKA-9244:
---------------------------
    Description: 
When performing a KTable-KTable foreign key join, after the LHS record's FK 
reference changes from FK1 -> FK2, an update to the RHS record keyed by FK1 
should not produce a join result.

However, the test case below fails.

 
{code:java}
    @Test
    public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, 
streamsConfig)) {
            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");

            // Pre-populate the RHS records. This test is all about what 
happens when we change LHS records foreign key reference
            // then populate update on RHS
            right.pipeInput("rhs1", "rhsValue1");
            right.pipeInput("rhs2", "rhsValue2");

            assertThat(
                    outputTopic.readKeyValuesToMap(),
                    is(emptyMap())
            );
            assertThat(
                    asMap(store),
                    is(emptyMap())
            );

            left.pipeInput("lhs1", "lhsValue1|rhs1");

            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Change LHS foreign key reference
            left.pipeInput("lhs1", "lhsValue1|rhs2");
            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Populate RHS update on old LHS foreign key ref
            right.pipeInput("rhs1", "rhsValue1Delta");
            {
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(emptyMap())
                );
                assertThat(
                        asMap(store),
                        is(mkMap(
                                mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                        ))
                );
            }
        }
    }
{code}

  was:
When performing a KTable-KTable foreign key join, after the LHS record's FK 
reference changes from FK1 -> FK2, an update to the RHS record keyed by FK1 
should not produce a join result.

 
{code:java}
    @Test
    public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, 
streamsConfig)) {
            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");

            // Pre-populate the RHS records. This test is all about what 
happens when we change LHS records foreign key reference
            // then populate update on RHS
            right.pipeInput("rhs1", "rhsValue1");
            right.pipeInput("rhs2", "rhsValue2");

            assertThat(
                    outputTopic.readKeyValuesToMap(),
                    is(emptyMap())
            );
            assertThat(
                    asMap(store),
                    is(emptyMap())
            );

            left.pipeInput("lhs1", "lhsValue1|rhs1");

            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Change LHS foreign key reference
            left.pipeInput("lhs1", "lhsValue1|rhs2");
            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Populate RHS update on old LHS foreign key ref
            right.pipeInput("rhs1", "rhsValue1Delta");
            {
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(emptyMap())
                );
                assertThat(
                        asMap(store),
                        is(mkMap(
                                mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                        ))
                );
            }
        }
    }
{code}


> Update of old FK reference on RHS should not trigger join result
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9244
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Kin Siu
>            Priority: Major
>
> When performing a KTable-KTable foreign key join, after the LHS record's FK 
> reference changes from FK1 -> FK2, an update to the RHS record keyed by FK1 
> should not produce a join result.
> However, the test case below fails.
>  
> {code:java}
>     @Test
>     public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
>         final Topology topology = getTopology(streamsConfig, "store", 
> leftJoin);
>         try (final TopologyTestDriver driver = new 
> TopologyTestDriver(topology, streamsConfig)) {
>             final TestInputTopic<String, String> right = 
> driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
> StringSerializer());
>             final TestInputTopic<String, String> left = 
> driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
> StringSerializer());
>             final TestOutputTopic<String, String> outputTopic = 
> driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
> StringDeserializer());
>             final KeyValueStore<String, String> store = 
> driver.getKeyValueStore("store");
>             // Pre-populate the RHS records. This test is all about what 
>             // happens when we change LHS records foreign key reference
>             // then populate update on RHS
>             right.pipeInput("rhs1", "rhsValue1");
>             right.pipeInput("rhs2", "rhsValue2");
>             assertThat(
>                     outputTopic.readKeyValuesToMap(),
>                     is(emptyMap())
>             );
>             assertThat(
>                     asMap(store),
>                     is(emptyMap())
>             );
>             left.pipeInput("lhs1", "lhsValue1|rhs1");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Change LHS foreign key reference
>             left.pipeInput("lhs1", "lhsValue1|rhs2");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Populate RHS update on old LHS foreign key ref
>             right.pipeInput("rhs1", "rhsValue1Delta");
>             {
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(emptyMap())
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(mkMap(
>                                 mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                         ))
>                 );
>             }
>         }
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to