mjsax commented on code in PR #20605:
URL: https://github.com/apache/kafka/pull/20605#discussion_r2388366045


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> record) {
         }
 
         private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            if (oldValue == null && newValue == null) {
+                // no output for idempotent left hand side deletes
+                return;
+            }
+
+            final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
+            final KRight newForeignKey = newValue == null ? null : foreignKeyExtractor.extract(record.key(), newValue);
+
+            final boolean maybeUnsubscribe = oldForeignKey != null;
+            if (maybeUnsubscribe) {
+                // delete old subscription only if FK changed
+                //
+                // if FK did change, we need to explicitly delete the old subscription,
+                // because the new subscription goes to a different partition
+                final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
+
+                if (foreignKeyChanged) {
+                    // this may lead to unnecessary tombstones if the old FK did not join;
+                    // however, we cannot avoid it as we have no means to know if the old FK joined or not
                     forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
-                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-            } else if (record.value().newValue != null) {
-                final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             }
+
+            // for all cases (insert, update, and delete), we send a new subscription;
+            // we need to get a response back for all cases to always produce a left-join result
+            //
+            // note: for delete, `newForeignKey` is null, what is a "hack"

Review Comment:
   IMHO it's still hacky, as we keep sending a `null` to the right-hand side.
   The hacky part is not that the variable is `null` (that contributes a
   little, I guess, as it might seem unintuitive that this is a valid case),
   but that we send `null` at all (and sending `null` is also unintuitive by
   itself IMHO). Thoughts?
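
To make the delete case concrete, here is a minimal standalone sketch of the instruction flow in the hunk above. It is an illustration, not the PR code: the class name, the `String` keys and values, and the `fkExtractor` function are hypothetical, plain `Objects.equals` stands in for the serialized-bytes comparison, and the final send is assumed to use `PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE` as in the removed lines, because the quoted hunk ends before that call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical, simplified model of the left-join branch; not the Kafka Streams class.
final class LeftJoinInstructionSketch {

    // Returns the "targetForeignKey:INSTRUCTION" pairs that would be forwarded.
    static List<String> leftJoinInstructions(final String oldValue,
                                             final String newValue,
                                             final Function<String, String> fkExtractor) {
        final List<String> sent = new ArrayList<>();

        if (oldValue == null && newValue == null) {
            // idempotent left-hand side delete: nothing to send
            return sent;
        }

        final String oldForeignKey = oldValue == null ? null : fkExtractor.apply(oldValue);
        final String newForeignKey = newValue == null ? null : fkExtractor.apply(newValue);

        // Assumption: Objects.equals replaces the Arrays.equals(serialize(..), serialize(..)) check.
        if (oldForeignKey != null && !Objects.equals(oldForeignKey, newForeignKey)) {
            sent.add(oldForeignKey + ":DELETE_KEY_NO_PROPAGATE");
        }

        // Assumption: the final send uses the same instruction as the removed code.
        // For a delete, newForeignKey is null, so a null foreign key is sent.
        sent.add(newForeignKey + ":PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE");
        return sent;
    }

    public static void main(final String[] args) {
        // Hypothetical value layout "<foreignKey>:<payload>".
        final Function<String, String> fk = v -> v.split(":")[0];
        System.out.println("insert: " + leftJoinInstructions(null, "a:1", fk));
        System.out.println("update: " + leftJoinInstructions("a:1", "b:2", fk));
        System.out.println("delete: " + leftJoinInstructions("a:1", null, fk));
    }
}
```

In this sketch the delete case yields a tombstone for the old key followed by a send whose foreign key is `null`, which is the behavior the comment above calls hacky.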


