Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-11-05 Thread via GitHub


florin-akermann commented on PR #14174:
URL: https://github.com/apache/kafka/pull/14174#issuecomment-1793747649

   @wcarlson5 thanks for the merge. I think it would be good to tackle https://github.com/apache/kafka/pull/14107 in the same release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-31 Thread via GitHub


wcarlson5 merged PR #14174:
URL: https://github.com/apache/kafka/pull/14174





Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-25 Thread via GitHub


florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1372233874


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
 LOG.debug("Optimizing the Kafka Streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
 }
+LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+rewriteRepartitionNodes();
 }
 
+private void rewriteRepartitionNodes() {

Review Comment:
   Ok, so basically we remove this optimization completely for now?
   Developers could still just filter out null keys with a `filter` operator to achieve the old behavior.
   And then we make a separate ticket where developers can opt in to this optimization?
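For illustration, the `filter`-based opt-out mentioned above is simple to express. In the actual Kafka Streams DSL it would be `stream.filter((k, v) -> k != null)` before the join; the following is only a minimal plain-Java sketch of that idea (the class, method, and record representation are invented for illustration, not Kafka Streams types):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NullKeyFilterSketch {

    // Mimics stream.filter((k, v) -> k != null): drops null-key records
    // up front, restoring the old behavior where they never reach a join.
    static List<Map.Entry<String, String>> dropNullKeys(final List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(r -> r.getKey() != null)
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, String>> input = List.of(
                new SimpleEntry<>("a", "1"),
                new SimpleEntry<>(null, "2"),   // would now propagate into joins
                new SimpleEntry<>("b", "3"));
        // Only the non-null-key records survive, as before the change.
        System.out.println(dropNullKeys(input));   // prints [a=1, b=3]
    }
}
```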






Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-19 Thread via GitHub


wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1366083940


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
 public void process(final Record record) {
 // if the key is null, we do not need to put the record into window store
 // since it will never be considered for join operations
+context().forward(record);
 if (record.key() != null) {
-context().forward(record);
 // Every record basically starts a new window. We're using a window store mostly for the retention.
 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   That's likely true about the window store implementations. Something to think about for later then. We should maybe clarify the semantics of null matching in the docs somewhere.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
 LOG.debug("Optimizing the Kafka Streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
 }
+LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+rewriteRepartitionNodes();
 }
 
+private void rewriteRepartitionNodes() {

Review Comment:
   Hmm, I would think they would like to opt out. That would require an update to the KIP, maybe even a revote. I'm not sure what the odds are that someone manually repartitioning would need the null keys to propagate, but they are probably higher than you would think, as manual repartitioners are typically power users.
   
   I don't think we need to make it optional, as we already filter all null keys and now we let some propagate. Maybe we should just make a ticket and come back to it. Being able to toggle the optimization should be pretty simple.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {

Review Comment:
   ah nvm, ignore this






Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-10 Thread via GitHub


florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1352231502


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {

Review Comment:
   I don't quite follow. For inner join we would like to keep the current behavior.
   Which column in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics does 'inner left' join refer to?






Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-10 Thread via GitHub


florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1352140220


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
 public void process(final Record record) {
 // if the key is null, we do not need to put the record into window store
 // since it will never be considered for join operations
+context().forward(record);
 if (record.key() != null) {
-context().forward(record);
 // Every record basically starts a new window. We're using a window store mostly for the retention.
 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   Should a null key ever match? I thought/think in this context `null != null`?
   Plus, my guess would be that most `WindowStore` implementations throw upon `null` as a key?
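The `null != null` point above mirrors SQL NULL semantics: a null key matches nothing, not even another null key. A toy hash join in plain Java can make that concrete (all names here are invented for illustration; this is not the Kafka Streams join implementation):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullKeyJoinSketch {

    // Toy inner hash join treating null like SQL NULL: a null key matches
    // nothing, not even another null key (`null != null` in this context).
    static List<String> innerJoin(final List<Map.Entry<String, String>> left,
                                  final List<Map.Entry<String, String>> right) {
        final Map<String, List<String>> index = new HashMap<>();
        for (final Map.Entry<String, String> r : right) {
            if (r.getKey() != null) {           // null keys are never indexed...
                index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
            }
        }
        final List<String> out = new ArrayList<>();
        for (final Map.Entry<String, String> l : left) {
            if (l.getKey() == null) {
                continue;                       // ...and never probed, so they cannot match
            }
            for (final String v : index.getOrDefault(l.getKey(), List.of())) {
                out.add(l.getValue() + "+" + v);
            }
        }
        return out;
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, String>> left = List.of(
                new SimpleEntry<>("k", "L1"), new SimpleEntry<>(null, "L2"));
        final List<Map.Entry<String, String>> right = List.of(
                new SimpleEntry<>("k", "R1"), new SimpleEntry<>(null, "R2"));
        // Only the non-null keys join; the two null-key records never pair up.
        System.out.println(innerJoin(left, right));   // prints [L1+R1]
    }
}
```

This is also why the null-key records never need to enter the window store: there is no probe that could ever retrieve them.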



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {
+context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
+return;
+} else if (record.key() == null || record.value() == null) {

Review Comment:
   agree - adjusted
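The branch under discussion short-circuits null-key records on a left/outer join: instead of dropping them, the joiner is invoked with a null "other side" and the result is forwarded immediately. A minimal plain-Java sketch of that control flow (the class, method, and return values are illustrative stand-ins, not the actual Kafka Streams types):

```java
import java.util.function.BiFunction;

public class OuterNullKeySketch {

    // Sketch of the new branch in KStreamKStreamJoin#process: on a left/outer
    // join, a record with a null key and non-null value is forwarded right away
    // as join(value, null), since no match is possible; null values are still
    // dropped, and everything else takes the normal window-store join path.
    static String process(final boolean outer, final String key, final String value,
                          final BiFunction<String, String, String> joiner) {
        if (outer && key == null && value != null) {
            return joiner.apply(value, null);   // emit the left side with a null match
        } else if (key == null || value == null) {
            return null;                        // dropped, as before
        }
        return "lookup-window-store";           // normal join path (elided here)
    }

    public static void main(final String[] args) {
        // Null key on an outer join: the joiner fires with a null right side.
        System.out.println(process(true, null, "v", (l, r) -> l + "+" + r));   // prints v+null
    }
}
```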



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -39,22 +39,26 @@ public static  boolean skipRecord(
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
 if (record.key() == null || record.value() == null) {
-if (context.recordMetadata().isPresent()) {
-final RecordMetadata recordMetadata = context.recordMetadata().get();
-logger.warn(
-"Skipping record due to null key or value. "
-+ "topic=[{}] partition=[{}] offset=[{}]",
-recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-);
-} else {
-logger.warn(
-"Skipping record due to null key or value. Topic, partition, and offset not known."
-);
-}
-droppedRecordsSensor.record();
+dropRecord(logger, droppedRecordsSensor, context);
 return true;
 } else {
 return false;
 }
 }
+
+public static  void dropRecord(final Logger logger, final Sensor droppedRecordsSensor, final ProcessorContext context) {

Review Comment:
   agree - adjusted



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {

Review Comment:
   I don't quite follow. For inner join we would like to keep the current behavior.
   Which column in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics does 'inner left' join refer to?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
 LOG.debug("Optimizing the Kafka Streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, 

Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-09 Thread via GitHub


wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1347987570


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
 public void process(final Record record) {
 // if the key is null, we do not need to put the record into window store
 // since it will never be considered for join operations
+context().forward(record);
 if (record.key() != null) {
-context().forward(record);
 // Every record basically starts a new window. We're using a window store mostly for the retention.
 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   why should null keys not enter the window?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -39,22 +39,26 @@ public static  boolean skipRecord(
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
 if (record.key() == null || record.value() == null) {
-if (context.recordMetadata().isPresent()) {
-final RecordMetadata recordMetadata = context.recordMetadata().get();
-logger.warn(
-"Skipping record due to null key or value. "
-+ "topic=[{}] partition=[{}] offset=[{}]",
-recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-);
-} else {
-logger.warn(
-"Skipping record due to null key or value. Topic, partition, and offset not known."
-);
-}
-droppedRecordsSensor.record();
+dropRecord(logger, droppedRecordsSensor, context);
 return true;
 } else {
 return false;
 }
 }
+
+public static  void dropRecord(final Logger logger, final Sensor droppedRecordsSensor, final ProcessorContext context) {

Review Comment:
   I'm not a huge fan of splitting this out into a separate public method. I think you can just reuse the logic in `skipRecord`.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {

Review Comment:
   what about inner left joins? Those values go into the window? Why?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
 LOG.debug("Optimizing the Kafka Streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
 }
+LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+rewriteRepartitionNodes();
 }
 
+private void rewriteRepartitionNodes() {

Review Comment:
   This is to prevent null keys from going into repartition topics, right? Will that affect results if a manual repartition is added?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) {
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {
+

Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-05 Thread via GitHub


florin-akermann commented on PR #14174:
URL: https://github.com/apache/kafka/pull/14174#issuecomment-1749481836

   Hey @wcarlson5 
   Great! Yes I am willing to push it.
   Looking forward to your feedback.





Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-05 Thread via GitHub


wcarlson5 commented on PR #14174:
URL: https://github.com/apache/kafka/pull/14174#issuecomment-1749460142

   Hey @florin-akermann I can take a look at this soon. I'll probably be able to take a first look next week if you are still willing to push this.

