mjsax commented on code in PR #20605:
URL: https://github.com/apache/kafka/pull/20605#discussion_r2388356518
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> record) {
}
private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
- if (record.value().oldValue != null) {
- final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
- final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
- if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+ final VLeft oldValue = record.value().oldValue;
+ final VLeft newValue = record.value().newValue;
+
+ if (oldValue == null && newValue == null) {
+ // no output for idempotent left hand side deletes
+ return;
+ }
+
+ final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
+ final KRight newForeignKey = newValue == null ? null : foreignKeyExtractor.extract(record.key(), newValue);
+
+ final boolean maybeUnsubscribe = oldForeignKey != null;
+ if (maybeUnsubscribe) {
+ // delete old subscription only if FK changed
+ //
+ // if FK did change, we need to explicitly delete the old subscription,
+ // because the new subscription goes to a different partition
+ final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
Review Comment:
I don't think it would buy us much. It would only save a few method calls:
`serialize()` does the null check and returns `null`, and if either
`serialize()` call returns `null`, `Arrays.equals` is also very cheap.
I would rather keep it the way it is. Fewer if/else checks make the code
easier to read, and I don't think we would get any perf benefit from making
the code more complicated.
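
To illustrate the point, here is a minimal, self-contained sketch (not the actual Kafka Streams code). The class name `ForeignKeyChangeSketch`, the `String` key type, and the UTF-8 based `serialize()` are assumptions made up for this example; the only behavior relied on is that `java.util.Arrays.equals(byte[], byte[])` accepts `null` arguments, treating two `null` references as equal and a `null` and a non-`null` array as unequal.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ForeignKeyChangeSketch {

    // Hypothetical stand-in for the real serialize(): null in, null out.
    static byte[] serialize(final String foreignKey) {
        return foreignKey == null ? null : foreignKey.getBytes(StandardCharsets.UTF_8);
    }

    // Single expression, no extra null branches: Arrays.equals handles
    // null arguments itself and returns early when either side is null.
    static boolean foreignKeyChanged(final String oldForeignKey, final String newForeignKey) {
        return !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
    }

    public static void main(final String[] args) {
        System.out.println(foreignKeyChanged("fk-1", "fk-1")); // unchanged FK
        System.out.println(foreignKeyChanged("fk-1", "fk-2")); // FK moved to another key
        System.out.println(foreignKeyChanged("fk-1", null));   // new value deleted or has no FK
    }
}
```

An explicit `newForeignKey == null` short-circuit in front of this would only skip two trivial calls, at the cost of one more branch to read.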