Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
florin-akermann commented on PR #14174:
URL: https://github.com/apache/kafka/pull/14174#issuecomment-1793747649

@wcarlson5 thanks for the merge. I think it would be good to tackle https://github.com/apache/kafka/pull/14107 in the same release.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
wcarlson5 merged PR #14174:
URL: https://github.com/apache/kafka/pull/14174
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1372233874

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }

+    private void rewriteRepartitionNodes() {

Review Comment: Ok, so basically we remove this optimization completely for now? Developers could still just filter out null keys with a 'filter' operator to achieve the old behavior. And then we make a separate ticket where developers can opt in to this optimization?
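The 'filter' workaround mentioned above amounts to placing `stream.filter((key, value) -> key != null)` ahead of the join. A library-free sketch of those drop semantics (the class and method names here are illustrative, not part of the Kafka codebase):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NullKeyFilter {

    // Mimics `stream.filter((key, value) -> key != null)` placed before a
    // join: records with a null key are dropped, restoring the old behavior
    // of silently discarding them before they reach the join processor.
    static List<Map.Entry<String, String>> dropNullKeys(List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(r -> r.getKey() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = Arrays.asList(
                new SimpleEntry<>("a", "1"),
                new SimpleEntry<>(null, "2"),
                new SimpleEntry<>("b", "3"));
        // The null-key record carrying value "2" is removed.
        System.out.println(dropNullKeys(input));
    }
}
```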
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1366083940

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
     public void process(final Record record) {
         // if the key is null, we do not need to put the record into window store
         // since it will never be considered for join operations
+        context().forward(record);
         if (record.key() != null) {
-            context().forward(record);
             // Every record basically starts a new window. We're using a window store mostly for the retention.
             window.put(record.key(), record.value(), record.timestamp());

Review Comment: That's likely true about the `WindowStore` implementations. Something to think about for later, then. We should maybe clarify the semantics of null matching in the docs somewhere.

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }

+    private void rewriteRepartitionNodes() {

Review Comment: Hmm, I would think they would like to opt out. That would require an update to the KIP, maybe even a revote. I'm not sure what the odds are that someone manually repartitioning would need the null keys to propagate, but they are probably higher than you would think, as manual repartitioners are typically power users. I don't think we need to make it optional, as we already filter all null keys and now we let some propagate. Maybe we should just make a ticket and we can come back to it. Being able to toggle the optimization should be pretty simple.

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
     @SuppressWarnings("unchecked")
     @Override
     public void process(final Record record) {
-        if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-            return;
-        }
-        boolean needOuterJoin = outer;
-
         final long inputRecordTimestamp = record.timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
         sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+        if (outer && record.key() == null && record.value() != null) {

Review Comment: ah nvm, ignore this
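The `KStreamJoinWindow` change under discussion forwards every record downstream but writes only non-null keys into the window store. A minimal library-free sketch of that behavior, using a plain map in place of the `WindowStore` (class and field names are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinWindowSketch {
    // Stand-ins for the downstream processor and the windowed state store.
    final List<Map.Entry<String, String>> forwarded = new ArrayList<>();
    final Map<String, String> windowStore = new HashMap<>();

    // Mirrors the patched KStreamJoinWindow#process: every record is
    // forwarded, but only records with a non-null key are stored, since a
    // null key can never be looked up by the join and many store
    // implementations reject null keys anyway.
    void process(String key, String value) {
        forwarded.add(new SimpleEntry<>(key, value));
        if (key != null) {
            windowStore.put(key, value);
        }
    }
}
```

With this shape, a null-key record still reaches the join processor (where an outer join can emit it), yet never occupies space in the store.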
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1352231502

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
     @SuppressWarnings("unchecked")
     @Override
     public void process(final Record record) {
-        if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-            return;
-        }
-        boolean needOuterJoin = outer;
-
         final long inputRecordTimestamp = record.timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
         sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+        if (outer && record.key() == null && record.value() != null) {

Review Comment: I don't quite follow. For inner join we would like to keep the current behavior. Which column in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics does 'inner left' join refer to?
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1352140220

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
     public void process(final Record record) {
         // if the key is null, we do not need to put the record into window store
         // since it will never be considered for join operations
+        context().forward(record);
         if (record.key() != null) {
-            context().forward(record);
             // Every record basically starts a new window. We're using a window store mostly for the retention.
             window.put(record.key(), record.value(), record.timestamp());

Review Comment: Should a null key ever match? I thought/think in this context `null != null`? Plus, my guess would be that most `WindowStore` implementations throw upon `null` as a key?

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
     @SuppressWarnings("unchecked")
     @Override
     public void process(final Record record) {
-        if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-            return;
-        }
-        boolean needOuterJoin = outer;
-
         final long inputRecordTimestamp = record.timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
         sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+        if (outer && record.key() == null && record.value() != null) {
+            context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
+            return;
+        } else if (record.key() == null || record.value() == null) {

Review Comment: agree - adjusted

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
@@ -39,22 +39,26 @@ public static boolean skipRecord(
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, partition, and offset not known."
-                );
-            }
-            droppedRecordsSensor.record();
+            dropRecord(logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static void dropRecord(final Logger logger, final Sensor droppedRecordsSensor, final ProcessorContext context) {

Review Comment: agree - adjusted

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
     @SuppressWarnings("unchecked")
     @Override
     public void process(final Record record) {
-        if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-            return;
-        }
-        boolean needOuterJoin = outer;
-
         final long inputRecordTimestamp = record.timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
         sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+        if (outer && record.key() == null && record.value() != null) {

Review Comment: I don't quite follow. For inner join we would like to keep the current behavior. Which column in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics does 'inner left' join refer to?

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root,
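The null-key guard added at the top of `KStreamKStreamJoin#process` in the diff above distinguishes three cases: on an outer join, a null-key record with a non-null value is emitted immediately via the joiner with a `null` right side (it can never match, since null keys do not join); any other record with a null key or null value is dropped; everything else proceeds to the regular windowed lookup. A library-free sketch of just that decision (the class, enum, and method names are illustrative):

```java
public class OuterJoinNullKeySketch {
    enum Action { EMIT_WITH_NULL_OTHER_SIDE, DROP, JOIN_NORMALLY }

    // Mirrors the guard in the patched KStreamKStreamJoin#process:
    //   outer join + null key + non-null value  -> emit joiner.apply(value, null) right away
    //   null key or null value otherwise        -> drop the record
    //   anything else                           -> continue with the windowed join
    static Action classify(boolean outer, Object key, Object value) {
        if (outer && key == null && value != null) {
            return Action.EMIT_WITH_NULL_OTHER_SIDE;
        } else if (key == null || value == null) {
            return Action.DROP;
        }
        return Action.JOIN_NORMALLY;
    }
}
```

This matches the semantics debated in the thread: inner joins keep the old drop behavior for null keys, while outer joins now propagate them as unmatched results.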
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1347987570

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
     public void process(final Record record) {
         // if the key is null, we do not need to put the record into window store
         // since it will never be considered for join operations
+        context().forward(record);
         if (record.key() != null) {
-            context().forward(record);
             // Every record basically starts a new window. We're using a window store mostly for the retention.
             window.put(record.key(), record.value(), record.timestamp());

Review Comment: why should null keys not enter the window?

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
@@ -39,22 +39,26 @@ public static boolean skipRecord(
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, partition, and offset not known."
-                );
-            }
-            droppedRecordsSensor.record();
+            dropRecord(logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static void dropRecord(final Logger logger, final Sensor droppedRecordsSensor, final ProcessorContext context) {

Review Comment: I'm not a huge fan of splitting this out into a separate public method. I think you can just reuse the logic in skipRecord.

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
     @SuppressWarnings("unchecked")
     @Override
     public void process(final Record record) {
-        if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-            return;
-        }
-        boolean needOuterJoin = outer;
-
         final long inputRecordTimestamp = record.timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
         sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+        if (outer && record.key() == null && record.value() != null) {

Review Comment: what about inner left joins? Those values go into the window? Why?

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }

+    private void rewriteRepartitionNodes() {

Review Comment: This is to prevent null keys from going into repartition topics, right? Will that affect results if a manual repartition is added?

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
     @SuppressWarnings("unchecked")
     @Override
     public void process(final Record record) {
-        if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-            return;
-        }
-        boolean needOuterJoin = outer;
-
         final long inputRecordTimestamp = record.timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
         sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+        if (outer && record.key() == null && record.value() != null) {
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
florin-akermann commented on PR #14174:
URL: https://github.com/apache/kafka/pull/14174#issuecomment-1749481836

Hey @wcarlson5, great! Yes, I am willing to push it. Looking forward to your feedback.
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
wcarlson5 commented on PR #14174:
URL: https://github.com/apache/kafka/pull/14174#issuecomment-1749460142

Hey @florin-akermann, I can take a look at this soon. I'll probably be able to take a first look next week if you are still willing to push this.