[ https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17986251#comment-17986251 ]

Antoine Michaud commented on KAFKA-13386:
-----------------------------------------

As you proposed, it would be great to be able to change the hash generation 
from the full object to only the foreign key.

[I recently proposed in the community Slack|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1750786531042709?thread_ts=1690375086.077949&cid=C48AHTCUQ] a 
solution: map the LHS values to extract only the foreign key, perform the 
foreign-key join to pull the foreign data, then join the result back to the LHS:
{code:java}
import java.util.function.Function;
import org.apache.kafka.streams.kstream.KTable;

KTable<MainKey, MainData> mainTable = ...;
KTable<ForeignKey, ForeignData> foreignTable = ...;

// Before
mainTable.join(
    foreignTable,
    (mainData) -> mainData.foreignKey,
    (mainData, foreignData) -> applyJoin(mainData, foreignData)
);


// After
mainTable
  // Only select the foreign key, or the minimal data that must not evolve
  .mapValues((mainData) -> mainData.foreignKey)

  // Pull the foreign data with a foreign-key join
  .join(
    foreignTable,
    Function.identity(), // the LHS value is already the FK
    // As the LHS value is the FK (mapped by mapValues above), just return the
    // foreign data (or a sub-part of it, to reduce the join-result store size)
    (foreignKey, foreignData) -> foreignData
  )

  // Join the LHS data back with an equi-join
  .join(mainTable, (foreignData, mainData) -> applyJoin(mainData, foreignData));
{code}
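You may need to provide your own Serde for the foreign key if no default serde 
is configured globally:
{code:java}
import org.apache.kafka.streams.kstream.Materialized;

mainTable
  // Do not provide a store name, so that no additional store is created;
  // Materialized is only used here to pass the FK value serde
  .mapValues(
      (mainData) -> mainData.foreignKey,
      Materialized.with(null, new ForeignKeySerde()))
  .join(...);
{code}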
 

With the given solution:
 * The foreign-key join result store contains less data than before.
 * There is no additional store, as the second join co-locates the join-result 
table with the main table (both are keyed by the main key).
 * You can evolve the main table schema safely (adding a new field, changing 
another field's type, ...), unless you change the FK serde (or the sub-part 
extracted for the foreign-key join).
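The hash-stability argument in the list above can be checked with a small stdlib-only Java sketch. It is not Kafka Streams code: SHA-256 stands in for the hash the foreign-key join computes internally, and `MainDataV1`/`MainDataV2`, the pipe-separated value encodings and the UTF-8/UTF-16 FK serdes are all hypothetical. It compares the hash of the full main value before and after adding a field with the hash of the projected FK alone, and shows the remaining caveat of changing the FK serde.
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

final class FkProjectionDemo {

    // Hypothetical main value before and after a schema evolution that adds
    // an optional "currency" field (null when read from old data).
    record MainDataV1(String id, String foreignKey) {}
    record MainDataV2(String id, String foreignKey, String currency) {}

    // Hypothetical value serializers for the two schema versions.
    static byte[] serialize(MainDataV1 d) {
        return (d.id() + "|" + d.foreignKey()).getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serialize(MainDataV2 d) {
        return (d.id() + "|" + d.foreignKey() + "|" + d.currency())
                .getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical FK serde (UTF-8) and a changed FK serde (UTF-16).
    static byte[] serializeFk(String fk) {
        return fk.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serializeFkUtf16(String fk) {
        return fk.getBytes(StandardCharsets.UTF_16);
    }

    // SHA-256 stands in for the hash computed by the foreign-key join.
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean sameHash(byte[] a, byte[] b) {
        return Arrays.equals(hash(a), hash(b));
    }

    public static void main(String[] args) {
        MainDataV1 before = new MainDataV1("order-1", "customer-42");
        MainDataV2 after = new MainDataV2("order-1", "customer-42", null);

        // Full value as the FK-join LHS: adding a field changes the hash.
        System.out.println(sameHash(serialize(before), serialize(after))); // false
        // Projected FK as the FK-join LHS: the hash is unchanged.
        System.out.println(sameHash(serializeFk(before.foreignKey()),
                                    serializeFk(after.foreignKey())));      // true
        // Caveat: changing the FK serde still changes the hash.
        System.out.println(sameHash(serializeFk(before.foreignKey()),
                                    serializeFkUtf16(after.foreignKey()))); // false
    }
}
{code}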

Spoiler: I co-wrote ticket KAFKA-15303, and I have spent months trying to find 
a clean solution that does not require an application reset.

> Foreign Key Join filtering out valid records after a code change / schema evolved
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-13386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Sergio Duran Vegas
>            Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades. So if the serialized form of a value changes, this 
> optimization, which is meant to drop invalid/intermediate records, also drops 
> valid ones. In other situations we have relied on the same property, for 
> example when computing whether an update is a duplicate result or not.
>  
> The problem is that some serializers are sadly not deterministic.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> // If this value doesn't match the current value from the original table, it
> // is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>  
> One solution for this problem would be for the comparison to use the 
> foreign-key reference itself instead of the hash of the whole message.
>  
> The proposed bug fix is to allow the user to choose between the two comparison 
> methods (whole-message hash or FK reference). This would fix the dropping of 
> valid records in certain cases, while still letting the user choose the current 
> optimized check, which drops intermediate results.
>  
>  
>  
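The issue description notes that some serializers are not deterministic. One way this can happen, sketched below in stdlib-only Java (not Kafka code), is a serializer that writes a collection in iteration order combined with a deserializer that rebuilds it in a different order: the round-tripped value is logically equal, but its bytes, and therefore its hash, differ, so a hash comparison like the one quoted from `SubscriptionResolverJoinProcessorSupplier` would treat it as stale. The tag-set value, the comma encoding and SHA-256 (standing in for Kafka's internal hash) are assumptions for illustration.
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class NonDeterministicSerdeDemo {

    // Hypothetical serializer: writes the set elements in iteration order.
    static byte[] serialize(Set<String> tags) {
        return String.join(",", tags).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical deserializer: rebuilds the set in sorted order.
    static Set<String> deserialize(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        return new TreeSet<>(Arrays.asList(text.split(",")));
    }

    // SHA-256 stands in for the hash computed by the foreign-key join.
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Value when the subscription is sent (insertion order "b", "a").
        Set<String> original = new LinkedHashSet<>(List.of("b", "a"));
        byte[] messageHash = hash(serialize(original));

        // The same value after a deserialize/serialize round trip,
        // e.g. when it is read back and re-hashed for the staleness check.
        Set<String> current = deserialize(serialize(original));
        byte[] currentHash = hash(serialize(current));

        System.out.println(original.equals(current));                // true
        System.out.println(Arrays.equals(messageHash, currentHash)); // false
    }
}
{code}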

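The proposal to let the user choose the comparison method can be sketched as follows, again in stdlib-only Java. `ComparisonMode`, `Subscription`, `subscribe` and `isCurrent` are hypothetical names, not an existing Kafka Streams API or config; SHA-256 stands in for Kafka's internal hash, and the two pipe-separated serializers model the same record encoded before and after an upgrade. After the upgrade, a response for an unchanged record is rejected in `FULL_VALUE_HASH` mode but accepted in `FOREIGN_KEY` mode, which still rejects it once the record points at a different foreign key.
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

final class StalenessCheckDemo {

    // Hypothetical option modelling the proposal (not an existing Kafka config).
    enum ComparisonMode { FULL_VALUE_HASH, FOREIGN_KEY }

    // Hypothetical LHS record.
    record MainData(String id, String foreignKey, int amount) {}

    // What the subscription carries: the value hash and the FK at send time.
    record Subscription(byte[] valueHash, String foreignKey) {}

    // Same record encoded before ("v1") and after ("v2") an upgrade.
    static byte[] serializeV1(MainData d) {
        return (d.id() + "|" + d.foreignKey() + "|" + d.amount())
                .getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serializeV2(MainData d) {
        return (d.id() + "|" + d.foreignKey() + "|" + d.amount() + "|")
                .getBytes(StandardCharsets.UTF_8);
    }

    // SHA-256 stands in for the hash computed by the foreign-key join.
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static Subscription subscribe(MainData value, Function<MainData, byte[]> serializer) {
        return new Subscription(hash(serializer.apply(value)), value.foreignKey());
    }

    // Decides whether a response is still current for the (non-null) LHS value
    // now in the table, using the serializer of the running application.
    static boolean isCurrent(ComparisonMode mode, Subscription sent, MainData current,
                             Function<MainData, byte[]> serializer) {
        return switch (mode) {
            case FULL_VALUE_HASH ->
                    Arrays.equals(sent.valueHash(), hash(serializer.apply(current)));
            case FOREIGN_KEY ->
                    Objects.equals(sent.foreignKey(), current.foreignKey());
        };
    }

    public static void main(String[] args) {
        MainData record = new MainData("order-1", "customer-42", 10);
        Subscription sent = subscribe(record, StalenessCheckDemo::serializeV1);

        // After the upgrade, the unchanged record is re-serialized with v2.
        Function<MainData, byte[]> v2 = StalenessCheckDemo::serializeV2;
        System.out.println(isCurrent(ComparisonMode.FULL_VALUE_HASH, sent, record, v2)); // false
        System.out.println(isCurrent(ComparisonMode.FOREIGN_KEY, sent, record, v2));     // true

        // A genuinely stale response: the record now points at another FK.
        MainData moved = new MainData("order-1", "customer-7", 10);
        System.out.println(isCurrent(ComparisonMode.FOREIGN_KEY, sent, moved, v2));      // false
    }
}
{code}
In this sketch, `FOREIGN_KEY` mode accepts any response whose FK still matches, so a response to an older version of a record that changed only in non-FK fields is no longer filtered out and may produce extra intermediate results; that is the trade-off against the current optimized check that the proposal leaves to the user.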


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
