Nikita-Shupletsov commented on code in PR #20605:
URL: https://github.com/apache/kafka/pull/20605#discussion_r2386698796


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> 
record) {
         }
 
         private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> 
record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            if (oldValue == null && newValue == null) {
+                // no output for idempotent left hand side deletes
+                return;
+            }
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final KRight newForeignKey = newValue == null ? null : 
foreignKeyExtractor.extract(record.key(), newValue);
+
+            final boolean maybeUnsubscribe = oldForeignKey != null;
+            if (maybeUnsubscribe) {
+                // delete old subscription only if FK changed
+                //
+                // if FK did change, we need to explicitly delete the old 
subscription,
+                // because the new subscription goes to a different partition
+                final boolean foreignKeyChanged = 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
+
+                if (foreignKeyChanged) {
+                    // this may lead to unnecessary tombstones if the old FK 
did not join;
+                    // however, we cannot avoid it as we have no means to know 
if the old FK joined or not
                     forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-            } else if (record.value().newValue != null) {
-                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             }
+
+            // for all cases (insert, update, and delete), we send a new 
subscription;
+            // we need to get a response back for all cases to always produce 
a left-join result
+            //
+            // note: for delete, `newForeignKey` is null, what is a "hack"

Review Comment:
   A thought: if we split the logic into two branches -- if the new FK is null, delete (or pass null 
explicitly); otherwise do the normal stuff -- would it still look as hacky? 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> 
record) {
         }
 
         private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> 
record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            if (oldValue == null && newValue == null) {
+                // no output for idempotent left hand side deletes
+                return;
+            }
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final KRight newForeignKey = newValue == null ? null : 
foreignKeyExtractor.extract(record.key(), newValue);
+
+            final boolean maybeUnsubscribe = oldForeignKey != null;
+            if (maybeUnsubscribe) {
+                // delete old subscription only if FK changed
+                //
+                // if FK did change, we need to explicitly delete the old 
subscription,
+                // because the new subscription goes to a different partition
+                final boolean foreignKeyChanged = 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
+
+                if (foreignKeyChanged) {
+                    // this may lead to unnecessary tombstones if the old FK 
did not join;
+                    // however, we cannot avoid it as we have no means to know 
if the old FK joined or not
                     forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-            } else if (record.value().newValue != null) {
-                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             }
+
+            // for all cases (insert, update, and delete), we send a new 
subscription;
+            // we need to get a response back for all cases to always produce 
a left-join result
+            //
+            // note: for delete, `newForeignKey` is null, what is a "hack"
+            // no actual subscription will be added for null-FK on the right 
hand sice, but we still get the response back we need
+            //
+            // this may lead to unnecessary tombstones if the old FK did not 
join;
+            // however, we cannot avoid it as we have no means to know if the 
old FK joined or not
+            forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
         }
 
         private void defaultJoinInstructions(final Record<KLeft, 
Change<VLeft>> record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final boolean needToUnsubscribe = oldForeignKey != null;
 
-                if (oldForeignKey == null && newForeignKey == null) {
+            // if left row is inserted or updated, subscribe to new FK (if new 
FK is valid)
+            if (newValue != null) {
+                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), newValue);
+
+                if (newForeignKey == null) { // invalid FK
                     logSkippedRecordDueToNullForeignKey();
-                } else if (oldForeignKey == null) {
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
-                } else if (newForeignKey == null) {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
-                } else if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
-                    //Different Foreign Key - delete the old key value and 
propagate the new one.
-                    //Delete it from the oldKey's state store
-                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
-                    //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
-                    //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-                    //and LEFT join.
-                    forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-                } else { // unchanged FK
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+                    if (needToUnsubscribe) {
+                        // delete old subscription
+                        //
+                        // this may lead to unnecessary tombstones if the old 
FK did not join;
+                        // however, we cannot avoid it as we have no means to 
know if the old FK joined or not
+                        forward(record, oldForeignKey, 
DELETE_KEY_AND_PROPAGATE);
+                    }
+                } else { // valid FK

Review Comment:
   nit: could be else if



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java:
##########
@@ -82,7 +82,8 @@ public void init(final ProcessorContext<CombinedKey<KRight, 
KLeft>, Change<Value
 
             @Override
             public void process(final Record<KRight, 
SubscriptionWrapper<KLeft>> record) {
-                if (record.key() == null && 
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction()))
 {
+                final KRight foreignKey = record.key();

Review Comment:
   as a thought, what if we rename the `record` to something like `fkRecord` 
instead? 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> 
record) {
         }
 
         private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> 
record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            if (oldValue == null && newValue == null) {
+                // no output for idempotent left hand side deletes
+                return;
+            }
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final KRight newForeignKey = newValue == null ? null : 
foreignKeyExtractor.extract(record.key(), newValue);
+
+            final boolean maybeUnsubscribe = oldForeignKey != null;
+            if (maybeUnsubscribe) {
+                // delete old subscription only if FK changed
+                //
+                // if FK did change, we need to explicitly delete the old 
subscription,
+                // because the new subscription goes to a different partition
+                final boolean foreignKeyChanged = 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));

Review Comment:
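   We can check `newForeignKey` for null here first to avoid the unnecessary serialization and 
comparison: `oldForeignKey` is already known to be non-null in this branch, so a null 
`newForeignKey` means the FK changed.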



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to