This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d20533e58770c276e7fc2ac0029dbf61ecf0b19e
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Thu Dec 15 15:46:15 2022 +0100

    Use RebalancePartitioner
---
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java     | 4 ++--
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 16666b71804..5130861b844 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -92,7 +92,7 @@ import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
 import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
@@ -1622,7 +1622,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         if (streamOutput.getPartitioner() instanceof ForwardPartitioner
                 && streamOutput.getConsumerParallelism()
                         != 
environment.getTaskInfo().getNumberOfParallelSubtasks()) {
-            streamOutput.setPartitioner(new RescalePartitioner<>());
+            streamOutput.setPartitioner(new RebalancePartitioner<>());
         }
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5d5bba63af6..ff398d37d1c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -131,6 +131,7 @@ import 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
@@ -1864,7 +1865,7 @@ public class StreamTaskTest extends TestLogger {
                                             ((SingleRecordWriter) 
recordWriterDelegate)
                                                     .getRecordWriter(0))
                                     .getChannelSelector()
-                            instanceof RescalePartitioner);
+                            instanceof RebalancePartitioner);
         }
     }
 

Reply via email to