Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2024-05-04 Thread via GitHub


florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1589921037


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
 
 @Override
 public void process(final Record> record) {
+// clear cached hash from previous record
+recordHash = null;
 // drop out-of-order records from versioned tables (cf. KIP-914)
 if (useVersionedSemantics && !record.value().isLatest) {
 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
   @mjsax omg, thanks for the flag! Looks like @AyoubOm is addressing it in 
https://issues.apache.org/jira/browse/KAFKA-16394 already? Otherwise, I'll address it.
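For readers following the quoted hunk: the eager `currentHash`/`partition` computation is removed, a `recordHash` field is reset at the start of every `process()` call, and the repeated `context().forward(... new SubscriptionWrapper<>(...))` blocks are folded into a `forward(record, foreignKey, instruction)` helper whose body is not shown here. The reset comment suggests the hash is computed lazily and reused for every message forwarded for the same record. The following is a minimal sketch of that pattern under that assumption only; `LazyHashSketch` and `Wrapper` are hypothetical stand-ins (not Kafka classes), and MD5 via `java.security.MessageDigest` is swapped in for Kafka's internal `Murmur3.hash128` purely to keep the example dependency-free.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka code: Wrapper stands in for SubscriptionWrapper,
// and MD5 stands in for Murmur3.hash128 so the example has no dependencies.
class LazyHashSketch {

    static final class Wrapper {
        final long[] hash;        // hash of the record's new value (null for a delete)
        final String instruction; // e.g. "PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE"
        final String primaryKey;

        Wrapper(long[] hash, String instruction, String primaryKey) {
            this.hash = hash;
            this.instruction = instruction;
            this.primaryKey = primaryKey;
        }
    }

    private long[] recordHash;       // cached hash for the record currently being processed
    private String currentNewValue;  // new value of that record (may be null)
    int hashComputations = 0;        // how often the hash was actually computed
    final List<Wrapper> forwarded = new ArrayList<>();

    // One incoming record may fan out into several subscription messages.
    void process(String key, String newValue, String... instructions) {
        recordHash = null;           // clear cached hash from the previous record
        currentNewValue = newValue;
        for (String instruction : instructions) {
            forward(key, instruction);
        }
    }

    private void forward(String key, String instruction) {
        forwarded.add(new Wrapper(hash(), instruction, key));
    }

    // Computes the hash on first use; a null value yields a null hash, as in the removed code.
    private long[] hash() {
        if (recordHash == null && currentNewValue != null) {
            hashComputations++;
            recordHash = hashOf(currentNewValue.getBytes(StandardCharsets.UTF_8));
        }
        return recordHash;
    }

    private static long[] hashOf(byte[] bytes) {
        try {
            ByteBuffer digest = ByteBuffer.wrap(MessageDigest.getInstance("MD5").digest(bytes));
            return new long[] {digest.getLong(), digest.getLong()};
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }
}
```

The reset matters because one record can produce two subscription messages (old FK and new FK); with the cache, the value hash is computed at most once per record and never leaks into the next one.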
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2024-04-29 Thread via GitHub


mjsax commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1584066077


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
 
 @Override
 public void process(final Record> record) {
+// clear cached hash from previous record
+recordHash = null;
 // drop out-of-order records from versioned tables (cf. KIP-914)
 if (useVersionedSemantics && !record.value().isLatest) {
 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
   @florin-akermann @wcarlson5 -- Seems we introduced a bug here.
   
   Filed: https://issues.apache.org/jira/browse/KAFKA-16644 
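To make the new left-join branch easier to reason about, here is a simplified model of `leftJoinInstructions` as quoted above. It is a sketch, not the PR's code: values and foreign keys are plain `String`s, instructions are returned as strings instead of being forwarded, and FK equality uses `Objects.equals` in place of comparing `serialize(...)` output (this assumes `serialize(null)` never matches a serialized non-null key). `LeftJoinRoutingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Simplified model of leftJoinInstructions from the quoted hunk (hypothetical class).
// Assumption: Objects.equals replaces the comparison of serialized foreign keys.
class LeftJoinRoutingSketch {

    // Returns "INSTRUCTION -> foreignKey" for every message the left-join branch would forward.
    static List<String> leftJoinInstructions(String oldValue, String newValue,
                                             Function<String, String> fkExtractor) {
        List<String> out = new ArrayList<>();
        if (oldValue != null) {
            String oldFk = fkExtractor.apply(oldValue);
            String newFk = newValue == null ? null : fkExtractor.apply(newValue);
            if (oldFk != null && !Objects.equals(newFk, oldFk)) {
                // the FK changed (or the row was deleted): retract the old subscription
                out.add("DELETE_KEY_AND_PROPAGATE -> " + oldFk);
            }
            // always (re)subscribe under the new FK, even if it is null
            out.add("PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE -> " + newFk);
        } else if (newValue != null) {
            // plain insert: a null FK is forwarded instead of being skipped
            String newFk = fkExtractor.apply(newValue);
            out.add("PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE -> " + newFk);
        }
        return out;
    }
}
```

Tracing the cases this way shows, for instance, that a left-side delete with a non-null old FK produces two messages from this branch: a `DELETE_KEY_AND_PROPAGATE` for the old FK followed by a `PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE` with a `null` FK.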






Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-05 Thread via GitHub


wcarlson5 merged PR #14107:
URL: https://github.com/apache/kafka/pull/14107





Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-05 Thread via GitHub


wcarlson5 commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1841851229

   I'm not sure; either option for the optimization is fine with me as long as 
it's well documented. 
   
   I'm good with how the PR is for now. I'm going to merge it to get it in 
before the 3.7 feature freeze.





Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-03 Thread via GitHub


florin-akermann commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1837513033

   > Just a couple of comments
   
   Thank you @wcarlson5 for the comments.
   I pushed the corresponding changes.
   
   While I have your attention, could we revisit the adjustments made in 
https://github.com/apache/kafka/pull/14174#discussion_r1351880231 to the 
optimization 'drop null-key records on repartitioning'?
   To me it seems odd to apply the optimization in some cases and not in others.
   E.g., as described in the linked comment, there can be a confusing case 
where null-key records don't get propagated because of an explicit repartition 
call at the end of the topology.
   
   I was wondering whether it would make sense to change the optimization from 
'filter null-key records in repartition nodes if no left or outer join is 
downstream' to 'filter null-key records in repartition nodes if no left or 
outer join is part of this branch of the topology (i.e., no left or 
outer join is up- or downstream of this repartition node)'.
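To make the two rules in this proposal concrete, here is a toy sketch over a hypothetical topology graph. `Node`, `Kind`, and the rule methods are illustrative only, not Kafka Streams' internal graph classes. `filterCurrent` encodes the rule as described above ("filter if no left or outer join is downstream"), and `filterProposed` encodes the suggested "up- or downstream" variant; `true` means null-key records are dropped at that repartition node.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Toy topology graph; not the Kafka Streams internal GraphNode classes.
class NullKeyFilterRuleSketch {

    enum Kind { SOURCE, REPARTITION, INNER_JOIN, LEFT_JOIN, OUTER_JOIN, SINK }

    static final class Node {
        final String name;
        final Kind kind;
        final List<Node> children = new ArrayList<>();
        final List<Node> parents = new ArrayList<>();

        Node(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }

        // Adds an edge this -> child and returns child, so edges can be chained.
        Node to(Node child) {
            children.add(child);
            child.parents.add(this);
            return child;
        }
    }

    // In this sketch, left and outer joins are the operators that must see null-key records.
    static boolean needsNullKeys(Node n) {
        return n.kind == Kind.LEFT_JOIN || n.kind == Kind.OUTER_JOIN;
    }

    // Breadth-first search from start (exclusive) in one direction of the graph.
    static boolean reaches(Node start, Function<Node, List<Node>> next) {
        Deque<Node> queue = new ArrayDeque<>(next.apply(start));
        Set<Node> seen = new HashSet<>();
        while (!queue.isEmpty()) {
            Node n = queue.poll();
            if (!seen.add(n)) {
                continue;
            }
            if (needsNullKeys(n)) {
                return true;
            }
            queue.addAll(next.apply(n));
        }
        return false;
    }

    // Rule as described in the thread: drop null keys unless a left/outer join is downstream.
    static boolean filterCurrent(Node repartition) {
        return !reaches(repartition, n -> n.children);
    }

    // Proposed rule: drop null keys only if no left/outer join is up- or downstream.
    static boolean filterProposed(Node repartition) {
        return !reaches(repartition, n -> n.children) && !reaches(repartition, n -> n.parents);
    }
}
```

For the confusing case from the linked comment (source, then left join, then an explicit repartition before the sink), `filterCurrent` returns `true` and drops the null-key results of the left join, while `filterProposed` returns `false` and keeps them. A branch with only inner joins is filtered under both rules.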





Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-03 Thread via GitHub


florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1413117650


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record> record) {
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
 if (oldForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
 if (record.value().newValue != null) {
-final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
 if (newForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
 //Different Foreign Key - delete the old key value and propagate the new one.
 //Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-//since we must "unset" any output set by the previous FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
 }
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+//since we must "unset" any output set by the previous FK-join. This is true for both INNER
+//and LEFT join.
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
 } else {
-//A simple propagatable delete. Delete from 

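For comparison with the left-join branch, the update path of `defaultJoinInstructions` (both `oldValue` and `newValue` non-null) can be modeled the same way. Only that path is modeled, because the delete branch is cut off in the quoted diff; as in the left-join model, strings stand in for Kafka types, instructions are returned rather than forwarded, and `Objects.equals` stands in for the serialized-key comparison. `InnerJoinUpdateSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Simplified model of the update path (oldValue and newValue both non-null) of
// defaultJoinInstructions from the quoted hunk. Insert/delete paths are not modeled
// because they are cut off in the quoted diff.
class InnerJoinUpdateSketch {

    static List<String> updateInstructions(String oldValue, String newValue,
                                           Function<String, String> fkExtractor) {
        Objects.requireNonNull(oldValue, "only the update path is modeled");
        Objects.requireNonNull(newValue, "only the update path is modeled");
        List<String> out = new ArrayList<>();
        String oldFk = fkExtractor.apply(oldValue);
        if (oldFk == null) {
            return out; // quoted code: logSkippedRecordDueToNullForeignKey(); return;
        }
        String newFk = fkExtractor.apply(newValue);
        if (newFk == null) {
            return out; // skipped the same way
        }
        if (!Objects.equals(newFk, oldFk)) {
            // FK changed: remove the old subscription without emitting a result
            out.add("DELETE_KEY_NO_PROPAGATE -> " + oldFk);
        }
        out.add("PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE -> " + newFk);
        return out;
    }
}
```

Unlike the left-join branch, a null FK on either side causes the whole update to be skipped, and the old subscription is removed with `DELETE_KEY_NO_PROPAGATE` rather than `DELETE_KEY_AND_PROPAGATE`.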
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-03 Thread via GitHub


florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1413117384


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record> record) {
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
 if (oldForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
 if (record.value().newValue != null) {
-final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
 if (newForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
 //Different Foreign Key - delete the old key value and propagate the new one.
 //Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-//since we must "unset" any output set by the previous FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
 }
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+//since we must "unset" any output set by the previous FK-join. This is true for both INNER
+//and LEFT join.
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
 } else {
-//A simple propagatable delete. Delete from 

Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-01 Thread via GitHub


wcarlson5 commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1412698954


##
docs/streams/upgrade-guide.html:
##
@@ -198,6 +184,21 @@ Streams API
 
 
 
+

Review Comment:
   Thanks for remembering the docs update!



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record> record) {
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
 if (oldForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
 if (record.value().newValue != null) {
-final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
 if (newForeignKey == null) {
 logSkippedRecordDueToNullForeignKey();
 return;
 }
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
 //Different Foreign Key - delete the old key value and propagate the new one.
 //Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-//since we must "unset" any output set by the previous FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
 }
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+//since we must "unset" any output set by the previous FK-join. This is true for both INNER
+//and LEFT join.
+

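Both branches in the quoted hunk decide whether the FK changed with `!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))` rather than `newForeignKey.equals(oldForeignKey)`. One plausible reason, not stated in this thread, is that the serialized FK is what keys the subscription, so byte equality avoids depending on the FK type implementing `equals()`. The hypothetical sketch below contrasts the two checks, assuming (as the left-join branch appears to) that `serialize` maps a `null` key to `null`; `SerializedKeyComparisonSketch` and both method names are illustrative only.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical illustration: comparing foreign keys by serialized bytes vs. by equals().
class SerializedKeyComparisonSketch {

    // Same shape as !Arrays.equals(serialize(newFk), serialize(oldFk)); null keys map to null bytes.
    static <KO> boolean fkChanged(KO oldFk, KO newFk, Function<KO, byte[]> serializer) {
        byte[] oldBytes = oldFk == null ? null : serializer.apply(oldFk);
        byte[] newBytes = newFk == null ? null : serializer.apply(newFk);
        return !Arrays.equals(newBytes, oldBytes);
    }

    // Alternative that relies on the FK type's equals() implementation.
    static <KO> boolean fkChangedByEquals(KO oldFk, KO newFk) {
        return !Objects.equals(newFk, oldFk);
    }
}
```

With `StringBuilder` as the FK type (it keeps `Object`'s identity-based `equals()`), two builders holding `"fk1"` count as unchanged under `fkChanged` but as changed under `fkChangedByEquals`.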
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-11-29 Thread via GitHub


wcarlson5 commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1832612612

   I'm going to start my review of this today. Hopefully it will be actionable 
sometime tomorrow.

