This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d20533e58770c276e7fc2ac0029dbf61ecf0b19e Author: Maximilian Michels <m...@apache.org> AuthorDate: Thu Dec 15 15:46:15 2022 +0100 Use RebalancePartitioner --- .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 4 ++-- .../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 16666b71804..5130861b844 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -92,7 +92,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil; import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; -import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer; @@ -1622,7 +1622,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> if (streamOutput.getPartitioner() instanceof ForwardPartitioner && streamOutput.getConsumerParallelism() != environment.getTaskInfo().getNumberOfParallelSubtasks()) { - streamOutput.setPartitioner(new RescalePartitioner<>()); + streamOutput.setPartitioner(new RebalancePartitioner<>()); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 5d5bba63af6..ff398d37d1c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -131,6 +131,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.io.DataInputStatus; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; @@ -1864,7 +1865,7 @@ public class StreamTaskTest extends TestLogger { ((SingleRecordWriter) recordWriterDelegate) .getRecordWriter(0)) .getChannelSelector() - instanceof RescalePartitioner); + instanceof RebalancePartitioner); } }