1996fanrui commented on code in PR #21443: URL: https://github.com/apache/flink/pull/21443#discussion_r1042919428
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());

Review Comment:
   Thanks for your feedback.

   In most scenarios, `RescalePartitioner` works well. However, I'm not sure it is generic enough.

   For example, a job has two vertices, each with a parallelism of 200, connected by a `ForwardPartitioner`. Autoscaling finds that vertex2 performs poorly, so it increases the parallelism of vertex2 from 200 to 300 and leaves the parallelism of vertex1 unchanged.

   With `RescalePartitioner`, 100 subtasks of vertex1 will each send data to one downstream subtask, and the other 100 subtasks of vertex1 will each send data to two downstream subtasks. The upstream subtasks that feed only one downstream subtask gain no extra downstream capacity, so the first half of the source may still have lag. I'm not sure whether this will actually happen.
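
   The 200-to-300 arithmetic above can be checked with a small standalone sketch. It does not call any Flink code: the class `RescaleFanOutSketch` and its methods are hypothetical, and the contiguous, floor-based split of downstream subtasks across upstream subtasks is an assumption made for illustration, not a statement of how Flink assigns pointwise connections in any particular version.

```java
import java.util.Map;
import java.util.TreeMap;

public class RescaleFanOutSketch {

    /**
     * Number of downstream subtasks fed by upstream subtask {@code index}.
     * ASSUMPTION: downstream subtasks are split into contiguous ranges as
     * evenly as possible (floor-based); this is not taken from Flink's source.
     * Intended for downstream >= upstream.
     */
    static int fanOut(int index, int upstream, int downstream) {
        int start = index * downstream / upstream;       // first downstream index (inclusive)
        int end = (index + 1) * downstream / upstream;   // last downstream index (exclusive)
        return end - start;
    }

    /** Maps fan-out size to the number of upstream subtasks that have it. */
    static Map<Integer, Integer> fanOutHistogram(int upstream, int downstream) {
        Map<Integer, Integer> histogram = new TreeMap<>();
        for (int i = 0; i < upstream; i++) {
            histogram.merge(fanOut(i, upstream, downstream), 1, Integer::sum);
        }
        return histogram;
    }

    public static void main(String[] args) {
        // vertex1 parallelism 200, vertex2 parallelism 300
        System.out.println(fanOutHistogram(200, 300)); // prints {1=100, 2=100}
    }
}
```

   Under this assumed split, half of the upstream subtasks keep a single consumer, which matches the 100/100 split described in the comment. Which upstream subtasks end up with only one consumer depends on the actual mapping.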