[ https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17986251#comment-17986251 ]
Antoine Michaud commented on KAFKA-13386:
-----------------------------------------

As you proposed, it would be great to be able to compute the hash from the foreign key only instead of from the full object.

[I recently proposed in the community Slack|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1750786531042709?thread_ts=1690375086.077949&cid=C48AHTCUQ] a workaround: map the LHS values so that only the foreign key remains, do the foreign-key join to pull the foreign data, then join the result back to the LHS:

{code:java}
KTable<MainKey, MainData> mainTable = ...;
KTable<ForeignKey, ForeignData> foreignTable = ...;

// Before
mainTable.join(
    foreignTable,
    (mainData) -> mainData.foreignKey,
    (mainData, foreignData) -> applyJoin(mainData, foreignData)
);

// After
mainTable
    // Only select the foreign key, or the minimum possible data which must not evolve
    .mapValues((mainData) -> mainData.foreignKey)
    // Pull the foreign data with a foreign-key join
    .join(
        foreignTable,
        Function.identity(), // LHS is already the FK
        // As the LHS is the FK (previously mapped in mapValues), just return the foreign data
        // (or a sub-part of it if needed to reduce the join-result store size)
        (foreignKey, foreignData) -> foreignData
    )
    // Join back the LHS data with an equi-join
    .join(mainTable, (foreignData, mainData) -> applyJoin(mainData, foreignData));
{code}

You may need to provide your own Serde if there is no default global serde:

{code:java}
mainTable
    // Do not provide a store name, so that no additional store is created
    .mapValues((mainData) -> mainData.foreignKey, Materialized.with(null, new ForeignKeySerde()))
    .join(...);
{code}

With this solution:
* The foreign-key join result store contains less data than before.
* There is no additional store, as the second join co-locates the join-result table and the main table.
* You can evolve the main table schema safely (adding a new field, changing the type of another field, ...), unless you change the FK serde (or the sub-part extracted for the foreign-key join).
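
To illustrate the last point, here is a minimal, self-contained sketch of why hashing only the foreign key survives a schema change while hashing the full value does not. Everything in it is an assumption made for illustration: the class {{FkHashSketch}}, the three toy serializers and the use of SHA-256 are hypothetical stand-ins, not the Kafka Streams implementation. Only the final comparison has the same shape as the {{Arrays.equals(messageHash, currentHash)}} check quoted in the issue.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class FkHashSketch {

    // Hypothetical serializer of the main value BEFORE the schema change.
    static byte[] serializeV1(String foreignKey, String name) {
        return (foreignKey + "|" + name).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical serializer AFTER the schema change: one field was added.
    static byte[] serializeV2(String foreignKey, String name, String addedField) {
        return (foreignKey + "|" + name + "|" + addedField).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical serializer of the value left after mapValues(...): the FK only.
    static byte[] serializeForeignKey(String foreignKey) {
        return foreignKey.getBytes(StandardCharsets.UTF_8);
    }

    // SHA-256 is only a stand-in for the hash the join stores with each subscription.
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Same shape as the check quoted in the issue: the join result is kept
    // only if the hash taken at subscription time equals the hash taken now.
    static boolean sameHash(byte[] bytesAtSubscription, byte[] bytesNow) {
        return Arrays.equals(hash(bytesAtSubscription), hash(bytesNow));
    }

    public static void main(String[] args) {
        // Same logical record, serialized by the old and by the new application version.
        boolean fullValueKept =
            sameHash(serializeV1("fk-1", "alice"), serializeV2("fk-1", "alice", "n/a"));
        // After mapValues, only the FK is hashed, so the schema change is invisible.
        boolean fkOnlyKept =
            sameHash(serializeForeignKey("fk-1"), serializeForeignKey("fk-1"));

        System.out.println("full-value hash still matches: " + fullValueKept);
        System.out.println("FK-only hash still matches: " + fkOnlyKept);
    }
}
```

In this sketch the full-value comparison fails as soon as the serialized bytes change, even though the record is logically the same, whereas the FK-only comparison keeps matching as long as the FK serialization itself is untouched.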

Spoiler: I'm the co-author of ticket KAFKA-15303, and I have spent months trying to find a clean solution that does not require an app reset.

> Foreign Key Join filtering out valid records after a code change / schema evolved
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-13386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Sergio Duran Vegas
>            Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant
> across upgrades. So in case of changes this optimization will drop
> invalid/intermediate records. In other situations we have relied on the same
> property, for example when computing whether an update is a duplicate result
> or not.
>
> The problem is that some serializers are sadly not deterministic.
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>
> {code:java}
> //If this value doesn't match the current value from the original table, it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>
> A solution for this problem would be for the comparison to use the foreign-key
> reference itself instead of the whole message hash.
>
> The bug fix proposal is to allow the user to choose between one method of
> comparison or the other (whole hash or FK reference). This would fix the
> problem of dropping valid records in certain cases, while still allowing the
> user to choose the current optimized way of checking valid records and
> dropping intermediate results.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)