Re: [PR] KIP-759 Mark as Partitioned [kafka]
mjsax commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1671646367 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -796,7 +797,6 @@ public KGroupedStream groupBy(final KeyValueMapper groupedInternal = new GroupedInternal<>(grouped); final ProcessorGraphNode selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name())); -selectKeyMapNode.keyChangingOperation(true); Review Comment: Why do we remove this? `groupBy()` does set a new key (in contrast to `groupByKey()`) and thus it is a key-changing operation. If this change does not break any existing tests, I would be surprised (and we would have a test gap)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-759 Mark as Partitioned [kafka]
mjsax commented on PR #15740: URL: https://github.com/apache/kafka/pull/15740#issuecomment-2219658049 Btw: you should rebase your PR. The build failed with `Execution failed for task ':streams:spotlessJavaCheck'.` due to changes in `trunk`; you should pick those changes up and fix your PR so the build passes.
Re: [PR] KIP-759 Mark as Partitioned [kafka]
mjsax commented on PR #15740: URL: https://github.com/apache/kafka/pull/15740#issuecomment-2219637356 One more thing: we should also update `docs/streams/upgrade.html` (just list the KIP in the 3.9 upgrade section; this section might not exist yet, so just add it in that case) and `docs/streams/developer-guide/dsl-api.html` (add the new operator to the table listing all `KStream` operators, cf. https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#transform-a-stream).
Re: [PR] KIP-759 Mark as Partitioned [kafka]
mjsax commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1671666407 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java: ## @@ -439,6 +439,87 @@ public void shouldPerformSelectKeyWithRepartitionOperation(final String topology assertEquals(1, countOccurrencesInTopology(topology, "Sink: .*-repartition.*")); } +@ParameterizedTest +@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION}) +public void shouldNotRepartitionWithMarkAsPartitionedFollowingSelectKey(final String topologyOptimization) throws Exception { +final long timestamp = System.currentTimeMillis(); + +sendEvents( +timestamp, +Arrays.asList( +new KeyValue<>(1, "10"), +new KeyValue<>(2, "20") +) +); + +final StreamsBuilder builder = new StreamsBuilder(); + +builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())) +.selectKey((key, value) -> Integer.valueOf(value)) +.markAsPartitioned(Named.as("partition-preserved")) +.groupByKey() +.count() +.toStream() +.to(outputTopic); + + +startStreams(builder, createStreamsConfig(topologyOptimization)); + +validateReceivedMessages( +new IntegerDeserializer(), +new LongDeserializer(), +Arrays.asList( +new KeyValue<>(10, 1L), +new KeyValue<>(20, 1L) +) +); + +final String topology = builder.build().describe().toString(); + +assertEquals(0, countOccurrencesInTopology(topology, "Sink: .*-repartition.*")); +} + +@ParameterizedTest +@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION}) +public void shouldNotRepartitionWithMarkAsPartitionedFollowingMap(final String topologyOptimization) throws Exception { +final String topicBMapperName = "topic-b-mapper"; +final long timestamp = System.currentTimeMillis(); + +sendEvents( +timestamp, +Arrays.asList( +new KeyValue<>(1, "10"), +new KeyValue<>(2, "20") +) +); + +final StreamsBuilder builder = new StreamsBuilder(); + +builder.stream(inputTopic, 
Consumed.with(Serdes.Integer(), Serdes.String())) +.map(KeyValue::new, Named.as(topicBMapperName))
Review Comment: This seems to be pretty much the same as the test above? Do we gain much by adding it with regard to "useful" test coverage? It might be more interesting to add a "branching" case, as well as a "downstream" key-changing operation:
```
// branching
KStream s = builder.stream(...).map(...);

s.markAsPartitioned(Named.as("partition-preserved"))
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);

// re-using `s` here "branches / fans out" the topology; because we call `groupByKey()`
// directly on `s`, we should get a repartition topic here (this branch does not see the
// `markAsPartitioned()` operation from above, and that operation should not impact `s` itself)
s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);

// downstream map
builder.stream(...)
    .map(...)
    .markAsPartitioned(Named.as("partition-preserved"))
    .map(...) // insert a second map -- this should make the upstream `markAsPartitioned()` void and we should get a repartition topic again
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);

// maybe also: branch after `markAsPartitioned()`
KStream s = builder.stream(...)
    .map(...)
    .markAsPartitioned(Named.as("partition-preserved"));

// map() after `markAsPartitioned()` in this branch -- should trigger repartitioning
s.map(...)
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);

// no repartition topic in this branch
s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);
```
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -1624,4 +1624,28 @@ public KStream processValues( processNode, builder); } + +@Override +public KStream markAsPartitioned(final Named named) { + +final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PARTITION_PRESERVE_NAME); + +final ProcessorParameters processorParameters = +new ProcessorParameters<>(new PassThrough<>(), PARTITION_PRESERVE_NAME + name); + +final PartitionPreservingNode partitionPreservingNode = new PartitionPreservingNode<>( +
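The expectations in the review comment above (an unmarked branch still repartitions, a second `map()` voids an upstream `markAsPartitioned()`, a branch taken after the marker does not repartition) can be checked against a toy model. The sketch below is only an illustration under stated assumptions: `RepartitionFlagSketch`, `Op`, and `repartitionRequired` are hypothetical names, not Kafka Streams classes, and the model simply assumes that each branch carries a "repartition required" flag derived from its own chain of upstream operators.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only: none of these names exist in Kafka Streams. */
final class RepartitionFlagSketch {

    /** Hypothetical operator kinds used by the sketch. */
    enum Op { MAP, MAP_VALUES, MARK_AS_PARTITIONED }

    /**
     * Returns true if a key-dependent operation such as groupByKey(), applied
     * after the given upstream operators, would be expected to insert a
     * repartition topic under the assumptions of this sketch.
     */
    static boolean repartitionRequired(List<Op> upstreamOps) {
        boolean required = false;
        for (Op op : upstreamOps) {
            switch (op) {
                case MAP:
                    required = true;   // the key may have changed
                    break;
                case MARK_AS_PARTITIONED:
                    required = false;  // the user asserts the partitioning is already correct
                    break;
                case MAP_VALUES:
                    break;             // the key is untouched, so the flag is inherited
            }
        }
        return required;
    }

    public static void main(String[] args) {
        // Branching: `s` itself is unmarked; only one branch calls markAsPartitioned().
        List<Op> markedBranch = Arrays.asList(Op.MAP, Op.MARK_AS_PARTITIONED);
        List<Op> unmarkedBranch = Arrays.asList(Op.MAP);
        // Downstream map: a second map() follows the marker.
        List<Op> downstreamMap = Arrays.asList(Op.MAP, Op.MARK_AS_PARTITIONED, Op.MAP);

        System.out.println("marked branch repartitions:   " + repartitionRequired(markedBranch));
        System.out.println("unmarked branch repartitions: " + repartitionRequired(unmarkedBranch));
        System.out.println("downstream map repartitions:  " + repartitionRequired(downstreamMap));
    }
}
```

Because each branch is modeled as its own operator chain, marking one branch cannot leak into a sibling branch, which is the property the suggested "branching" test is meant to pin down.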
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650219136 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -1616,4 +1623,25 @@ public KStream processValues( processNode, builder); } + +@Override +public KStream markAsPartitioned() { +final ProcessorParameters processorParameters = +new ProcessorParameters<>(new PassThrough<>(), PARTITION_PRESERVE_NAME + name); Review Comment: I will keep this open to remind myself to update the KIP after the review is complete.
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650218909 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. Review Comment: @mjsax Sorry for the confusion; the javadoc here could be better written. Before I rewrite it: the composite-key notion came from the original discussion [here](https://lists.apache.org/thread/r7yqsoqsox0z2mzxt33r9r99tnwvb58o); nonetheless, I should remove it. Perhaps a brief note such as `interactive query might fail when trying to guess the original key` would do. As for joins, I might need your help; most of my understanding of the problem came from the discussion thread.
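The interactive-query concern raised above can be illustrated with a small worked example. This is a minimal sketch under stated assumptions: `StalePartitionSketch` and `partitionFor` are hypothetical names, and the hash is a simplified stand-in (Kafka's default partitioner hashes the serialized key bytes with murmur2). It shows only the general effect: if a record is re-keyed but never physically moved, a lookup that derives the partition from the new key may look in the wrong place.

```java
/** Toy illustration only: not Kafka code. */
final class StalePartitionSketch {

    /**
     * Simplified stand-in for a key-based partitioner (assumption of this
     * sketch; the real default partitioner uses murmur2 over serialized bytes).
     */
    static int partitionFor(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        int originalKey = 1;  // key the record was produced with
        int newKey = 10;      // key assigned by selectKey(), with no repartitioning afterwards

        // The record was never moved, so it still sits where the original key put it.
        int actualPartition = partitionFor(originalKey, numPartitions);
        // A lookup by the new key assumes the record was placed according to that key.
        int assumedPartition = partitionFor(newKey, numPartitions);

        System.out.println("record lives in partition " + actualPartition);
        System.out.println("lookup by new key goes to partition " + assumedPartition);
        System.out.println("lookup finds the record: " + (actualPartition == assumedPartition));
    }
}
```

The same mismatch is one plausible reading of the join concern: a join expects records with equal keys on both sides to sit in the same partition number, which a skipped repartition no longer guarantees.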
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650214064 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final KeyValueMapper selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named)); -selectKeyProcessorNode.keyChangingOperation(true); +selectKeyProcessorNode.keyChangingOperation(repartitionRequired); builder.addGraphNode(graphNode, selectKeyProcessorNode); // key serde cannot be preserved return new KStreamImpl<>( -selectKeyProcessorNode.nodeName(), -null, -valueSerde, -subTopologySourceNodes, -true, -selectKeyProcessorNode, -builder); +selectKeyProcessorNode.nodeName(), Review Comment: Thanks, reverted these indentations
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650128875 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final KeyValueMapper
Re: [PR] KIP-759 Mark as Partitioned [kafka]
mjsax commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1585886754 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. Review Comment: ```suggestion * and does not require further repartitioning by downstream key-dependent operations. ``` ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final KeyValueMapper selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named)); -selectKeyProcessorNode.keyChangingOperation(true); +selectKeyProcessorNode.keyChangingOperation(repartitionRequired); builder.addGraphNode(graphNode, selectKeyProcessorNode); // key serde cannot be preserved return new KStreamImpl<>( -selectKeyProcessorNode.nodeName(), -null, -valueSerde, -subTopologySourceNodes, -true, -selectKeyProcessorNode, -builder); +selectKeyProcessorNode.nodeName(), Review Comment: nit: avoid unnecessary reformatting (i.e., indentation in this case) -- I assume you have some "auto format" feature enabled in your IDE. I would recommend disabling it, or adjusting the setting to avoid noise like this. ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. 
+ * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * + * This method will overwrite a default behavior as described below. + * By default, Kafka Streams always automatically repartition the records to prepare for a stateful operation, + * however, it is not always required when input stream is partitioned as intended. As an example, + * if an input stream is partitioned by a String key1, calling the below function will trigger a repartition: + * + * {@code + * KStream inputStream = builder.stream("topic"); + * stream + * .selectKey( ... => (key1, metric)) + * .groupByKey() + * .aggregate() + * } + * + * You can then overwrite the default behavior by calling this method: + * {@code + * stream + * .selectKey( ... => (key1, metric)) + * .markAsPartitioned() + * .groupByKey() + * .aggregate() + * } + * Review Comment: Do we need this tag? ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. 
+ * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. Review Comment: Can you refresh my memory about joins? I cannot remember the details. We should add a section to `docs/streams/developer-guide/dsl-api.html` and explain the dos and don'ts of this operation. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final
[PR] KIP-759 Mark as Partitioned [kafka]
LQXshane opened a new pull request, #15740: URL: https://github.com/apache/kafka/pull/15740 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)