[ 
https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984174#comment-16984174
 ] 

Matthias J. Sax commented on KAFKA-9244:
----------------------------------------

Ah, sorry. I mixed something up when reading your test case. I was able to 
reproduce the issue. Thanks for reporting it.

> Update of old FK reference on RHS should not trigger join result
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9244
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Kin Siu
>            Priority: Major
>
> When performing a KTable-KTable foreign key join, after the LHS FK reference 
> changes from FK1 -> FK2, an update on the RHS record keyed by FK1 should not 
> produce a join result.
> The test case below fails at the last "is(emptyMap())", after we publish 
> data on the RHS under the previously associated foreign key.
>  
> {code:java}
>     @Test
>     public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
>         final Topology topology = getTopology(streamsConfig, "store", leftJoin);
>         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
>             final TestInputTopic<String, String> right =
>                 driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
>             final TestInputTopic<String, String> left =
>                 driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
>             final TestOutputTopic<String, String> outputTopic =
>                 driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
>             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
>             // Pre-populate the RHS records. This test is all about what happens
>             // when we change LHS records foreign key reference
>             // then populate update on RHS
>             right.pipeInput("rhs1", "rhsValue1");
>             right.pipeInput("rhs2", "rhsValue2");
>             assertThat(
>                     outputTopic.readKeyValuesToMap(),
>                     is(emptyMap())
>             );
>             assertThat(
>                     asMap(store),
>                     is(emptyMap())
>             );
>             left.pipeInput("lhs1", "lhsValue1|rhs1");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Change LHS foreign key reference
>             left.pipeInput("lhs1", "lhsValue1|rhs2");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Populate RHS update on old LHS foreign key ref
>             right.pipeInput("rhs1", "rhsValue1Delta");
>             {
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(emptyMap())
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(mkMap(
>                                 mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                         ))
>                 );
>             }
>         }
>     }
> {code}
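
For readers who want the expected semantics without the Streams test harness, 
here is a minimal in-memory sketch of the behavior the test asserts. It is 
not the Kafka Streams implementation: the class FkJoinSketch and its methods 
updateLeft/updateRight are hypothetical names, the "value|foreignKey" 
encoding is borrowed from the test data above, and only inner-join-style 
emission is modeled. The point it illustrates is that an RHS update should 
emit results only for LHS rows whose current foreign key matches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a foreign-key table join; NOT Kafka Streams code.
final class FkJoinSketch {
    // LHS table: primary key -> "value|foreignKey" (encoding assumed from the test data).
    private final Map<String, String> left = new TreeMap<>();
    // RHS table: primary key -> value.
    private final Map<String, String> right = new HashMap<>();

    // Extracts the foreign key, i.e. everything after the first '|'.
    private static String foreignKeyOf(final String lhsValue) {
        return lhsValue.substring(lhsValue.indexOf('|') + 1);
    }

    private static String joined(final String lhsKey, final String lhsValue, final String rhsValue) {
        return lhsKey + "=(" + lhsValue + "," + rhsValue + ")";
    }

    // Upserts an LHS row and returns the join results it produces.
    List<String> updateLeft(final String key, final String value) {
        left.put(key, value);
        final List<String> emitted = new ArrayList<>();
        final String rhsValue = right.get(foreignKeyOf(value));
        if (rhsValue != null) {
            emitted.add(joined(key, value, rhsValue));
        }
        return emitted;
    }

    // Upserts an RHS row and returns join results only for LHS rows
    // whose CURRENT foreign key points at this RHS key.
    List<String> updateRight(final String key, final String value) {
        right.put(key, value);
        final List<String> emitted = new ArrayList<>();
        for (final Map.Entry<String, String> row : left.entrySet()) {
            if (foreignKeyOf(row.getValue()).equals(key)) {
                emitted.add(joined(row.getKey(), row.getValue(), value));
            }
        }
        return emitted;
    }

    public static void main(final String[] args) {
        final FkJoinSketch join = new FkJoinSketch();
        join.updateRight("rhs1", "rhsValue1");
        join.updateRight("rhs2", "rhsValue2");
        System.out.println(join.updateLeft("lhs1", "lhsValue1|rhs1"));
        // Change the LHS foreign key reference from rhs1 to rhs2.
        System.out.println(join.updateLeft("lhs1", "lhsValue1|rhs2"));
        // Update the RHS row that lhs1 no longer references: nothing should be emitted.
        System.out.println(join.updateRight("rhs1", "rhsValue1Delta"));
    }
}
```

In this model the final updateRight call returns an empty list, which 
corresponds to the last is(emptyMap()) assertion in the test case above.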



--
This message was sent by Atlassian Jira
(v8.3.4#803005)