Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650219136


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1616,4 +1623,25 @@ public  KStream processValues(
 processNode,
 builder);
 }
+
+@Override
+public KStream markAsPartitioned() {
+final ProcessorParameters 
processorParameters =
+new ProcessorParameters<>(new PassThrough<>(), 
PARTITION_PRESERVE_NAME + name);

Review Comment:
   I will keep this open to remind myself to update the KIP after the review is 
complete. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650218909


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.

Review Comment:
   @mjsax 
   
   Sorry for the confusion; the javadoc here could be better written. Before I 
rewrite it: the composite key notion came from the original discussion 
[here](https://lists.apache.org/thread/r7yqsoqsox0z2mzxt33r9r99tnwvb58o). 
Nonetheless, I should remove it. Perhaps a brief description would do, such as 
`interactive query might fail when trying to guess the original key`.
   
   As for Joins, I might need your help. Most of my understanding of the 
problem came from the discussion thread.  
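
   A minimal sketch of the interactive-query concern mentioned above, assuming a simplified hash partitioner. `partitionFor` is a hypothetical stand-in (Kafka's default partitioner hashes the serialized key bytes with murmur2, not `String.hashCode()`), and the class name, keys, and partition count are illustrative only. It shows that when a record is re-keyed but not shuffled, the partition it physically lives on can differ from the partition a lookup by the new key would target.

```java
class PartitionSketch {

    // Hypothetical stand-in for a partitioner (not Kafka's real implementation).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Without a repartition, a re-keyed record stays where its ORIGINAL key placed it.
    static int actualPartition(String originalKey, int numPartitions) {
        return partitionFor(originalKey, numPartitions);
    }

    // A key-based lookup derives the target partition from the NEW key.
    static int expectedPartition(String newKey, int numPartitions) {
        return partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String originalKey = "a";
        String newKey = originalKey + ":x"; // illustrative composite key

        System.out.println("record lives on partition "
            + actualPartition(originalKey, numPartitions));
        System.out.println("lookup by new key targets partition "
            + expectedPartition(newKey, numPartitions));
    }
}
```

   If the two numbers differ, a lookup routed by the new key asks an instance that does not hold the record, which is one way to read the "might fail when trying to guess the original key" remark.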






Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650214064


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper selectKeyProcessorNode = 
internalSelectKey(mapper, new NamedInternal(named));
-selectKeyProcessorNode.keyChangingOperation(true);
+selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
 builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
 // key serde cannot be preserved
 return new KStreamImpl<>(
-selectKeyProcessorNode.nodeName(),
-null,
-valueSerde,
-subTopologySourceNodes,
-true,
-selectKeyProcessorNode,
-builder);
+selectKeyProcessorNode.nodeName(),

Review Comment:
   Thanks, reverted these indentations






Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650128875


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper

Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-04-30 Thread via GitHub


mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1585886754


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.

Review Comment:
   ```suggestion
* and does not require further repartitioning by downstream key 
dependent operations.
   ```



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper selectKeyProcessorNode = 
internalSelectKey(mapper, new NamedInternal(named));
-selectKeyProcessorNode.keyChangingOperation(true);
+selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
 builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
 // key serde cannot be preserved
 return new KStreamImpl<>(
-selectKeyProcessorNode.nodeName(),
-null,
-valueSerde,
-subTopologySourceNodes,
-true,
-selectKeyProcessorNode,
-builder);
+selectKeyProcessorNode.nodeName(),

Review Comment:
   nit: avoid unnecessary reformatting (i.e., indentation in this case) -- I assume 
you have some "auto format" feature enabled in your IDE. I would recommend 
disabling it, or adjusting the setting to avoid noise like this.



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 
+ * This method will overwrite a default behavior as described below.
+ * By default, Kafka Streams always automatically repartition the records 
to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned as 
intended. As an example,
+ * if an input stream is partitioned by a String key1, calling the below 
function will trigger a repartition:
+ * 
+ * {@code
+ * KStream inputStream = builder.stream("topic");
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ * You can then overwrite the default behavior by calling this method:
+ * {@code
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .markAsPartitioned()
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ *  

Review Comment:
   Do we need this tag?



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.

Review Comment:
   Can you refresh my memory about joins? I cannot remember the details.
   
   We should add a section to `docs/streams/developer-guide/dsl-api.html` 
that explains the dos and don'ts of this operation.
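
   One possible "do" case, sketched under stated assumptions: if the new key embeds the original key (as in the `(key1, metric)` example in the javadoc), records sharing a new key also share the original key, so they already sit on the same partition and a per-partition aggregation yields complete totals without a shuffle. The partitioner, class name, and record layout below are hypothetical and only model the idea; this is not how Kafka Streams implements aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LocalAggregationSketch {

    // Hypothetical stand-in for a partitioner (not Kafka's real implementation).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Each record is {key1, metric, value}. Records stay on the partition chosen
    // by key1 (no shuffle) and are summed there under the new key "key1:metric".
    static Map<String, Integer> aggregateWithoutShuffle(String[][] records, int numPartitions) {
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
        for (String[] r : records) {
            int partition = partitionFor(r[0], numPartitions);
            String newKey = r[0] + ":" + r[1];
            stores.get(partition).merge(newKey, Integer.parseInt(r[2]), Integer::sum);
        }
        // Merge the per-partition results. A new key appearing in two stores would
        // mean its total was split, i.e. a repartition was actually needed.
        Map<String, Integer> totals = new HashMap<>();
        for (Map<String, Integer> store : stores) {
            for (Map.Entry<String, Integer> e : store.entrySet()) {
                if (totals.putIfAbsent(e.getKey(), e.getValue()) != null) {
                    throw new IllegalStateException("split aggregate for " + e.getKey());
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"a", "x", "1"}, {"a", "x", "2"}, {"b", "x", "5"}
        };
        System.out.println(aggregateWithoutShuffle(records, 4));
    }
}
```

   The exception path is the "don't" side of the same coin: a new key that does not determine the original key could land in several partition-local stores, and skipping the repartition would then produce partial results.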



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 

[PR] KIP-759 Mark as Partitioned [kafka]

2024-04-16 Thread via GitHub


LQXshane opened a new pull request, #15740:
URL: https://github.com/apache/kafka/pull/15740

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

