zhuzhurk commented on a change in pull request #18673:
URL: https://github.com/apache/flink/pull/18673#discussion_r803389214



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * If there are multiple consecutive the same groupBy(i.e. keyBy), SQL planner will change them

Review comment:
       Maybe "consecutive the same groupBy(i.e. keyBy)" -> "consecutive and the same hash shuffles"
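A minimal sketch of why the second shuffle is redundant only when the consecutive hash shuffles are *the same*, meaning the same key and the same parallelism. `ConsecutiveHashDemo`, `channelFor` and `forwardEquivalent` are hypothetical names. `channelFor` is a plain `hashCode`-modulo stand-in and is not Flink's key-group assignment.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration, not Flink code. */
public class ConsecutiveHashDemo {

    // Hypothetical stand-in for hash partitioning: hashCode modulo parallelism.
    // Flink's real key-group assignment is different.
    public static int channelFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // True if every row already sits on the subtask that hashing on C's key
    // (with C's parallelism) would pick, i.e. B -> C could be a forward edge.
    public static boolean forwardEquivalent(
            List<String[]> rows,
            Function<String[], Object> keyOfB, int parallelismB,
            Function<String[], Object> keyOfC, int parallelismC) {
        for (String[] row : rows) {
            int subtaskOfB = channelFor(keyOfB.apply(row), parallelismB); // A --[hash]--> B
            int subtaskOfC = channelFor(keyOfC.apply(row), parallelismC); // B --[hash]--> C
            if (subtaskOfB != subtaskOfC) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"u1", "b"}, new String[] {"u2", "a"}, new String[] {"u3", "c"});
        Function<String[], Object> byUser = r -> r[0];
        Function<String[], Object> byItem = r -> r[1];

        // Same key, same parallelism: the second hash shuffle moves nothing.
        System.out.println(forwardEquivalent(rows, byUser, 4, byUser, 4)); // true
        // Different key: forwarding would leave rows on the wrong subtask.
        System.out.println(forwardEquivalent(rows, byUser, 4, byItem, 4)); // false
        // Same key, different parallelism: forward no longer matches hash placement.
        System.out.println(forwardEquivalent(rows, byUser, 4, byUser, 2)); // false
    }
}
```

The third case is why such an edge cannot simply be turned into a rescale/rebalance edge or given an independently chosen downstream parallelism while it still relies on the consecutive-hash assumption.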

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java
##########
@@ -0,0 +1,112 @@
+/**
+ * If there are multiple consecutive the same groupBy(i.e. keyBy), SQL planner will change them
+ * except the first one to use forward partitioner, so that these operators can be chained to reduce
+ * unnecessary shuffles.
+ *
+ * <pre>{@code
+ * A --[hash]--> B --[hash]--> C
+ *            |
+ *            V
+ * A --[hash]--> B --[forward]--> C
+ *
+ * }</pre>
+ *
+ * <p>However, sometimes the consecutive hash operators are not chained (e.g. multiple inputs), and
+ * this kind of forward partitioners will turn into forward job edges. These forward edges still
+ * have the consecutive hash assumption, so that they cannot be changed into rescale/rebalance
+ * edges, otherwise it can lead to incorrect results. This prevents the adaptive batch scheduler
+ * from determining parallelism for other forward edge downstream job vertices(see FLINK-25046).
+ *
+ * <p>To solve it, we introduce the {@link ForwardForConsecutiveHashPartitioner}. When SQL planner
+ * optimizes the case of multiple consecutive the same groupBy, it should use this partitioner, and
+ * then the runtime framework will change it to forward/hash after the operator chain creation.
+ *
+ * <pre>{@code
+ * A --[hash]--> B --[hash]--> C
+ *            |
+ *            V
+ * A --[hash]--> B --[ForwardForConsecutiveHash]--> C
+ *
+ * }</pre>
+ *
+ * <p>This partitioner will be converted to following partitioners after the operator chain creation:
+ *
+ * <p>1. Be converted to {@link ForwardPartitioner} if this partitioner is intra-chain.
+ *
+ * <p>2. Be converted to {@link ForwardForConsecutiveHashPartitioner#hashPartitioner} if this
+ * partitioner is inter-chain.
+ *
+ * <p>This partitioner should only be used for SQL Batch jobs and use Adaptive Batch Scheduler.

Review comment:
       use Adaptive Batch Scheduler -> when using AdaptiveBatchScheduler
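The intra-chain/inter-chain conversion described in the Javadoc can be modelled with a small sketch. All types here (`Partitioner`, `Forward`, `Hash`, `ForwardForConsecutiveHash`, `resolve`) are hypothetical simplifications and not Flink's `StreamPartitioner` API. The placeholder throws on use only to make the assumption "it must be resolved after chaining" explicit.

```java
/** Hypothetical, simplified model of the conversion rule; none of these are Flink classes. */
public class ConsecutiveHashResolution {

    /** Hypothetical minimal partitioner: picks an output channel for a record's key. */
    interface Partitioner {
        int selectChannel(Object key, int currentSubtask, int numChannels);
    }

    /** Keeps the record on the same subtask index, as a forward edge does. */
    static final class Forward implements Partitioner {
        public int selectChannel(Object key, int currentSubtask, int numChannels) {
            return currentSubtask;
        }
    }

    /** Simplified hash partitioning (hashCode modulo), not Flink's key-group logic. */
    static final class Hash implements Partitioner {
        public int selectChannel(Object key, int currentSubtask, int numChannels) {
            return Math.floorMod(key.hashCode(), numChannels);
        }
    }

    /** Placeholder chosen by the planner; remembers the hash partitioner it stands for. */
    static final class ForwardForConsecutiveHash implements Partitioner {
        final Partitioner hashPartitioner;

        ForwardForConsecutiveHash(Partitioner hashPartitioner) {
            this.hashPartitioner = hashPartitioner;
        }

        public int selectChannel(Object key, int currentSubtask, int numChannels) {
            // In this sketch the placeholder must be replaced before any record flows.
            throw new UnsupportedOperationException("resolve after operator chain creation");
        }
    }

    /** Applied once it is known whether the edge ends up inside an operator chain. */
    static Partitioner resolve(Partitioner partitioner, boolean intraChain) {
        if (!(partitioner instanceof ForwardForConsecutiveHash)) {
            return partitioner; // other partitioners are left unchanged
        }
        // Intra-chain: forward is enough. Inter-chain: fall back to the wrapped hash
        // partitioner so the edge no longer carries the forward constraint.
        return intraChain ? new Forward() : ((ForwardForConsecutiveHash) partitioner).hashPartitioner;
    }

    public static void main(String[] args) {
        Partitioner placeholder = new ForwardForConsecutiveHash(new Hash());
        System.out.println(resolve(placeholder, true).getClass().getSimpleName());  // Forward
        System.out.println(resolve(placeholder, false).getClass().getSimpleName()); // Hash
    }
}
```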




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
