zhuzhurk commented on code in PR #25578:
URL: https://github.com/apache/flink/pull/25578#discussion_r1897663360
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -111,6 +111,9 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT)
private final long estimatedRightRowCount;
+ @JsonProperty(FIELD_NAME_IS_JOIN_HINT)
+ private final boolean withHint;
Review Comment:
What would happen if an old compiled plan is loaded by Flink 2.0, which has
this new property? Could a compatibility issue occur?
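For reference, here is a minimal, self-contained Jackson sketch (not Flink code; the stub class and property name merely mirror the diff above). It shows that a primitive boolean property absent from an old plan deserializes to `false`, so the question becomes whether that default is semantically safe:
```
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CompatSketch {
    // Hypothetical stand-in for the exec node being deserialized.
    static class NodeStub {
        final boolean withHint;

        @JsonCreator
        NodeStub(@JsonProperty("isJoinHint") boolean withHint) {
            this.withHint = withHint;
        }
    }

    public static void main(String[] args) throws Exception {
        // JSON produced by an older version: the "isJoinHint" field is missing.
        NodeStub node = new ObjectMapper().readValue("{}", NodeStub.class);
        System.out.println(node.withHint); // prints: false
    }
}
```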
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -111,6 +111,9 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT)
private final long estimatedRightRowCount;
+ @JsonProperty(FIELD_NAME_IS_JOIN_HINT)
+ private final boolean withHint;
Review Comment:
Naming it `withJobStrategyHint` would make it easier for others to
understand.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortMergeJoin.scala:
##########
@@ -175,13 +178,22 @@ class BatchPhysicalSortMergeJoin(
FlinkTypeFactory.toLogicalRowType(left.getRowType),
FlinkTypeFactory.toLogicalRowType(right.getRowType))
+ val mq = getCluster.getMetadataQuery
+ val leftRowSize = Util.first(mq.getAverageRowSize(left), 24).toInt
+ val leftRowCount = Util.first(mq.getRowCount(left), 200000).toLong
+ val rightRowSize = Util.first(mq.getAverageRowSize(right), 24).toInt
+ val rightRowCount = Util.first(mq.getRowCount(right), 200000).toLong
Review Comment:
Maybe extract this into a method `getEstimatedRowStats(BatchPhysicalJoinBase)`
in `JoinUtil` so it can be reused, avoiding inconsistency in the future.
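A rough sketch of such a helper (the signature and tuple return type are assumptions; the fallback values mirror the diff above):
```
def getEstimatedRowStats(join: BatchPhysicalJoinBase): (Int, Long, Int, Long) = {
  val mq = join.getCluster.getMetadataQuery
  // Fall back to the same defaults used inline today: 24 bytes per row and
  // 200000 rows per side, when no metadata is available.
  val leftRowSize = Util.first(mq.getAverageRowSize(join.getLeft), 24).toInt
  val leftRowCount = Util.first(mq.getRowCount(join.getLeft), 200000).toLong
  val rightRowSize = Util.first(mq.getAverageRowSize(join.getRight), 24).toInt
  val rightRowCount = Util.first(mq.getRowCount(join.getRight), 200000).toLong
  (leftRowSize, leftRowCount, rightRowSize, rightRowCount)
}
```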
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala:
##########
@@ -275,4 +281,25 @@ object JoinUtil {
rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
}
}
+
+  def getLargeManagedMemory(joinType: FlinkJoinType, config: ExecNodeConfig): Long = {
+    val hashJoinManagedMemory =
+      config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes
+    // The memory used by SortMergeJoinIterator that buffer the matched rows, each side needs
+    // this memory if it is full outer join
+    val externalBufferMemory =
+      config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY).getBytes
+    // The memory used by BinaryExternalSorter for sort, the left and right side both need it
+    val sortMemory = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes
+    var externalBufferNum = 1
+    if (joinType eq FlinkJoinType.FULL) externalBufferNum = 2
Review Comment:
According to Flink's code-style requirements, it should be:
```
if (joinType eq FlinkJoinType.FULL) {
externalBufferNum = 2
}
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala:
##########
@@ -275,4 +281,25 @@ object JoinUtil {
rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
}
}
+
+  def getLargeManagedMemory(joinType: FlinkJoinType, config: ExecNodeConfig): Long = {
Review Comment:
This method has now been copied and converted into a public method. It would
be better to name it `getManagedMemory` and add a comment explaining that it
takes the larger managed memory requirement so that a HashJoin can be
converted into a SortMergeJoin.
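As a sketch of the rename plus comment (the final max-combination shown here is an assumption based on the description above; the actual formula should stay as implemented):
```
/**
 * Returns the managed memory for the join. It takes the larger of the
 * HashJoin and SortMergeJoin requirements, so that a HashJoin can still be
 * adaptively converted into a SortMergeJoin without under-provisioning.
 */
def getManagedMemory(joinType: FlinkJoinType, config: ExecNodeConfig): Long = {
  val hashJoinMemory =
    config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes
  val externalBufferMemory =
    config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY).getBytes
  val sortMemory = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes
  // A full outer join buffers matched rows on both sides.
  val externalBufferNum = if (joinType eq FlinkJoinType.FULL) 2 else 1
  val sortMergeJoinMemory = sortMemory * 2 + externalBufferMemory * externalBufferNum
  Math.max(hashJoinMemory, sortMergeJoinMemory)
}
```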
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;
+
+/**
+ * A {@link ExecNodeGraphProcessor} which replaces qualified join nodes with adaptive broadcast
+ * join nodes.
+ */
+public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
+ throw new TableException("StreamExecNode is not supported yet");
Review Comment:
Maybe: 'AdaptiveJoin does not support streaming jobs.'
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.adaptive.AdaptiveJoinOperatorGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/** {@link BatchExecNode} for adaptive join. */
+public class BatchExecAdaptiveJoin extends ExecNodeBase<RowData>
Review Comment:
I think CompiledPlan annotations will be needed, see FLINK-35797.
It can be done in a follow-up task though.
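As a rough illustration of what the FLINK-35797-style annotation could look like here (the name, version, and produced transformation are placeholders, not decided values):
```
@ExecNodeMetadata(
        name = "batch-exec-adaptive-join",
        version = 1,
        producedTransformations = "adaptive-join",
        minPlanVersion = FlinkVersion.v2_0,
        minStateVersion = FlinkVersion.v2_0)
public class BatchExecAdaptiveJoin extends ExecNodeBase<RowData>
        implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
    // ... class body as in the PR ...
}
```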
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.adaptive.AdaptiveJoinOperatorGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/** {@link BatchExecNode} for adaptive join. */
+public class BatchExecAdaptiveJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
+
+ private final JoinSpec joinSpec;
+ private final boolean leftIsBuild;
+ private final int estimatedLeftAvgRowSize;
+ private final int estimatedRightAvgRowSize;
+ private final long estimatedLeftRowCount;
+ private final long estimatedRightRowCount;
+ private final boolean tryDistinctBuildRow;
+ private final String description;
+ private final OperatorType originalJoin;
+
+ public BatchExecAdaptiveJoin(
+ ReadableConfig tableConfig,
+ JoinSpec joinSpec,
+ int estimatedLeftAvgRowSize,
+ int estimatedRightAvgRowSize,
+ long estimatedLeftRowCount,
+ long estimatedRightRowCount,
+ boolean leftIsBuild,
+ boolean tryDistinctBuildRow,
+ List<InputProperty> inputProperties,
+ RowType outputType,
+ String description,
+ OperatorType originalJoin) {
+ super(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(BatchExecAdaptiveJoin.class),
+                ExecNodeContext.newPersistedConfig(BatchExecAdaptiveJoin.class, tableConfig),
+ inputProperties,
+ outputType,
+ description);
+ this.joinSpec = joinSpec;
+ this.estimatedLeftAvgRowSize = estimatedLeftAvgRowSize;
+ this.estimatedRightAvgRowSize = estimatedRightAvgRowSize;
+ this.estimatedLeftRowCount = estimatedLeftRowCount;
+ this.estimatedRightRowCount = estimatedRightRowCount;
+ this.leftIsBuild = leftIsBuild;
+ this.tryDistinctBuildRow = tryDistinctBuildRow;
+ this.description = description;
+ this.originalJoin = originalJoin;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Transformation<RowData> translateToPlanInternal(
+ PlannerBase planner, ExecNodeConfig config) {
+ ExecEdge leftInputEdge = getInputEdges().get(0);
+ ExecEdge rightInputEdge = getInputEdges().get(1);
+
+ Transformation<RowData> leftInputTransform =
+                (Transformation<RowData>) leftInputEdge.translateToPlan(planner);
+ Transformation<RowData> rightInputTransform =
+                (Transformation<RowData>) rightInputEdge.translateToPlan(planner);
+ // get input types
+ RowType leftType = (RowType) leftInputEdge.getOutputType();
+ RowType rightType = (RowType) rightInputEdge.getOutputType();
+        long managedMemory = JoinUtil.getLargeManagedMemory(joinSpec.getJoinType(), config);
+ GeneratedJoinCondition condFunc =
+ JoinUtil.generateConditionFunction(
+ config,
+ planner.getFlinkContext().getClassLoader(),
+ joinSpec.getNonEquiCondition().orElse(null),
+ leftType,
+ rightType);
+
+ AdaptiveJoinOperatorGenerator adaptiveJoin =
+ new AdaptiveJoinOperatorGenerator(
+ joinSpec.getLeftKeys(),
+ joinSpec.getRightKeys(),
+ joinSpec.getJoinType(),
+ joinSpec.getFilterNulls(),
+ leftType,
+ rightType,
+ condFunc,
+ estimatedLeftAvgRowSize,
+ estimatedRightAvgRowSize,
+ estimatedLeftRowCount,
+ estimatedRightRowCount,
+ tryDistinctBuildRow,
+ managedMemory,
+ leftIsBuild,
+ originalJoin);
+
+ return ExecNodeUtil.createTwoInputTransformation(
+ leftInputTransform,
+ rightInputTransform,
+ createTransformationName(config),
+ createTransformationDescription(config),
+ getAdaptiveJoinOperatorFactory(adaptiveJoin),
+ InternalTypeInfo.of(getOutputType()),
+ rightInputTransform.getParallelism(),
Review Comment:
Generally, a join should take the parallelism of its probe side. Given that
the probe side may not have been decided yet here, maybe we can take the
larger of the two input parallelisms.
Note that a comment is needed to explain this behavior.
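A sketch of that suggestion, reusing the `leftInputTransform`/`rightInputTransform` variables from the diff above:
```
// The probe side is only decided at runtime by the adaptive join, so
// provision the operator with the larger of the two input parallelisms.
int parallelism =
        Math.max(leftInputTransform.getParallelism(), rightInputTransform.getParallelism());
```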
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.batch.sql.adaptive
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/** Tests for AdaptiveJoinProcessor. */
+class AdaptiveJoinTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @BeforeEach
+ def before(): Unit = {
+ util.addTableSource[(Long, Long, String, Long)]("T", 'a, 'b, 'c, 'd)
+ util.addTableSource[(Long, Long, String, Long)]("T1", 'a1, 'b1, 'c1, 'd1)
+ util.addTableSource[(Long, Long, String, Long)]("T2", 'a2, 'b2, 'c2, 'd2)
+ util.addTableSource[(Long, Long, String, Long)]("T3", 'a3, 'b3, 'c3, 'd3)
+ util.tableConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
+ OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.AUTO)
+ }
+
+ @Test
+ def testWithShuffleHashJoin(): Unit = {
+ util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
+ val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testWithShuffleMergeJoin(): Unit = {
+ util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,ShuffleHashJoin")
+ val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testWithStaticBroadcastJoin(): Unit = {
+ util.tableEnv.getConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
+ "SortMergeJoin,ShuffleHashJoin,NestedLoopJoin")
+ util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(Long.MaxValue))
+ val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testWithBroadcastJoinRuntimeOnly(): Unit = {
+ util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin")
+ util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(Long.MaxValue))
+ util.tableEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
+ OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.RUNTIME_ONLY)
+ val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testShuffleJoinWithForwardForConsecutiveHash(): Unit = {
+ util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(false))
+ val sql =
+ """
+ |WITH
+ | r AS (SELECT * FROM T1, T2, T3 WHERE a1 = a2 and a1 = a3)
+ |SELECT sum(b1) FROM r group by a1
+ |""".stripMargin
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testJoinWithUnionInput(): Unit = {
+ val sql =
+ """
+ |SELECT * FROM
+        |  (SELECT a FROM (SELECT a1 as a FROM T1) UNION ALL (SELECT a2 as a FROM T2)) Y
+ | LEFT JOIN T ON T.a = Y.a
+ |""".stripMargin
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testJoinWithMultipleInput(): Unit = {
Review Comment:
For cases which do not support adaptive join, i.e.
`testJoinWithMultipleInput` and `testShuffleJoinWithForwardForConsecutiveHash`,
it's better to add a comment explaining why, as sketched below.
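For instance (the wording here is only a sketch; the actual reason should come from the implementation):
```
// Sketch: adaptive join is not supported when the join's inputs must be
// KEEP_INPUT_AS_IS, e.g. when the join is merged into a multiple-input
// operator, because the shuffle can then no longer be adjusted at runtime.
@Test
def testJoinWithMultipleInput(): Unit = {
  // ... test body as in the PR ...
}
```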
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;
+
+/**
+ * A {@link ExecNodeGraphProcessor} which replaces qualified join nodes with adaptive broadcast
+ * join nodes.
+ */
+public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
+ throw new TableException("StreamExecNode is not supported yet");
+ }
+ if (!isAdaptiveJoinEnabled(context)) {
+ return execGraph;
+ }
+
+ AbstractExecNodeExactlyOnceVisitor visitor =
+ new AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ visitInputs(node);
+                    if (checkKeepInputAsIsExisted(node.getInputProperties())) {
+ return;
+ }
+ for (int i = 0; i < node.getInputEdges().size(); ++i) {
+ ExecEdge edge = node.getInputEdges().get(i);
+                        ExecNode<?> newNode = replaceAdaptiveJoinNode(edge.getSource());
+ node.replaceInputEdge(
+ i,
+ ExecEdge.builder()
+ .source(newNode)
+ .target(node)
+ .shuffle(edge.getShuffle())
+                                        .exchangeMode(edge.getExchangeMode())
+ .build());
+ }
+ }
+ };
+
+ List<ExecNode<?>> newRootNodes =
+ execGraph.getRootNodes().stream()
+ .map(
+ node -> {
+ node = replaceAdaptiveJoinNode(node);
+ node.accept(visitor);
+ return node;
+ })
+ .collect(Collectors.toList());
+
+ return new ExecNodeGraph(execGraph.getFlinkVersion(), newRootNodes);
+ }
+
+ private ExecNode<?> replaceAdaptiveJoinNode(ExecNode<?> node) {
Review Comment:
I prefer to adjust the method names a bit:
replaceAdaptiveJoinNode -> tryReplaceWithAdaptiveJoinNode
checkKeepInputAsIsExisted -> shouldKeepInputAsIs
isUpstreamNodeKeepInputAsIs -> shouldKeepUpstreamNodeInputAsIs
checkAllInputShuffleIsHash -> areAllInputsHashShuffle
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;
+
+/**
+ * A {@link ExecNodeGraphProcessor} which replaces qualified join nodes with adaptive broadcast
+ * join nodes.
+ */
+public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
+ throw new TableException("StreamExecNode is not supported yet");
+ }
+ if (!isAdaptiveJoinEnabled(context)) {
+ return execGraph;
+ }
+
+ AbstractExecNodeExactlyOnceVisitor visitor =
+ new AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ visitInputs(node);
+                    if (checkKeepInputAsIsExisted(node.getInputProperties())) {
+ return;
+ }
+ for (int i = 0; i < node.getInputEdges().size(); ++i) {
+ ExecEdge edge = node.getInputEdges().get(i);
+                        ExecNode<?> newNode = replaceAdaptiveJoinNode(edge.getSource());
+ node.replaceInputEdge(
+ i,
+ ExecEdge.builder()
+ .source(newNode)
+ .target(node)
+ .shuffle(edge.getShuffle())
+                                        .exchangeMode(edge.getExchangeMode())
+ .build());
+ }
+ }
+ };
+
+ List<ExecNode<?>> newRootNodes =
+ execGraph.getRootNodes().stream()
+ .map(
+ node -> {
+ node = replaceAdaptiveJoinNode(node);
+ node.accept(visitor);
+ return node;
+ })
+ .collect(Collectors.toList());
+
+ return new ExecNodeGraph(execGraph.getFlinkVersion(), newRootNodes);
+ }
+
+ private ExecNode<?> replaceAdaptiveJoinNode(ExecNode<?> node) {
+ if (!(checkAllInputShuffleIsHash(node))
+ || isUpstreamNodeKeepInputAsIs(node.getInputEdges())) {
+ return node;
+ }
+ ExecNode<?> newNode = node;
+ if (node instanceof AdaptiveJoinExecNode
+                && ((AdaptiveJoinExecNode) node).canBeTransformedToAdaptiveJoin()) {
+            BatchExecAdaptiveJoin adaptiveJoin = ((AdaptiveJoinExecNode) node).toAdaptiveJoinNode();
+ replaceInputEdge(adaptiveJoin, node);
+ newNode = adaptiveJoin;
+ }
+
+ return newNode;
+ }
+
+    private boolean checkKeepInputAsIsExisted(List<InputProperty> inputProperties) {
+ return inputProperties.stream()
+ .anyMatch(
+ inputProperty ->
+                                inputProperty.getRequiredDistribution().getType()
+ == KEEP_INPUT_AS_IS);
+ }
+
+ private boolean isUpstreamNodeKeepInputAsIs(List<ExecEdge> inputEdges) {
+ return inputEdges.stream()
+                .filter(execEdge -> execEdge.getSource() instanceof BatchExecExchange)
+ .map(execEdge -> (BatchExecExchange) execEdge.getSource())
+                .anyMatch(exchange -> checkKeepInputAsIsExisted(exchange.getInputProperties()));
+ }
+
+ private boolean isAdaptiveJoinEnabled(ProcessorContext context) {
+ TableConfig tableConfig = context.getPlanner().getTableConfig();
+ boolean isAdaptiveJoinEnabled =
+ tableConfig.get(
+ OptimizerConfigOptions
+                                .TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY)
+                        != OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.NONE
+ && !TableConfigUtils.isOperatorDisabled(
Review Comment:
This should not affect SkewedJoin optimization.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;
+
+/**
+ * A {@link ExecNodeGraphProcessor} which replaces qualified join nodes with adaptive broadcast
+ * join nodes.
+ */
+public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
+ throw new TableException("StreamExecNode is not supported yet");
+ }
+ if (!isAdaptiveJoinEnabled(context)) {
+ return execGraph;
+ }
+
+ AbstractExecNodeExactlyOnceVisitor visitor =
+ new AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ visitInputs(node);
+                    if (checkKeepInputAsIsExisted(node.getInputProperties())) {
Review Comment:
Comments are needed to explain that adaptive join does not support this case.
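One possible form of such a comment (the stated reason is an assumption to be verified against the implementation):
```
// Nodes requiring KEEP_INPUT_AS_IS inputs must preserve their input
// distribution, so the shuffle cannot be replaced with a broadcast at
// runtime; skip them.
if (checkKeepInputAsIsExisted(node.getInputProperties())) {
    return;
}
```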
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;
+
+/**
+ * A {@link ExecNodeGraphProcessor} which replaces qualified join nodes with adaptive broadcast
+ * join nodes.
+ */
+public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
+ throw new TableException("StreamExecNode is not supported yet");
+ }
+ if (!isAdaptiveJoinEnabled(context)) {
+ return execGraph;
+ }
+
+ AbstractExecNodeExactlyOnceVisitor visitor =
+ new AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ visitInputs(node);
+                    if (checkKeepInputAsIsExisted(node.getInputProperties())) {
+ return;
+ }
+ for (int i = 0; i < node.getInputEdges().size(); ++i) {
+ ExecEdge edge = node.getInputEdges().get(i);
+                        ExecNode<?> newNode = replaceAdaptiveJoinNode(edge.getSource());
+ node.replaceInputEdge(
+ i,
+ ExecEdge.builder()
+ .source(newNode)
+ .target(node)
+ .shuffle(edge.getShuffle())
+                                        .exchangeMode(edge.getExchangeMode())
+ .build());
+ }
+ }
+ };
+
+ List<ExecNode<?>> newRootNodes =
+ execGraph.getRootNodes().stream()
+ .map(
+ node -> {
+ node = replaceAdaptiveJoinNode(node);
+ node.accept(visitor);
+ return node;
+ })
+ .collect(Collectors.toList());
+
+ return new ExecNodeGraph(execGraph.getFlinkVersion(), newRootNodes);
+ }
+
+ private ExecNode<?> replaceAdaptiveJoinNode(ExecNode<?> node) {
+ if (!(checkAllInputShuffleIsHash(node))
+ || isUpstreamNodeKeepInputAsIs(node.getInputEdges())) {
+ return node;
+ }
+ ExecNode<?> newNode = node;
+ if (node instanceof AdaptiveJoinExecNode
+                && ((AdaptiveJoinExecNode) node).canBeTransformedToAdaptiveJoin()) {
+            BatchExecAdaptiveJoin adaptiveJoin = ((AdaptiveJoinExecNode) node).toAdaptiveJoinNode();
+ replaceInputEdge(adaptiveJoin, node);
+ newNode = adaptiveJoin;
+ }
+
+ return newNode;
+ }
+
+    private boolean checkKeepInputAsIsExisted(List<InputProperty> inputProperties) {
+ return inputProperties.stream()
+ .anyMatch(
+ inputProperty ->
+                                inputProperty.getRequiredDistribution().getType()
+ == KEEP_INPUT_AS_IS);
+ }
+
+ private boolean isUpstreamNodeKeepInputAsIs(List<ExecEdge> inputEdges) {
+ return inputEdges.stream()
+                .filter(execEdge -> execEdge.getSource() instanceof BatchExecExchange)
Review Comment:
Why does it only check `BatchExecExchange` here?