[ 
https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984045#comment-16984045
 ] 

Matthias J. Sax edited comment on KAFKA-9244 at 11/28/19 4:19 AM:
------------------------------------------------------------------

In your last test case,
{quote}{{right.pipeInput("rhs1", "rhsValue1Delta");}}
{quote}
Why do you expect:
{quote}{{mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")}}
{quote}
As you say, no update should be produced, and thus the result should be empty.

Note that each time you call
{quote}{{outputTopic.readKeyValuesToMap()}}
{quote}
output records are received and then "purged"; hence, you won't receive them 
twice. `readKeyValuesToMap()` does _not_ access the KTable state store; it 
only consumes the table's output/changelog topic, and it reads that topic 
incrementally rather than re-reading the whole topic on each call.
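
To illustrate the drain-on-read behavior described above, here is a minimal sketch in plain Java. `DrainingOutput` and its `emit` method are hypothetical stand-ins invented for this example; this is not the Kafka Streams `TestOutputTopic` implementation, only a model of the semantics, under the assumption that each read consumes whatever was emitted since the previous read.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of an output topic; NOT the real TestOutputTopic class.
final class DrainingOutput {
    // Records that were emitted but have not been read yet.
    private final Queue<Map.Entry<String, String>> unread = new ArrayDeque<>();

    // Simulates the topology writing one record to the output topic.
    void emit(final String key, final String value) {
        unread.add(Map.entry(key, value));
    }

    // Returns only the records emitted since the previous call and removes
    // them, so a second call without new output yields an empty map.
    Map<String, String> readKeyValuesToMap() {
        final Map<String, String> result = new LinkedHashMap<>();
        Map.Entry<String, String> record;
        while ((record = unread.poll()) != null) {
            result.put(record.getKey(), record.getValue());
        }
        return result;
    }

    public static void main(final String[] args) {
        final DrainingOutput output = new DrainingOutput();
        output.emit("lhs1", "(lhsValue1|rhs2,rhsValue2)");
        System.out.println(output.readKeyValuesToMap()); // the one new record
        System.out.println(output.readKeyValuesToMap()); // nothing new: empty map
    }
}
```

In this model, an empty map from the second read only means that no new output was produced since the first read. It says nothing about the current content of the table, which is why the test checks the state store separately.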


> Update of old FK reference on RHS should not trigger join result
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9244
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Kin Siu
>            Priority: Major
>
> In a KTable-KTable foreign-key join, after the LHS record's FK reference 
> changes from FK1 to FK2, a later update to the RHS record with key FK1 
> should not produce a join result.
> The test case below fails at the last "is(emptyMap())" assertion.
>  
> {code:java|linenumbers=true}
>     @Test
>     public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
>         final Topology topology = getTopology(streamsConfig, "store", leftJoin);
>         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
>             final TestInputTopic<String, String> right =
>                 driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
>             final TestInputTopic<String, String> left =
>                 driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
>             final TestOutputTopic<String, String> outputTopic =
>                 driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
>             final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
>             // Pre-populate the RHS records. This test is all about what happens when we
>             // change the LHS record's foreign key reference, then populate an update on RHS
>             right.pipeInput("rhs1", "rhsValue1");
>             right.pipeInput("rhs2", "rhsValue2");
>             assertThat(
>                     outputTopic.readKeyValuesToMap(),
>                     is(emptyMap())
>             );
>             assertThat(
>                     asMap(store),
>                     is(emptyMap())
>             );
>             left.pipeInput("lhs1", "lhsValue1|rhs1");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Change LHS foreign key reference
>             left.pipeInput("lhs1", "lhsValue1|rhs2");
>             {
>                 final Map<String, String> expected = mkMap(
>                         mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                 );
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(expected)
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(expected)
>                 );
>             }
>             // Populate RHS update on old LHS foreign key ref
>             right.pipeInput("rhs1", "rhsValue1Delta");
>             {
>                 assertThat(
>                         outputTopic.readKeyValuesToMap(),
>                         is(emptyMap())
>                 );
>                 assertThat(
>                         asMap(store),
>                         is(mkMap(
>                                 mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
>                         ))
>                 );
>             }
>         }
>     }
> {code}


