[ 
https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424296#comment-17424296
 ] 

Hao Li commented on KAFKA-13164:
--------------------------------

Hi Ralph,

 

I was able to test it with the following code snippet:

 
{code:java}
static void createWordCountStream(final StreamsBuilder builder) {
    final KStream<String, String> textLines = builder.stream(joinInputTopic);
    final KTable<String, String> table = builder.<String, String>table(inputTopic)
        .mapValues(v -> {
            System.out.println("Table got " + v);
            return v + " suffix";
        });
    final KStream<String, String> output = textLines.leftJoin(table, (sv, tv) -> {
        System.out.println("Join function called for " + sv + ", " + tv);
        return sv + tv;
    });
    output.to(outputTopic);
}
{code}
 

The join function was called with the correct sv and tv, and the table's 
mapValues function was called before the join function. So, as [~guozhang] 
mentioned, the state store is attached to the source table, the mapValues 
function is applied on the fly, and then the join happens. That is the 
expected behavior.
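
To make the "on the fly" part concrete, here is a minimal plain-Java sketch of the lookup path as I understand it. It is only a simplified model, not the actual Kafka Streams internals: the class SourceStoreJoinSketch, its sourceStore map, and the method names are all hypothetical. The store keeps the raw values from the source table, and the mapper is applied only when the join looks up a key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model: not Kafka Streams code, just an illustration of the idea.
public class SourceStoreJoinSketch {
    // Stand-in for the state store attached to the source table node.
    // It holds the raw (unmapped) values read from the table's input topic.
    private final Map<String, String> sourceStore = new HashMap<>();

    // Stand-in for the mapValues function that follows the source table.
    private final Function<String, String> mapper;

    public SourceStoreJoinSketch(final Function<String, String> mapper) {
        this.mapper = mapper;
    }

    // A table record arrives: only the raw value is stored.
    public void putTableRecord(final String key, final String rawValue) {
        sourceStore.put(key, rawValue);
    }

    // A stream record arrives: look up the raw value, apply the mapper
    // on the fly, then call the join function (left join: tv may be null).
    public String leftJoin(final String key,
                           final String streamValue,
                           final BiFunction<String, String, String> joiner) {
        final String raw = sourceStore.get(key);
        final String tableValue = raw == null ? null : mapper.apply(raw);
        return joiner.apply(streamValue, tableValue);
    }

    public static void main(final String[] args) {
        final SourceStoreJoinSketch sketch =
            new SourceStoreJoinSketch(v -> v + " suffix");
        sketch.putTableRecord("k1", "hello");
        // Key present in the store: the mapper runs before the joiner.
        System.out.println(sketch.leftJoin("k1", "world ", (sv, tv) -> sv + tv));
        // Key absent: the joiner receives null for the table value.
        System.out.println(sketch.leftJoin("k2", "alone ", (sv, tv) -> sv + tv));
    }
}
```

In this model the mapped value is never stored, which matches the order of the println calls I saw: the mapper runs first, then the join function.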

 

However, I did not see any output in the output topic. That requires more 
digging, but attaching the state store to the source table appears to be 
expected.

> State store is attached to wrong node in the Kafka Streams topology
> -------------------------------------------------------------------
>
>                 Key: KAFKA-13164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13164
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>         Environment: local development (MacOS Big Sur 11.4)
>            Reporter: Ralph Matthias Debusmann
>            Assignee: Hao Li
>            Priority: Major
>             Fix For: 3.0.1
>
>         Attachments: 1.jpg, 3.jpg
>
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node 
> in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued 
> with a mapValues(), and then joined this KTable with a KStream, like so:
>  
> var kTable = this.streamsBuilder.table(<topic>).mapValues(<mapValues 
> function>);
>  
> and then later:
>  
> var joinedKStream = kstream.leftJoin(kTable, <leftJoin function>);
>  
> The join didn't work, and neither did it work when I added Materialized.as() 
> to mapValues(), like so:
> var kTable = this.streamsBuilder.table(<topic>).mapValues(<mapValues 
> function>, *Materialized.as(<some name>)*);
>  
> Interestingly, I could get the join to work when I first read the topic 
> into a *KStream*, then continued with the mapValues(), then turned the 
> KStream into a KTable, and then joined the KTable with the other KStream, 
> like so:
>  
> var kTable = this.streamsBuilder.stream(<topic>).mapValues(<mapValues 
> function>).toTable();
>  
> (the join worked the same as above)
>  
> When mjsax and I looked at the topology, we could see that in the 
> former, non-working code, the state store (required for the join) is attached 
> to the pre-final "KTABLE-SOURCE" node, and not the final "KTABLE-MAPVALUES" node 
> (see attachment "1.jpg"). In the working code, the state store is (correctly) 
> attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>  
> Best regards,
> xdgrulez
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
