junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866131407

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -25,12 +23,15 @@
     private final Cluster cluster;
     private final Serializer<K> keySerializer;
-    private final DefaultPartitioner defaultPartitioner;
+    @SuppressWarnings("deprecation")
+    private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
+
+    @SuppressWarnings("deprecation")
     public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
         this.cluster = cluster;
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new DefaultPartitioner();
+        this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   Hmm, onNewBatch() in DefaultPartitioner is not called by the application code. It's called inside Sender. Converting DefaultStreamPartitioner to the new built-in partitioner is a bit tricky since kstreams calls the partitioner explicitly and then passes in the partition during send. We need to change kstreams code so that it lets the producer determine the partition. This can be done in a separate PR. cc @guozhangwang
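
   For readers following the thread, the difference between "the caller picks the partition and passes it in during send" and "the producer determines the partition" can be sketched roughly as below. This is only a toy model under stated assumptions: `ToyRecord`, `ToyProducer`, `hashPartition`, and the `producerSideChoices` counter are hypothetical stand-ins, not Kafka classes, and the hash is a placeholder rather than the one Kafka actually uses. It only shows that once the caller supplies a partition, the producer-side selection logic never runs for that record.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Toy model only: none of these types are Kafka classes.
final class PartitionPathSketch {

    /** Hypothetical record; partition is null when the choice is left to the producer. */
    static final class ToyRecord {
        final String key;
        final Integer partition;

        ToyRecord(String key, Integer partition) {
            this.key = key;
            this.partition = partition;
        }
    }

    /** Hypothetical producer that counts how often its own partitioning logic runs. */
    static final class ToyProducer {
        private final int numPartitions;
        int producerSideChoices = 0;

        ToyProducer(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        int send(ToyRecord record) {
            if (record.partition != null) {
                // Caller already chose a partition: producer-side logic is skipped.
                return record.partition;
            }
            // Caller left the partition unset: the producer decides.
            producerSideChoices++;
            return hashPartition(record.key, numPartitions);
        }
    }

    /** Placeholder hash (assumption); stands in for whatever the real partitioner does. */
    static int hashPartition(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer(4);

        // Style described for kstreams today: compute the partition first, then pass it in.
        int chosen = hashPartition("order-1", 4);
        producer.send(new ToyRecord("order-1", chosen));
        System.out.println("producer-side choices after explicit send: " + producer.producerSideChoices);

        // Style the comment asks for: leave the partition unset and let the producer pick.
        producer.send(new ToyRecord("order-1", null));
        System.out.println("producer-side choices after delegated send: " + producer.producerSideChoices);
    }
}
```

   In this sketch the first send leaves the counter untouched and the second increments it, which is the shape of the problem described above: any batching-aware logic that lives on the producer side cannot take effect while the caller keeps supplying the partition itself.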