junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866131407
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -25,12 +23,15 @@
private final Cluster cluster;
private final Serializer<K> keySerializer;
- private final DefaultPartitioner defaultPartitioner;
+ @SuppressWarnings("deprecation")
+ private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
+
+ @SuppressWarnings("deprecation")
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
this.cluster = cluster;
this.keySerializer = keySerializer;
- this.defaultPartitioner = new DefaultPartitioner();
+ this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();
Review Comment:
Hmm, onNewBatch() in DefaultPartitioner is not called by the application
code. It's called inside Sender.
Converting DefaultStreamPartitioner to the new built-in partitioner is a bit
tricky because kstreams calls the partitioner explicitly and then passes the
resulting partition in during send. We need to change the kstreams code so
that it lets the producer determine the partition. This can be done in a
separate PR. cc @guozhangwang
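
To illustrate the difference between the two call patterns, here is a rough, self-contained sketch. It does not use the real Kafka classes: `Record`, `StubProducer`, and `placeholderPartition` are hypothetical stand-ins, and the hash is a placeholder rather than whatever the real partitioner does. The only point is that once the caller passes a partition, the producer-side partitioning logic is never consulted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionChoiceSketch {

    /** Hypothetical stand-in for a producer record; a null partition means "producer decides". */
    static final class Record {
        final String topic;
        final Integer partition;
        final byte[] key;

        Record(String topic, Integer partition, byte[] key) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
        }
    }

    /** Hypothetical stand-in for the producer; counts how often it had to pick a partition itself. */
    static final class StubProducer {
        final int numPartitions;
        int producerSideChoices = 0;

        StubProducer(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        /** Returns the partition the record ends up on. */
        int send(Record record) {
            if (record.partition != null) {
                // Caller already decided, so the producer-side logic is bypassed.
                return record.partition;
            }
            producerSideChoices++;
            return placeholderPartition(record.key, numPartitions);
        }
    }

    /** Placeholder hash for illustration only; not the hashing Kafka actually uses. */
    static int placeholderPartition(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    /** Pattern described for kstreams today: compute the partition first, then pass it in on send. */
    static int sendWithExplicitPartition(StubProducer producer, String topic, byte[] key) {
        int partition = placeholderPartition(key, producer.numPartitions);
        return producer.send(new Record(topic, partition, key));
    }

    /** Suggested direction: leave the partition unset and let the producer determine it. */
    static int sendLettingProducerDecide(StubProducer producer, String topic, byte[] key) {
        return producer.send(new Record(topic, null, key));
    }

    public static void main(String[] args) {
        StubProducer producer = new StubProducer(4);
        byte[] key = "some-key".getBytes(StandardCharsets.UTF_8);

        sendWithExplicitPartition(producer, "topic", key);
        System.out.println("producer-side choices after explicit send: " + producer.producerSideChoices);

        sendLettingProducerDecide(producer, "topic", key);
        System.out.println("producer-side choices after delegated send: " + producer.producerSideChoices);
    }
}
```

In this toy version both paths land on the same partition because they share one placeholder function. With the real classes that would not be guaranteed, which is part of why the conversion is better handled in its own PR.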