[ 
https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kin Siu updated KAFKA-9244:
---------------------------
    Description: 
Perform a KTable-KTable foreign key join, after changing LHS FK reference from 
FK1 -> FK2, populating update on RHS with FK1 should not produce join result.

Below test case failed at the point of last "is(emptyMap())", after we 
published data on RHS with previous associated foreign key.

 
{code:java}
    @Test
    public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, 
streamsConfig)) {
            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");

            // Pre-populate the RHS records. This test is all about what 
happens when we change LHS records foreign key reference
            // then populate update on RHS
            right.pipeInput("rhs1", "rhsValue1");
            right.pipeInput("rhs2", "rhsValue2");

            assertThat(
                    outputTopic.readKeyValuesToMap(),
                    is(emptyMap())
            );
            assertThat(
                    asMap(store),
                    is(emptyMap())
            );

            left.pipeInput("lhs1", "lhsValue1|rhs1");

            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Change LHS foreign key reference
            left.pipeInput("lhs1", "lhsValue1|rhs2");
            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Populate RHS update on old LHS foreign key ref
            right.pipeInput("rhs1", "rhsValue1Delta");
            {
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(emptyMap())
                );
                assertThat(
                        asMap(store),
                        is(mkMap(
                                mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                        ))
                );
            }
        }
    }
{code}

  was:
Perform a KTable-KTable foreign key join, after changing LHS FK reference from 
FK1 -> FK2, populating update on RHS with FK1 should not produce join result.

Below test case failed at the point of last "is(emptyMap())".

 
{code:java|linenumbers=true}
    @Test
    public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
        final Topology topology = getTopology(streamsConfig, "store", leftJoin);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, 
streamsConfig)) {
            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");

            // Pre-populate the RHS records. This test is all about what 
happens when we change LHS records foreign key reference
            // then populate update on RHS
            right.pipeInput("rhs1", "rhsValue1");
            right.pipeInput("rhs2", "rhsValue2");

            assertThat(
                    outputTopic.readKeyValuesToMap(),
                    is(emptyMap())
            );
            assertThat(
                    asMap(store),
                    is(emptyMap())
            );

            left.pipeInput("lhs1", "lhsValue1|rhs1");

            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Change LHS foreign key reference
            left.pipeInput("lhs1", "lhsValue1|rhs2");
            {
                final Map<String, String> expected = mkMap(
                        mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                );
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(expected)
                );
                assertThat(
                        asMap(store),
                        is(expected)
                );
            }

            // Populate RHS update on old LHS foreign key ref
            right.pipeInput("rhs1", "rhsValue1Delta");
            {
                assertThat(
                        outputTopic.readKeyValuesToMap(),
                        is(emptyMap())
                );
                assertThat(
                        asMap(store),
                        is(mkMap(
                                mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                        ))
                );
            }
        }
    }
{code}


> Update of old FK reference on RHS should not trigger join result
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9244
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Kin Siu
>            Priority: Major
>
> Perform a KTable-KTable foreign key join, after changing LHS FK reference 
> from FK1 -> FK2, populating update on RHS with FK1 should not produce join 
> result.
> Below test case failed at the point of last "is(emptyMap())", after we 
> published data on RHS with previous associated foreign key.
>  
> {code:java}
>     @Test
>     public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
>         final Topology topology = getTopology(streamsConfig, "store", 
> leftJoin);
>         try (final TopologyTestDriver driver = new 
> TopologyTestDriver(topology, streamsConfig)) {
>             final TestInputTopic<String, String> right = 
> driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
> StringSerializer());
>             final TestInputTopic<String, String> left = 
> driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
> StringSerializer());
>             final TestOutputTopic<String, String> outputTopic = 
> driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
> StringDeserializer());
>             final KeyValueStore<String, String> store = 
> driver.getKeyValueStore("store");
>             // Pre-populate the RHS records. This test is all about what 
> happens when we change LHS records foreign key reference
>             // then populate update on RHS
>             right.pipeInput("rhs1", "rhsValue1");
>             right.pipeInput("rhs2", "rhsValue2");
>             assertThat(
>                     outputTopic.readKeyValuesToMap(),
>                     is(emptyMap())
>             );
>             assertThat(
>                     asMap(store),
>                     is(emptyMap())
>             );
>             left.pipeInput("lhs1", "lhsValue1|rhs1");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Change LHS foreign key reference
>             left.pipeInput("lhs1", "lhsValue1|rhs2");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Populate RHS update on old LHS foreign key ref
>             right.pipeInput("rhs1", "rhsValue1Delta");
>             {
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(emptyMap())
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(mkMap(
>                                 mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                         ))
>                 );
>             }
>         }
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to