Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1589921037

##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
@Override
public void process(final Record> record) {
+// clear cashed hash from previous record
+recordHash = null;
// drop out-of-order records from versioned tables (cf. KIP-914)
if (useVersionedSemantics && !record.value().isLatest) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
@mjsax omg, thanks for the flag! Looks like @AyoubOm is addressing it in https://issues.apache.org/jira/browse/KAFKA-16394 already? Else I'll address it.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
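The quoted hunk removes the eager `currentHash = Murmur3.hash128(...)` computation and instead resets a `recordHash` field to `null` at the top of every `process()` call ("clear cashed hash from previous record"), which suggests the hash is now computed lazily and cached for the current record. The lazy getter itself is not shown in the thread, so the following is only a minimal sketch of that pattern, assuming a hypothetical `LazyHashCache` class; it uses `java.security.MessageDigest` with MD5 as a stand-in for Kafka's internal `Murmur3.hash128`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of a per-record lazily computed hash; not Kafka's implementation.
class LazyHashCache {
    private byte[] recordHash;    // cached hash of the current record's value, or null
    private int computations = 0; // how many times a hash was actually computed

    // Mirrors `recordHash = null;` at the top of process(): forget the previous record's hash.
    void startRecord() {
        recordHash = null;
    }

    // Returns null for a null value; otherwise computes the hash on first use for the
    // current record and reuses it on later calls (assumes the value does not change
    // between calls for the same record).
    byte[] hash(final String value) {
        if (value == null) {
            return null;
        }
        if (recordHash == null) {
            computations++;
            recordHash = md5(value.getBytes(StandardCharsets.UTF_8));
        }
        return recordHash;
    }

    int computations() {
        return computations;
    }

    // Stand-in for Murmur3.hash128: MD5 also produces a 128-bit digest.
    private static byte[] md5(final byte[] bytes) {
        try {
            return MessageDigest.getInstance("MD5").digest(bytes);
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    public static void main(final String[] args) {
        final LazyHashCache cache = new LazyHashCache();
        cache.startRecord();
        cache.hash("v1");
        cache.hash("v1"); // a second lookup for the same record reuses the cached hash
        cache.startRecord();
        cache.hash("v2");
        System.out.println(cache.computations()); // prints 2
    }
}
```

The benefit of this shape is that a record that is forwarded zero, one, or two times pays for at most one hash computation, and resetting the field per record keeps a stale hash from leaking into the next record.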
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
mjsax commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1584066077

##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
@Override
public void process(final Record> record) {
+// clear cashed hash from previous record
+recordHash = null;
// drop out-of-order records from versioned tables (cf. KIP-914)
if (useVersionedSemantics && !record.value().isLatest) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
@florin-akermann @wcarlson5 -- Seems we introduced a bug here. Filed: https://issues.apache.org/jira/browse/KAFKA-16644
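The comment does not spell out what the bug is. One way to reason about the quoted branch is to model which `(key, instruction)` pairs it forwards. The sketch below covers only the visible `oldValue != null` branch of `leftJoinInstructions`, assuming `String` foreign keys and assuming that `serialize(...)` maps `null` to `null` (as Kafka's `StringSerializer` does); `LeftJoinBranchModel` and `Forward` are hypothetical names, and records need Java 16+. Under these assumptions, a left-side delete (old value with a non-null FK, new value `null`) forwards two messages: `DELETE_KEY_AND_PROPAGATE` keyed by the old FK, then `PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE` keyed by `null`. Whether that double propagation is what KAFKA-16644 tracks is not stated in this thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the visible `oldValue != null` branch of leftJoinInstructions.
class LeftJoinBranchModel {
    // One forwarded subscription message: the re-key target and the instruction.
    record Forward(String key, String instruction) { }

    // Assumption: serializing a null FK yields null, like StringSerializer.
    static byte[] serialize(final String fk) {
        return fk == null ? null : fk.getBytes(StandardCharsets.UTF_8);
    }

    // oldFk: FK of the (non-null) old value; newFk: FK of the new value, or null if it is null.
    static List<Forward> oldValuePresent(final String oldFk, final String newFk) {
        final List<Forward> out = new ArrayList<>();
        // Arrays.equals(null, nonNull) is false, so a null newFk counts as "FK changed".
        if (oldFk != null && !Arrays.equals(serialize(newFk), serialize(oldFk))) {
            out.add(new Forward(oldFk, "DELETE_KEY_AND_PROPAGATE"));
        }
        // This forward happens unconditionally in the quoted code.
        out.add(new Forward(newFk, "PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE"));
        return out;
    }

    public static void main(final String[] args) {
        // Left-side delete: old FK "a", new value null -> two messages.
        System.out.println(oldValuePresent("a", null));
        // Unchanged FK -> one message.
        System.out.println(oldValuePresent("a", "a"));
    }
}
```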
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 merged PR #14107:
URL: https://github.com/apache/kafka/pull/14107
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1841851229

I'm not sure; either option for the optimization is fine with me as long as it's well documented. I'm good with how the PR is for now. I'm going to merge it to get it in before the 3.7 feature freeze.
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
florin-akermann commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1837513033

> Just a couple of comments

Thank you @wcarlson5 for the comments. I pushed the corresponding changes.

While I have your attention, could we revisit the adjustments made to the optimization 'drop null-key records on repartitioning' as part of https://github.com/apache/kafka/pull/14174#discussion_r1351880231? To me it seems odd to apply the optimization in some cases and not in others. For example, as described in the linked comment, there can be a confusing case where null-key records are not propagated because of an explicit repartition call at the end of the topology.

I was wondering whether it would make sense to change the optimization from 'filter null-key records in repartition nodes if no left or outer join is downstream' to 'filter null-key records in repartition nodes if no left or outer-join node is part of this branch of the topology (i.e., no left or outer join is up- or downstream of this repartition node)'.
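The two rules can be contrasted on a toy topology. This is a minimal sketch, not Kafka's optimizer: it assumes a single linear branch (no fan-out) described by a hypothetical `Kind` enum, and hypothetical method names for the current rule ("no left/outer join downstream") and the proposed rule ("no left/outer join anywhere in the branch"). The example branch is the confusing case described above: a left join whose output is explicitly repartitioned at the end of the topology.

```java
import java.util.List;

// Hypothetical toy model of the two null-key filtering rules for one linear branch.
class NullKeyFilterRules {
    enum Kind { OPERATOR, REPARTITION, LEFT_OR_OUTER_JOIN }

    // Current rule: filter null keys at repartition node i if no left/outer join follows it.
    static boolean filterIfNoJoinDownstream(final List<Kind> branch, final int i) {
        requireRepartition(branch, i);
        return !branch.subList(i + 1, branch.size()).contains(Kind.LEFT_OR_OUTER_JOIN);
    }

    // Proposed rule: filter only if no left/outer join appears anywhere in the branch.
    static boolean filterIfNoJoinInBranch(final List<Kind> branch, final int i) {
        requireRepartition(branch, i);
        return !branch.contains(Kind.LEFT_OR_OUTER_JOIN);
    }

    private static void requireRepartition(final List<Kind> branch, final int i) {
        if (branch.get(i) != Kind.REPARTITION) {
            throw new IllegalArgumentException("node " + i + " is not a repartition node");
        }
    }

    public static void main(final String[] args) {
        // A left join followed by an explicit repartition at the end of the topology.
        final List<Kind> branch =
            List.of(Kind.OPERATOR, Kind.LEFT_OR_OUTER_JOIN, Kind.REPARTITION);
        System.out.println(filterIfNoJoinDownstream(branch, 2)); // true: null keys dropped
        System.out.println(filterIfNoJoinInBranch(branch, 2));   // false: null keys kept
    }
}
```

Under the current rule, null-key results produced by the left join are dropped by the trailing repartition; under the proposed rule they survive, which is the consistency the comment asks for.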
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1413117650

##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record> record) {
droppedRecordsSensor.record();
return;
}
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
if (oldForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
if (record.value().newValue != null) {
-final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-//since we must "unset" any output set by the previous FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+//since we must "unset" any output set by the previous FK-join. This is true for both INNER
+//and LEFT join.
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
-//A simple propagatable delete. Delete from
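For comparison with the left-join path, the visible part of `defaultJoinInstructions` can be modeled the same way. This sketch covers only the case where both the old and the new value are non-null, since the `else` branch is cut off in the quoted hunk. `DefaultJoinModel` and `Forward` are hypothetical names, FKs are assumed to be `String`s, `serialize(...)` is assumed to map `null` to `null`, an empty result stands in for the `logSkippedRecordDueToNullForeignKey()` early return, and records need Java 16+.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of defaultJoinInstructions when both old and new values are non-null.
class DefaultJoinModel {
    record Forward(String key, String instruction) { }

    static byte[] serialize(final String fk) {
        return fk == null ? null : fk.getBytes(StandardCharsets.UTF_8);
    }

    // oldFk/newFk: FKs extracted from the non-null old and new values (either may be null).
    static List<Forward> bothValuesPresent(final String oldFk, final String newFk) {
        final List<Forward> out = new ArrayList<>();
        if (oldFk == null || newFk == null) {
            return out; // the quoted code logs a skipped record and returns
        }
        if (!Arrays.equals(serialize(newFk), serialize(oldFk))) {
            // FK changed: drop the subscription under the old FK without emitting a result.
            out.add(new Forward(oldFk, "DELETE_KEY_NO_PROPAGATE"));
        }
        // Subscribe under the new FK; emit null if no right-side value exists there.
        out.add(new Forward(newFk, "PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE"));
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(bothValuesPresent("a", "b"));  // FK changed: two messages
        System.out.println(bothValuesPresent("a", "a"));  // FK unchanged: one message
        System.out.println(bothValuesPresent("a", null)); // null FK: skipped
    }
}
```

Comparing FKs by their serialized bytes, as the diff does, matches how the keys are actually partitioned and stored downstream, rather than relying on the key type's `equals`.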
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1413117384

##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record> record) {
droppedRecordsSensor.record();
return;
}
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
if (oldForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
if (record.value().newValue != null) {
-final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-//since we must "unset" any output set by the previous FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+//since we must "unset" any output set by the previous FK-join. This is true for both INNER
+//and LEFT join.
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
-//A simple propagatable delete. Delete from
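The quoted `leftJoinInstructions` is complete, so its handling of a `null` foreign key, which is what this PR relaxes, can be traced directly. The minimal sketch below assumes a hypothetical `Value` record whose `fk` field plays the role of `foreignKeyExtractor`, `String` FKs, and a `serialize(...)` that maps `null` to `null`; `LeftJoinModel` and `Forward` are hypothetical names, and records need Java 16+. For a record whose FK is `null`, the left-join path still forwards a subscription keyed by `null` (presumably so the left-side row can still yield a join result with a `null` right side), whereas the quoted `defaultJoinInstructions` returns early after `logSkippedRecordDueToNullForeignKey()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of leftJoinInstructions from the quoted hunk.
class LeftJoinModel {
    record Value(String fk) { }                       // stand-in value; fk may be null
    record Forward(String key, String instruction) { }

    static byte[] serialize(final String fk) {
        return fk == null ? null : fk.getBytes(StandardCharsets.UTF_8);
    }

    // oldValue/newValue may each be null, mirroring record.value().oldValue/newValue.
    static List<Forward> leftJoin(final Value oldValue, final Value newValue) {
        final List<Forward> out = new ArrayList<>();
        if (oldValue != null) {
            final String oldFk = oldValue.fk();
            final String newFk = newValue == null ? null : newValue.fk();
            if (oldFk != null && !Arrays.equals(serialize(newFk), serialize(oldFk))) {
                out.add(new Forward(oldFk, "DELETE_KEY_AND_PROPAGATE"));
            }
            out.add(new Forward(newFk, "PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE"));
        } else if (newValue != null) {
            // New record: forwarded even when its FK is null (no skip, unlike the default path).
            out.add(new Forward(newValue.fk(), "PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE"));
        }
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(leftJoin(null, new Value(null)));         // one message, key null
        System.out.println(leftJoin(new Value("a"), new Value("b"))); // FK change: two messages
        System.out.println(leftJoin(null, null));                     // nothing to do
    }
}
```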
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1412698954

##
docs/streams/upgrade-guide.html:
##
@@ -198,6 +184,21 @@ Streams API
+

Review Comment:
Thanks for remembering the docs update!

##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -115,102 +116,93 @@ public void process(final Record> record) {
droppedRecordsSensor.record();
return;
}
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+} else if (record.value().newValue != null) {
+final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+}
+}
+
+private void defaultJoinInstructions(final Record> record) {
+if (record.value().oldValue != null) {
+final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
if (oldForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
if (record.value().newValue != null) {
-final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
-
-final byte[] serialOldForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-final byte[] serialNewForeignKey =
-foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
-context().forward(
-record.withKey(oldForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-DELETE_KEY_NO_PROPAGATE,
-record.key(),
-partition
-)));
-//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-//since we must "unset" any output set by the previous FK-join. This is true for both INNER
-//and LEFT join.
+forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
-context().forward(
-record.withKey(newForeignKey)
-.withValue(new SubscriptionWrapper<>(
-currentHash,
-PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-record.key(),
-partition
-)));
+//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+//since we must "unset" any output set by the previous FK-join. This is true for both INNER
+//and LEFT join.
+
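The refactoring in this hunk replaces each inline `context().forward(record.withKey(...).withValue(new SubscriptionWrapper<>(currentHash, instruction, record.key(), partition)))` call with a `forward(record, key, instruction)` helper. The helper's body is not part of the quoted hunk, so the following is only a plausible shape, sketched with simplified hypothetical stand-ins (`Change`, `Rec`, `Wrapper`, `Out`, and a `List` playing the downstream context) rather than Kafka's `Change`, `Record`, `SubscriptionWrapper`, and `ProcessorContext`. The hash function here is a toy stand-in for `Murmur3.hash128`; records need Java 16+.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a forward(record, key, instruction) helper; the types below are
// simplified stand-ins, not Kafka's Record / Change / SubscriptionWrapper / ProcessorContext.
class ForwardHelperSketch {
    record Change(String oldValue, String newValue) { }
    record Rec(String key, Change value) { }
    record Wrapper(long[] hash, String instruction, String primaryKey, int partition) { }
    record Out(String foreignKey, Wrapper wrapper) { }

    final List<Out> forwarded = new ArrayList<>(); // stands in for context().forward(...)
    private final int partition;                   // stands in for recordMetadata().partition()

    ForwardHelperSketch(final int partition) {
        this.partition = partition;
    }

    // Toy stand-in for Murmur3.hash128 of the serialized new value; null if there is none.
    private static long[] hash(final String newValue) {
        return newValue == null ? null : new long[] {newValue.hashCode(), newValue.length()};
    }

    // Single place that builds a subscription message: re-key by the FK and carry the value
    // hash, the instruction, the original primary key, and the source partition.
    void forward(final Rec record, final String foreignKey, final String instruction) {
        final Wrapper wrapper =
            new Wrapper(hash(record.value().newValue()), instruction, record.key(), partition);
        forwarded.add(new Out(foreignKey, wrapper));
    }

    public static void main(final String[] args) {
        final ForwardHelperSketch p = new ForwardHelperSketch(3);
        final Rec rec = new Rec("pk1", new Change("v-old", "v-new"));
        p.forward(rec, "fkOld", "DELETE_KEY_NO_PROPAGATE");
        p.forward(rec, "fkNew", "PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE");
        p.forwarded.forEach(o -> System.out.println(o.foreignKey() + " " + o.wrapper().instruction()));
    }
}
```

Centralizing the wrapper construction this way removes the repeated eight-line `context().forward(...)` blocks and gives one spot to change if the wrapper's fields or the hashing strategy change.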
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1832612612

I'm going to start my review of this today. Hopefully it will be actionable sometime tomorrow.