Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-07-10 Thread via GitHub


mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1671646367


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -796,7 +797,6 @@ public  KGroupedStream groupBy(final 
KeyValueMapper groupedInternal = new 
GroupedInternal<>(grouped);
 final ProcessorGraphNode selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
-selectKeyMapNode.keyChangingOperation(true);

Review Comment:
   Why do we remove this? `groupBy()` sets a new key (in contrast to `groupByKey()`), so it is a key-changing operation.
   
   I would be surprised if this change does not break any existing test (and if it doesn't, we have a test gap).
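To make the concern concrete, here is a minimal plain-Java model of why `groupBy()` must stay key-changing. It is not Kafka Streams code: `GroupByModel` and its methods are hypothetical names for illustration, and `Math.floorMod(key.hashCode(), n)` is only a stand-in for the real murmur2-based default partitioner. The model reuses the values from the integration test (`"10"` and `"20"` on different partitions) and regroups them by a hypothetical parity key. Without a shuffle, the per-task count for `"even"` is split across two tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model, NOT Kafka Streams code: each "task" runs a count() over the
// records of exactly one partition, as a stateful operator would.
final class GroupByModel {

    // Stand-in for the default partitioner (Kafka applies murmur2 to the serialized key).
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns one local count map per task (list index = partition).
    static List<Map<String, Long>> countPerTask(List<List<String>> partitions,
                                                Function<String, String> newKey,
                                                boolean repartition) {
        final int n = partitions.size();
        List<List<String>> input = partitions;
        if (repartition) {
            // Shuffle every record to the partition that owns its NEW key.
            input = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                input.add(new ArrayList<>());
            }
            for (List<String> partition : partitions) {
                for (String value : partition) {
                    input.get(partitionFor(newKey.apply(value), n)).add(value);
                }
            }
        }
        List<Map<String, Long>> counts = new ArrayList<>();
        for (List<String> partition : input) {
            Map<String, Long> local = new HashMap<>();
            for (String value : partition) {
                local.merge(newKey.apply(value), 1L, Long::sum);
            }
            counts.add(local);
        }
        return counts;
    }

    // How many tasks hold a (partial) count for the key; 1 means the group is complete.
    static long tasksHolding(List<Map<String, Long>> counts, String key) {
        return counts.stream().filter(m -> m.containsKey(key)).count();
    }
}
```

A real test that catches a dropped `keyChangingOperation(true)` would check the equivalent of the repartitioned case against the actual topology. One way is to count `Sink: .*-repartition.*` occurrences in `Topology#describe()`, as the existing tests in this PR do.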



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-07-10 Thread via GitHub


mjsax commented on PR #15740:
URL: https://github.com/apache/kafka/pull/15740#issuecomment-2219658049

   Btw: you should rebase your PR. The build failed with `Execution failed for task ':streams:spotlessJavaCheck'.` because of changes in `trunk`. You need to pick up those changes and fix your PR so that the check passes.





Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-07-10 Thread via GitHub


mjsax commented on PR #15740:
URL: https://github.com/apache/kafka/pull/15740#issuecomment-2219637356

   One more thing: we should also update `docs/streams/upgrade.html` (just list the KIP in the 3.9 upgrade section; this section might not exist yet, so in that case just add it) and `docs/streams/developer-guide/dsl-api.html` (add the new operator to the table listing all `KStream` operators, cf. https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#transform-a-stream).





Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-07-10 Thread via GitHub


mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1671666407


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##
@@ -439,6 +439,87 @@ public void shouldPerformSelectKeyWithRepartitionOperation(final String topology
 assertEquals(1, countOccurrencesInTopology(topology, "Sink: .*-repartition.*"));
 }
 
+@ParameterizedTest
+@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
+public void shouldNotRepartitionWithMarkAsPartitionedFollowingSelectKey(final String topologyOptimization) throws Exception {
+final long timestamp = System.currentTimeMillis();
+
+sendEvents(
+timestamp,
+Arrays.asList(
+new KeyValue<>(1, "10"),
+new KeyValue<>(2, "20")
+)
+);
+
+final StreamsBuilder builder = new StreamsBuilder();
+
+builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
+.selectKey((key, value) -> Integer.valueOf(value))
+.markAsPartitioned(Named.as("partition-preserved"))
+.groupByKey()
+.count()
+.toStream()
+.to(outputTopic);
+
+
+startStreams(builder, createStreamsConfig(topologyOptimization));
+
+validateReceivedMessages(
+new IntegerDeserializer(),
+new LongDeserializer(),
+Arrays.asList(
+new KeyValue<>(10, 1L),
+new KeyValue<>(20, 1L)
+)
+);
+
+final String topology = builder.build().describe().toString();
+
+assertEquals(0, countOccurrencesInTopology(topology, "Sink: .*-repartition.*"));
+}
+
+@ParameterizedTest
+@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
+public void shouldNotRepartitionWithMarkAsPartitionedFollowingMap(final String topologyOptimization) throws Exception {
+final String topicBMapperName = "topic-b-mapper";
+final long timestamp = System.currentTimeMillis();
+
+sendEvents(
+timestamp,
+Arrays.asList(
+new KeyValue<>(1, "10"),
+new KeyValue<>(2, "20")
+)
+);
+
+final StreamsBuilder builder = new StreamsBuilder();
+
+builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
+.map(KeyValue::new, Named.as(topicBMapperName))

Review Comment:
   This seems to be pretty much the same as the test above? Do we gain much by adding it, with regard to "useful" test coverage?
   
   It might be more interesting to add a "branching" case, as well as a "downstream" key-changing operation:
   ```
   // branching
   KStream s = builder.stream(...).map(...);
   
   s.markAsPartitioned(Named.as("partition-preserved"))
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
   
   // re-using `s` here "branches" (fans out) the topology; because we call `groupByKey()`
   // directly on `s`, we should get a repartition topic here (this branch does not see the
   // `markAsPartitioned()` operation from above, and that operation should not impact `s` itself)
   s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);


   // downstream map
   KStream s = builder.stream(...)
    .map(...)
    .markAsPartitioned(Named.as("partition-preserved"))
    .map(...) // a second map() should void the upstream `markAsPartitioned()`, so we should get a repartition topic again
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
   
   // maybe also: branch after `markAsPartitioned()`
   KStream s = builder.stream(...)
    .map(...)
    .markAsPartitioned(Named.as("partition-preserved"));
   
   // map() after `markAsPartitioned()` in this branch: should trigger repartitioning
   s.map(...)
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
   
   // no repartition topic in this branch
   s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);
   ```
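The expected outcome of each scenario above can be pinned down with a small plain-Java model before writing the integration tests. This is a sketch under the assumptions stated in the comment, not Kafka's actual graph-building logic. A key-changing operation (`map`, `selectKey`) requires a repartition for a downstream `groupByKey()`. `markAsPartitioned()` cancels that requirement only for its own descendants, and a later key-changing operation re-enables it. `RepartitionRule`, `Node`, and `Op` are hypothetical names.

```java
// Toy model of the rule described in the review, NOT Kafka's actual graph-building code.
final class RepartitionRule {

    enum Op { SOURCE, MAP, SELECT_KEY, MAP_VALUES, MARK_AS_PARTITIONED }

    static final class Node {
        final Op op;
        final Node parent; // null for the source node

        private Node(Op op, Node parent) {
            this.op = op;
            this.parent = parent;
        }

        static Node source() {
            return new Node(Op.SOURCE, null);
        }

        // Every call returns a NEW child and never mutates the receiver, so two
        // calls on the same node model two branches (fan-out) of the topology.
        Node then(Op next) {
            return new Node(next, this);
        }
    }

    // Would a groupByKey() attached directly to `node` need a repartition topic?
    // Walk up the ancestors; the nearest key-relevant operation decides.
    static boolean groupByKeyNeedsRepartition(Node node) {
        for (Node n = node; n != null; n = n.parent) {
            switch (n.op) {
                case MAP:
                case SELECT_KEY:
                    return true;   // key changed and not (yet) marked as partitioned
                case MARK_AS_PARTITIONED:
                    return false;  // caller asserted the data is already partitioned by this key
                default:
                    break;         // SOURCE and MAP_VALUES keep the key unchanged
            }
        }
        return false;
    }
}
```

Under this model, the branching scenario yields `false` for the marked branch and `true` for `s.groupByKey()`. The downstream-map scenario yields `true`. In the last scenario, the `map()` branch yields `true` and the direct branch yields `false`.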



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1624,4 +1624,28 @@ public  KStream processValues(
 processNode,
 builder);
 }
+
+@Override
+public KStream markAsPartitioned(final Named named) {
+
+final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PARTITION_PRESERVE_NAME);
+
+final ProcessorParameters processorParameters =
+new ProcessorParameters<>(new PassThrough<>(), PARTITION_PRESERVE_NAME + name);
+
+final PartitionPreservingNode partitionPreservingNode = new PartitionPreservingNode<>(
+

Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650219136


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1616,4 +1623,25 @@ public  KStream processValues(
 processNode,
 builder);
 }
+
+@Override
+public KStream markAsPartitioned() {
+final ProcessorParameters processorParameters =
+new ProcessorParameters<>(new PassThrough<>(), PARTITION_PRESERVE_NAME + name);

Review Comment:
   I will keep this open to remind myself to update the KIP after the review is 
complete. 






Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650218909


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended,
+ * and does not require further repartitioning by downstream key changing operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.

Review Comment:
   @mjsax 
   
   Sorry for the confusion; the javadoc here could be better written. Before I rewrite it: the composite-key notion came from the original discussion [here](https://lists.apache.org/thread/r7yqsoqsox0z2mzxt33r9r99tnwvb58o), but I should remove it nonetheless. Perhaps a brief description along the lines of `interactive query might fail when trying to guess the original key`.
   
   As for joins, I might need your help. Most of my understanding of the problem came from the discussion thread.
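For the IQ part, here is a possible framing that avoids the composite-key notion. As far as I understand, `KafkaStreams#queryMetadataForKey(storeName, key, keySerializer)` locates the owning task by applying the default partitioner to the queried key; an overload accepts a custom `StreamPartitioner`. Suppose the repartition was skipped and the records were not actually partitioned that way. Then the state lives in a different task than the one IQ asks. The plain-Java model below illustrates this. It is not Kafka code, `IqRoutingModel` and its methods are hypothetical, and `Math.floorMod(key.hashCode(), n)` stands in for murmur2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of IQ key routing, NOT Kafka code: one local store per partition/task.
final class IqRoutingModel {

    // Stand-in for the default partitioner (Kafka applies murmur2 to the serialized key).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Materialize a count for `key` in the store of the partition the record ends
    // up in: the key's own partition if repartitioned, otherwise where it arrived.
    static List<Map<String, Long>> materialize(int numPartitions, int arrivedOn,
                                               String key, boolean repartitioned) {
        List<Map<String, Long>> stores = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
        int owner = repartitioned ? partitionFor(key, numPartitions) : arrivedOn;
        stores.get(owner).merge(key, 1L, Long::sum);
        return stores;
    }

    // IQ-style point lookup: only the store the partitioner points at is consulted.
    static Long query(List<Map<String, Long>> stores, String key) {
        return stores.get(partitionFor(key, stores.size())).get(key);
    }
}
```

The lookup only misses when the partition a record arrived on differs from the partition the queried key hashes to. So the javadoc could state the actual precondition: IQ is safe only if the input is partitioned exactly as the partitioner used by IQ would partition the new key.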






Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650214064


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper selectKeyProcessorNode = 
internalSelectKey(mapper, new NamedInternal(named));
-selectKeyProcessorNode.keyChangingOperation(true);
+selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
 builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
 // key serde cannot be preserved
 return new KStreamImpl<>(
-selectKeyProcessorNode.nodeName(),
-null,
-valueSerde,
-subTopologySourceNodes,
-true,
-selectKeyProcessorNode,
-builder);
+selectKeyProcessorNode.nodeName(),

Review Comment:
   Thanks, I reverted these indentation changes.






Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650128875


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper

Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-04-30 Thread via GitHub


mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1585886754


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended,
+ * and does not require further repartitioning by downstream key changing operations.

Review Comment:
   ```suggestion
* and does not require further repartitioning by downstream key-dependent operations.
   ```



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper selectKeyProcessorNode = 
internalSelectKey(mapper, new NamedInternal(named));
-selectKeyProcessorNode.keyChangingOperation(true);
+selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
 builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
 // key serde cannot be preserved
 return new KStreamImpl<>(
-selectKeyProcessorNode.nodeName(),
-null,
-valueSerde,
-subTopologySourceNodes,
-true,
-selectKeyProcessorNode,
-builder);
+selectKeyProcessorNode.nodeName(),

Review Comment:
   nit: avoid unnecessary reformatting (i.e., indentation in this case). I assume you have some "auto format" feature enabled in your IDE; I would recommend disabling it, or adjusting the settings to avoid noise like this.



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended,
+ * and does not require further repartitioning by downstream key changing operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 
+ * This method will overwrite a default behavior as described below.
+ * By default, Kafka Streams always automatically repartition the records to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned as intended. As an example,
+ * if an input stream is partitioned by a String key1, calling the below function will trigger a repartition:
+ * 
+ * {@code
+ * KStream inputStream = builder.stream("topic");
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ * You can then overwrite the default behavior by calling this method:
+ * {@code
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .markAsPartitioned()
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ *  

Review Comment:
   Do we need this tag?
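The javadoc example quoted above is pseudocode: `=>` is not Java, and the example declares `inputStream` but then uses `stream`. It may therefore help to spell out the precondition the example relies on. The sketch below checks that selecting `key1` from a value of the assumed form `"key1|metric"` would move no record, meaning the skipped repartition would have been a no-op. It is plain Java with hypothetical `PartitioningCheck` and `InputRecord` names. `Math.floorMod(key.hashCode(), n)` stands in for whatever partitioner actually produced the input topic.

```java
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of the precondition behind markAsPartitioned(), NOT Kafka code.
final class PartitioningCheck {

    // A record as read from the input topic: its value and the partition it came from.
    static final class InputRecord {
        final String value;
        final int partition;

        InputRecord(String value, int partition) {
            this.value = value;
            this.partition = partition;
        }
    }

    // Stand-in for the partitioner used downstream (Kafka's default applies murmur2).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True if every record already sits in the partition its NEW key maps to,
    // i.e. a repartition after selecting `newKey` would not move anything.
    static boolean alreadyPartitionedBy(List<InputRecord> records,
                                        Function<String, String> newKey,
                                        int numPartitions) {
        for (InputRecord r : records) {
            if (partitionFor(newKey.apply(r.value), numPartitions) != r.partition) {
                return false;
            }
        }
        return true;
    }
}
```

A real application cannot run this check per record at runtime. It only describes the guarantee the caller takes on when calling `markAsPartitioned()`.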



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended,
+ * and does not require further repartitioning by downstream key changing operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.

Review Comment:
   Can you refresh my memory about joins? I cannot remember the details.
   
   We should add a section to `docs/streams/developer-guide/dsl-api.html` that explains the dos and don'ts of this operation.
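On joins, the general requirement is co-partitioning. Both join inputs must have the same number of partitions and be partitioned by the join key in the same way, so that matching keys reach the same task. As far as I know, Kafka Streams can validate partition counts but not how records were assigned to partitions. It normally guarantees the latter by inserting a repartition topic after a key change, so a `markAsPartitioned()` that skips that topic moves the guarantee to the user. The toy model below shows that a key only joins if both sides placed it in the same partition. It is plain Java with a hypothetical `CopartitionModel` name, and each side is given as a map from key to partition.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model of join co-partitioning, NOT Kafka code.
final class CopartitionModel {

    // A task only sees its own partition of each input, so a key is joined
    // only if both sides placed it in the same partition.
    static Set<String> joinedKeys(Map<String, Integer> leftPartitionOf,
                                  Map<String, Integer> rightPartitionOf) {
        Set<String> joined = new TreeSet<>();
        for (Map.Entry<String, Integer> e : leftPartitionOf.entrySet()) {
            Integer right = rightPartitionOf.get(e.getKey());
            if (right != null && right.equals(e.getValue())) {
                joined.add(e.getKey());
            }
        }
        return joined;
    }
}
```

In the test data, the "skewed" right side stands for an input whose key was changed while `markAsPartitioned()` suppressed the shuffle. Key `"b"` silently produces no join result instead of an error.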



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 

[PR] KIP-759 Mark as Partitioned [kafka]

2024-04-16 Thread via GitHub


LQXshane opened a new pull request, #15740:
URL: https://github.com/apache/kafka/pull/15740

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

