wcarlson5 commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1412698954


##########
docs/streams/upgrade-guide.html:
##########
@@ -198,6 +184,21 @@ <h3><a id="streams_api_changes_360" 
href="#streams_api_changes_360">Streams API
     </code>
     </pre>
     </p>
+

Review Comment:
   Thanks for remembering the docs update!



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -115,102 +116,93 @@ public void process(final Record<K, Change<V>> record) {
                 droppedRecordsSensor.record();
                 return;
             }
+            if (leftJoin) {
+                leftJoinInstructions(record);
+            } else {
+                defaultJoinInstructions(record);
+            }
+        }
 
-            final long[] currentHash = record.value().newValue == null ?
-                null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
-
-            final int partition = context().recordMetadata().get().partition();
+        private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                }
+                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            } else if (record.value().newValue != null) {
+                final KO newForeignKey =  
foreignKeyExtractor.apply(record.value().newValue);
+                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            }
+        }
+
+        private void defaultJoinInstructions(final Record<K, Change<V>> 
record) {
+            if (record.value().oldValue != null) {
+                final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
-                    final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                    final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
                     if (newForeignKey == null) {
                         logSkippedRecordDueToNullForeignKey();
                         return;
                     }
-
-                    final byte[] serialOldForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, 
oldForeignKey);
-                    final byte[] serialNewForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, 
newForeignKey);
-                    if (!Arrays.equals(serialNewForeignKey, 
serialOldForeignKey)) {
+                    if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
                         //Different Foreign Key - delete the old key value and 
propagate the new one.
                         //Delete it from the oldKey's state store
-                        context().forward(
-                            record.withKey(oldForeignKey)
-                                .withValue(new SubscriptionWrapper<>(
-                                    currentHash,
-                                    DELETE_KEY_NO_PROPAGATE,
-                                    record.key(),
-                                    partition
-                                )));
-                        //Add to the newKey's state store. Additionally, 
propagate null if no FK is found there,
-                        //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-                        //and LEFT join.
+                        forward(record, oldForeignKey, 
DELETE_KEY_NO_PROPAGATE);
                     }
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-                                record.key(),
-                                partition
-                            )));
+                    //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
+                    //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
+                    //and LEFT join.
+                    forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
                 } else {
-                    //A simple propagatable delete. Delete from the state 
store and propagate the delete onwards.
-                    context().forward(
-                        record.withKey(oldForeignKey)
-                           .withValue(new SubscriptionWrapper<>(
-                               currentHash,
-                               DELETE_KEY_AND_PROPAGATE,
-                               record.key(),
-                               partition
-                           )));
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             } else if (record.value().newValue != null) {
-                //change.oldValue is null, which means it was deleted at least 
once before, or it is brand new.
-                //In either case, we only need to propagate if the FK_VAL is 
available, as the null from the delete would
-                //have been propagated otherwise.
-
-                final SubscriptionWrapper.Instruction instruction;
-                if (leftJoin) {
-                    //Want to send info even if RHS is null.
-                    instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
-                } else {
-                    instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
-                }
-                final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);

Review Comment:
   This seems redundant with line 165, which already checks that `newValue` is not `null`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -115,102 +116,93 @@ public void process(final Record<K, Change<V>> record) {
                 droppedRecordsSensor.record();
                 return;
             }
+            if (leftJoin) {
+                leftJoinInstructions(record);
+            } else {
+                defaultJoinInstructions(record);
+            }
+        }
 
-            final long[] currentHash = record.value().newValue == null ?
-                null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
-
-            final int partition = context().recordMetadata().get().partition();
+        private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                }
+                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            } else if (record.value().newValue != null) {
+                final KO newForeignKey =  
foreignKeyExtractor.apply(record.value().newValue);
+                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            }
+        }
+
+        private void defaultJoinInstructions(final Record<K, Change<V>> 
record) {
+            if (record.value().oldValue != null) {
+                final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
-                    final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                    final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
                     if (newForeignKey == null) {
                         logSkippedRecordDueToNullForeignKey();
                         return;
                     }
-
-                    final byte[] serialOldForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, 
oldForeignKey);
-                    final byte[] serialNewForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, 
newForeignKey);
-                    if (!Arrays.equals(serialNewForeignKey, 
serialOldForeignKey)) {
+                    if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
                         //Different Foreign Key - delete the old key value and 
propagate the new one.
                         //Delete it from the oldKey's state store
-                        context().forward(
-                            record.withKey(oldForeignKey)
-                                .withValue(new SubscriptionWrapper<>(
-                                    currentHash,
-                                    DELETE_KEY_NO_PROPAGATE,
-                                    record.key(),
-                                    partition
-                                )));
-                        //Add to the newKey's state store. Additionally, 
propagate null if no FK is found there,
-                        //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-                        //and LEFT join.
+                        forward(record, oldForeignKey, 
DELETE_KEY_NO_PROPAGATE);
                     }
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-                                record.key(),
-                                partition
-                            )));
+                    //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
+                    //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
+                    //and LEFT join.
+                    forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
                 } else {
-                    //A simple propagatable delete. Delete from the state 
store and propagate the delete onwards.
-                    context().forward(
-                        record.withKey(oldForeignKey)
-                           .withValue(new SubscriptionWrapper<>(
-                               currentHash,
-                               DELETE_KEY_AND_PROPAGATE,
-                               record.key(),
-                               partition
-                           )));
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             } else if (record.value().newValue != null) {
-                //change.oldValue is null, which means it was deleted at least 
once before, or it is brand new.
-                //In either case, we only need to propagate if the FK_VAL is 
available, as the null from the delete would
-                //have been propagated otherwise.
-
-                final SubscriptionWrapper.Instruction instruction;
-                if (leftJoin) {
-                    //Want to send info even if RHS is null.
-                    instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
-                } else {
-                    instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
-                }
-                final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
                 if (newForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                 } else {
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                instruction,
-                                record.key(),
-                                partition)));
+                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
                 }
             }
         }
 
+        private byte[] serialize(final KO key) {
+            return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+        }
+
+        private void forward(final Record<K, Change<V>> record, final KO 
foreignKey, final Instruction deleteKeyNoPropagate) {
+            final SubscriptionWrapper<K> wrapper = new SubscriptionWrapper<>(
+                hash(record),

Review Comment:
   How many times is this record rehashed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to