florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1352140220


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##########
@@ -51,8 +51,8 @@ public void init(final ProcessorContext<K, V> context) {
         public void process(final Record<K, V> record) {
             // if the key is null, we do not need to put the record into 
window store
             // since it will never be considered for join operations
+            context().forward(record);
             if (record.key() != null) {
-                context().forward(record);
                 // Every record basically starts a new window. We're using a 
window store mostly for the retention.
                 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   Should a null key ever match? I thought/think in this context `null != null`?
   Plus, my guess would be that most `WindowStore` implementations throw upon 
`null` as a key?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -124,17 +124,20 @@ public void init(final ProcessorContext<K, VOut> context) 
{
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
-                return;
-            }
-            boolean needOuterJoin = outer;
-
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
-
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+            if (outer && record.key() == null && record.value() != null) {
+                context().forward(record.withValue(joiner.apply(record.key(), 
record.value(), null)));
+                return;
+            } else if (record.key() == null || record.value() == null) {

Review Comment:
   agree - adjusted



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##########
@@ -39,22 +39,26 @@ public static <KIn, VIn, KOut, VOut> boolean skipRecord(
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = 
context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, 
partition, and offset not known."
-                );
-            }
-            droppedRecordsSensor.record();
+            dropRecord(logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static <KOut, VOut> void dropRecord(final Logger logger, final 
Sensor droppedRecordsSensor, final ProcessorContext<KOut, VOut> context) {

Review Comment:
   agree - adjusted



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -124,17 +124,20 @@ public void init(final ProcessorContext<K, VOut> context) 
{
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
-                return;
-            }
-            boolean needOuterJoin = outer;
-
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
-
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+            if (outer && record.key() == null && record.value() != null) {

Review Comment:
   I don't quite follow. For inner join we would like to keep the current 
behavior.
   Which column in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics 
does 'inner left' join refere to.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }
 
+    private void rewriteRepartitionNodes() {

Review Comment:
   This is related to this change: 
https://github.com/apache/kafka/pull/14174/commits/d1301b1220a072af198539910b9d87e11706624a
   
   Previously,  as an optimization, null key records were filtered out upon 
repartitioning.
   Now we need null-key records to propagate if there is a left- or outer-join 
operator further downstream.
   @mjsax hinted in KAFKA-12317 that this optimization can no longer be applied 
everywhere.
   
   So now, by default, the optimization is no longer applied.
   `rewriteRepartitionNodes` adds this optimization if no left- or outer-join 
operation is downstream to the `partitionNode` in question.
   
   In general, a manual repartition is covered by this as well.
   Though, now I wonder whether there should be some kind of config/flag 
whether you want this optimization to be applied at all.
   Because, you could define some topology with left joins etc. expecting 
null-key records to propagate all the way through.
   However, if you chose to add a manual repartition at the very end of your 
topology they would then still be filtered out because no left- or outer-join 
operation is downstream of this last repartition.
   
   I guess developers would like to be able to opt out of this optimization in 
this case?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to