[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422961#comment-17422961 ]
Guozhang Wang commented on KAFKA-13164:
---------------------------------------

Adding some thoughts here:

1. With topology optimization enabled and no `Materialized` passed to the `table#mapValues` operator, it is indeed expected that the state store is associated only with `KTABLE-SOURCE`, not `KTABLE-MAPVALUES`. However, the join should still work with that state store, since the `ValueGetter` used to find the join match should fetch from the source's state store and then apply `mapValues` on the fly. So I think the issue is not really about attaching to the wrong node, but something else.

2. With `Materialized` in `table#mapValues`, we would materialize both operators and should have two state stores; the second store, in `KTABLE-MAPVALUES`, would be used to fetch join matches.

> State store is attached to wrong node in the Kafka Streams topology
> -------------------------------------------------------------------
>
>                 Key: KAFKA-13164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13164
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>         Environment: local development (MacOS Big Sur 11.4)
>            Reporter: Ralph Matthias Debusmann
>            Assignee: Hao Li
>            Priority: Major
>             Fix For: 3.0.1
>
>         Attachments: 1.jpg, 3.jpg
>
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node
> in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued
> with a mapValues(), and then joined this KTable with a KStream, like so:
>
> var kTable = this.streamsBuilder.table(<topic>).mapValues(<mapValues function>);
>
> and then later:
>
> var joinedKStream = kstream.leftJoin(kTable, <leftJoin function>);
>
> The join didn't work, and neither did it work when I added Materialized.as()
> to mapValues(), like so:
>
> var kTable = this.streamsBuilder.table(<topic>).mapValues(<mapValues function>, *Materialized.as(<some name>)*);
>
> Interestingly, I could get the join to work when I first read the topic
> into a *KStream*, then continued with the mapValues(), then turned the
> KStream into a KTable, and then joined the KTable with the other KStream,
> like so:
>
> var kTable = this.streamsBuilder.stream(<topic>).mapValues(<mapValues function>).toTable();
>
> (the join was the same as above)
>
> When mjsax and I had a look at the topology, we could see that in the
> former, non-working code, the state store (required for the join) is attached
> to the pre-final "KTABLE-SOURCE" node, and not to the final "KTABLE-MAPVALUES"
> node (see attachment "1.jpg"). In the working code, the state store is
> (correctly) attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>
> Best regards,
> xdgrulez



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
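
To illustrate point 1 of the comment above: the idea is that a non-materialized `mapValues` does not need its own store, because a lookup can read the parent's store and apply the mapper at read time. The following is only a simplified plain-Java model of that idea. The names `ValueGetterSketch`, `fromStore`, `mapValues`, and `leftJoin`, as well as the small `ValueGetter` interface declared here, are hypothetical stand-ins and are not the Kafka Streams classes or their signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model only: hypothetical types, NOT the Kafka Streams API.
public class ValueGetterSketch {

    /** Looks up the current table value for a key (null if absent). */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Getter backed directly by the source table's materialized store. */
    static <K, V> ValueGetter<K, V> fromStore(Map<K, V> store) {
        return store::get;
    }

    /** Getter for a non-materialized mapValues: read the parent, map at lookup time. */
    static <K, V, VR> ValueGetter<K, VR> mapValues(ValueGetter<K, V> parent,
                                                   Function<V, VR> mapper) {
        return key -> {
            V value = parent.get(key);
            return value == null ? null : mapper.apply(value);
        };
    }

    /** Stream-side left join: always emits, with a null table value when there is no match. */
    static <K, V, VT> String leftJoin(K key, V streamValue, ValueGetter<K, VT> table) {
        return streamValue + "/" + table.get(key);
    }

    public static void main(String[] args) {
        // The only "state store" in this model belongs to the source table.
        Map<String, String> sourceStore = new HashMap<>();
        sourceStore.put("k1", "hello");

        // The mapped table has no store of its own; it wraps the source getter.
        ValueGetter<String, Integer> mapped =
                mapValues(fromStore(sourceStore), String::length);

        System.out.println(leftJoin("k1", "click", mapped)); // prints "click/5"
        System.out.println(leftJoin("k2", "click", mapped)); // prints "click/null"
    }
}
```

In this model the join finds its match even though only the source has a store, which is why the comment suspects the real cause lies elsewhere than the node the store is attached to.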