zhuzhurk commented on a change in pull request #18673:
URL: https://github.com/apache/flink/pull/18673#discussion_r803389214

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java ##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * If there are multiple consecutive the same groupBy(i.e. keyBy), SQL planner will change them

Review comment:
maybe "consecutive the same groupBy(i.e. keyBy)" -> "consecutive and the same hash shuffles"

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java ##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * If there are multiple consecutive the same groupBy(i.e. keyBy), SQL planner will change them
+ * except the first one to use forward partitioner, so that these operators can be chained to reduce
+ * unnecessary shuffles.
+ *
+ * <pre>{@code
+ * A --[hash]--> B --[hash]--> C
+ *               |
+ *               V
+ * A --[hash]--> B --[forward]--> C
+ *
+ * }</pre>
+ *
+ * <p>However, sometimes the consecutive hash operators are not chained (e.g. multiple inputs), and
+ * this kind of forward partitioners will turn into forward job edges. These forward edges still
+ * have the consecutive hash assumption, so that they cannot be changed into rescale/rebalance
+ * edges, otherwise it can lead to incorrect results. This prevents the adaptive batch scheduler
+ * from determining parallelism for other forward edge downstream job vertices(see FLINK-25046).
+ *
+ * <p>To solve it, we introduce the {@link ForwardForConsecutiveHashPartitioner}. When SQL planner
+ * optimizes the case of multiple consecutive the same groupBy, it should use this partitioner, and
+ * then the runtime framework will change it to forward/hash after the operator chain creation.
+ *
+ * <pre>{@code
+ * A --[hash]--> B --[hash]--> C
+ *               |
+ *               V
+ * A --[hash]--> B --[ForwardForConsecutiveHash]--> C
+ *
+ * }</pre>
+ *
+ * <p>his partitioner will be converted to following partitioners after the operator chain creation:
+ *
+ * <p>1. Be converted to {@link ForwardPartitioner} if this partitioner is intra-chain.
+ *
+ * <p>2. Be converted to {@link ForwardForConsecutiveHashPartitioner#hashPartitioner} if this
+ * partitioner is inter-chain.
+ *
+ * <p>This partitioner should only be used for SQL Batch jobs and use Adaptive Batch Scheduler.

Review comment:
"use Adaptive Batch Scheduler" -> "when using AdaptiveBatchScheduler"
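
For readers of this thread, the conversion rule stated in the quoted Javadoc (intra-chain becomes forward, inter-chain falls back to hash) can be sketched roughly as below. This is a toy model only: `ConsecutiveHashSketch`, `Kind`, `Edge` and `resolve` are hypothetical names made up for illustration and are not Flink classes or the code in this PR.

```java
/** Toy model of the rule in the Javadoc; none of these types exist in Flink. */
final class ConsecutiveHashSketch {

    /** Hypothetical stand-ins for the partitioner kinds named in the Javadoc. */
    enum Kind { HASH, FORWARD, FORWARD_FOR_CONSECUTIVE_HASH }

    /** Hypothetical edge: its partitioner kind, and whether both ends sit in one operator chain. */
    static final class Edge {
        final Kind kind;
        final boolean intraChain;

        Edge(Kind kind, boolean intraChain) {
            this.kind = kind;
            this.intraChain = intraChain;
        }
    }

    /**
     * Resolves the placeholder kind once chaining is known (assumed to run after
     * operator chain creation): intra-chain becomes FORWARD, inter-chain becomes HASH.
     * Every other kind is returned unchanged.
     */
    static Kind resolve(Edge edge) {
        if (edge.kind != Kind.FORWARD_FOR_CONSECUTIVE_HASH) {
            return edge.kind;
        }
        return edge.intraChain ? Kind.FORWARD : Kind.HASH;
    }

    public static void main(String[] args) {
        // B and C chained together: the placeholder degrades to a plain forward.
        System.out.println(resolve(new Edge(Kind.FORWARD_FOR_CONSECUTIVE_HASH, true)));
        // B and C in different chains (e.g. multiple inputs): fall back to hash.
        System.out.println(resolve(new Edge(Kind.FORWARD_FOR_CONSECUTIVE_HASH, false)));
    }
}
```

The point of the fallback in this sketch is that an inter-chain edge never ends up as a plain forward edge, which is the situation the Javadoc says blocks the adaptive batch scheduler from deciding parallelism.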