lsyldliu commented on code in PR #23282: URL: https://github.com/apache/flink/pull/23282#discussion_r1308172368
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java: ########## @@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcess @Override public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) { - ExecNodeGraph factSideProcessedGraph = checkIfFactSourceNeedEnforceDependency(execGraph); + ExecNodeGraph factSideProcessedGraph = + checkIfFactSourceNeedEnforceDependency(execGraph, context); return enforceDimSideBlockingExchange(factSideProcessedGraph, context); } - private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph execGraph) { - Map<BatchExecTableSourceScan, List<ExecNode<?>>> dynamicFilteringScanDescendants = + private ExecNodeGraph checkIfFactSourceNeedEnforceDependency( + ExecNodeGraph execGraph, ProcessorContext context) { + Map<BatchExecTableSourceScan, List<DescendantInfo>> dynamicFilteringScanDescendants = new HashMap<>(); AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector = new AbstractExecNodeExactlyOnceVisitor() { @Override protected void visitNode(ExecNode<?> node) { - node.getInputEdges().stream() - .map(ExecEdge::getSource) - .forEach( - input -> { - // The character of the dynamic filter scan is that it - // has an input. - if (input instanceof BatchExecTableSourceScan - && input.getInputEdges().size() > 0) { - dynamicFilteringScanDescendants - .computeIfAbsent( - (BatchExecTableSourceScan) input, - ignored -> new ArrayList<>()) - .add(node); - } - }); + for (int i = 0; i < node.getInputEdges().size(); ++i) { + ExecEdge edge = node.getInputEdges().get(i); + ExecNode<?> input = edge.getSource(); + + // The character of the dynamic filter scan is that it + // has an input. 
+ if (input instanceof BatchExecTableSourceScan + && input.getInputEdges().size() > 0) { + dynamicFilteringScanDescendants + .computeIfAbsent( + (BatchExecTableSourceScan) input, + ignored -> new ArrayList<>()) + .add(new DescendantInfo(node, i)); + } + } visitInputs(node); } }; execGraph.getRootNodes().forEach(node -> node.accept(dynamicFilteringScanCollector)); - for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry : + for (Map.Entry<BatchExecTableSourceScan, List<DescendantInfo>> entry : dynamicFilteringScanDescendants.entrySet()) { - if (entry.getValue().size() == 1) { - ExecNode<?> next = entry.getValue().get(0); - if (next instanceof BatchExecMultipleInput) { - // the source can be chained with BatchExecMultipleInput - continue; - } - } - // otherwise we need dependencies - entry.getKey().setNeedDynamicFilteringDependency(true); + BatchExecTableSourceScan tableSourceScan = entry.getKey(); + BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector = + getDynamicFilteringDataCollector(tableSourceScan); + + // Add exchange between collector and enforcer + BatchExecExchange exchange = + new BatchExecExchange( + context.getPlanner().getTableConfig(), + InputProperty.builder() + .requiredDistribution(InputProperty.ANY_DISTRIBUTION) + .damBehavior(InputProperty.DamBehavior.BLOCKING) + .build(), + (RowType) dynamicFilteringDataCollector.getOutputType(), + "Exchange"); + exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH); + exchange.setInputEdges( + Collections.singletonList( + ExecEdge.builder() + .source(dynamicFilteringDataCollector) + .target(exchange) + .build())); + + // set enforcer inputs + BatchExecExecutionOrderEnforcer enforcer = + new BatchExecExecutionOrderEnforcer( + context.getPlanner().getTableConfig(), + Arrays.asList( + exchange.getInputProperties().get(0), InputProperty.DEFAULT), + tableSourceScan.getOutputType(), + "OrderEnforcer"); + ExecEdge edge1 = ExecEdge.builder().source(exchange).target(enforcer).build(); + 
+ ExecEdge edge2 = ExecEdge.builder().source(tableSourceScan).target(enforcer).build(); + enforcer.setInputEdges(Arrays.asList(edge1, edge2)); + + // we clear the input of tableSourceScan to avoid cycle in exec plan + tableSourceScan.setInputEdges(Collections.emptyList()); + tableSourceScan.setInputProperties(Collections.emptyList()); Review Comment: Can you explain why we need to reset the `inputProperties`? I'm concerned because `inputProperties` was designed to be final, so we should avoid changing it as much as possible. I don't think we need to reset it, and leaving it as-is should have no side effects: there are many places that create a new Exchange without resetting it. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala: ########## @@ -73,15 +73,14 @@ class BatchPlanner( val processors = new util.ArrayList[ExecNodeGraphProcessor]() // deadlock breakup processors.add(new DeadlockBreakupProcessor()) + if (getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { + processors.add(new DynamicFilteringDependencyProcessor) + } // multiple input creation if (getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED)) { processors.add(new MultipleInputNodeCreationProcessor(false)) } processors.add(new ForwardHashExchangeProcessor) - if (getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { - processors.add(new DynamicFilteringDependencyProcessor) - processors.add(new ResetTransformationProcessor) Review Comment: The `ResetTransformationProcessor` class can be deleted now.
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java: ########## @@ -204,4 +248,16 @@ private BatchExecExchange createExchange( return exchange; } + + private static class DescendantInfo { + /** The DynamicFilteringScan is the {@link inputId}th input of current descendant . */ Review Comment: Please use `{@code inputId}` here instead of `{@link inputId}`. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java: ########## @@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcess @Override public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) { - ExecNodeGraph factSideProcessedGraph = checkIfFactSourceNeedEnforceDependency(execGraph); + ExecNodeGraph factSideProcessedGraph = + checkIfFactSourceNeedEnforceDependency(execGraph, context); return enforceDimSideBlockingExchange(factSideProcessedGraph, context); } - private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph execGraph) { - Map<BatchExecTableSourceScan, List<ExecNode<?>>> dynamicFilteringScanDescendants = + private ExecNodeGraph checkIfFactSourceNeedEnforceDependency( Review Comment: This method should be renamed: it no longer checks whether a dependency is needed, it now always inserts the enforced dependency. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/dynamicfiltering/ExecutionOrderEnforcerCodeGenerator.scala: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.codegen.dynamicfiltering + +import org.apache.flink.table.data.RowData +import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator} +import org.apache.flink.table.planner.codegen.CodeGenUtils.{DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM} +import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExecutionOrderEnforcer +import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory +import org.apache.flink.table.types.logical.RowType + +/** + * Operator code generator for ExecutionOrderEnforcer operator. Input1 is the dependent upstream, + * input2 is the source. see [[BatchExecExecutionOrderEnforcer]] for details. 
+ */ +object ExecutionOrderEnforcerCodeGenerator { + def gen( + ctx: CodeGeneratorContext, + input1Type: RowType, + input2Type: RowType): CodeGenOperatorFactory[RowData] = { + + val processElement2Code = + s""" + |${generateCollect(s"$DEFAULT_INPUT2_TERM")} + |""".stripMargin + + new CodeGenOperatorFactory[RowData]( + OperatorCodeGenerator.generateTwoInputStreamOperator( + ctx, + "ExecutionOrderEnforcerOperator", + "", + processElement2Code, Review Comment: ```suggestion s""" |${generateCollect(s"$DEFAULT_INPUT2_TERM")} |""".stripMargin, ``` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/dynamicfiltering/ExecutionOrderEnforcerCodeGenerator.scala: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.planner.codegen.dynamicfiltering + +import org.apache.flink.table.data.RowData +import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator} +import org.apache.flink.table.planner.codegen.CodeGenUtils.{DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM} +import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExecutionOrderEnforcer +import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory +import org.apache.flink.table.types.logical.RowType + +/** + * Operator code generator for ExecutionOrderEnforcer operator. Input1 is the dependent upstream, + * input2 is the source. see [[BatchExecExecutionOrderEnforcer]] for details. + */ +object ExecutionOrderEnforcerCodeGenerator { + def gen( + ctx: CodeGeneratorContext, + input1Type: RowType, + input2Type: RowType): CodeGenOperatorFactory[RowData] = { + + val processElement2Code = + s""" + |${generateCollect(s"$DEFAULT_INPUT2_TERM")} + |""".stripMargin + + new CodeGenOperatorFactory[RowData]( + OperatorCodeGenerator.generateTwoInputStreamOperator( + ctx, + "ExecutionOrderEnforcerOperator", + "", + processElement2Code, + input1Type, + input2Type, + DEFAULT_INPUT1_TERM, + DEFAULT_INPUT2_TERM, + None, + // we cannot pass None or use default here, because this operator must implement BoundedMultiInput Review Comment: If we don't implement the `BoundedMultiInput`, what is the side effect? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java: ########## @@ -108,44 +102,10 @@ protected Transformation<RowData> translateToPlanInternal( return transformation; Review Comment: This if logic can be removed now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org