junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866131407


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -25,12 +23,15 @@
 
     private final Cluster cluster;
     private final Serializer<K> keySerializer;
-    private final DefaultPartitioner defaultPartitioner;
 
+    @SuppressWarnings("deprecation")
+    private final 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
defaultPartitioner;
+
+    @SuppressWarnings("deprecation")
     public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster) {
         this.cluster = cluster;
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new DefaultPartitioner();
+        this.defaultPartitioner = new 
org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   Hmm, onNewBatch() in DefaultPartitioner is not called by the application 
code. It's called inside Sender. 
   
   Converting DefaultStreamPartitioner to the new built-in partitioner is a bit 
tricky since kstreams calls the partitioner explicitly and then passes in the 
partition during send. We need to change kstreams code so that it lets the 
producer determine the partition. This can be done in a separate PR.  cc 
@guozhangwang 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to