gustavodemorais commented on code in PR #26856:
URL: https://github.com/apache/flink/pull/26856#discussion_r2247249115


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectMultiJoinTransposeRule.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mappings;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Planner rule that pushes a {@link Project} past a {@link MultiJoin} by 
splitting the projection
+ * into a projection on top of each child of the {@link MultiJoin}.
+ *
+ * <p>This rule transforms a pattern like:
+ *
+ * <pre>
+ * Project
+ *   MultiJoin
+ *     Input1
+ *     Input2
+ *     ...
+ * </pre>
+ *
+ * <p>Into:
+ *
+ * <pre>
+ * Project
+ *   MultiJoin
+ *     Project(Input1)
+ *     Project(Input2)
+ *     ...
+ * </pre>
+ *
+ * <p>This transformation allows the optimizer to push projections down to 
individual inputs,
+ * potentially reducing the amount of data processed in the join operation.
+ */
+@Value.Enclosing
+public class ProjectMultiJoinTransposeRule
+        extends 
RelRule<ProjectMultiJoinTransposeRule.ProjectMultiJoinTransposeRuleConfig> {
+
+    /**
+     * Default instance of this rule, built from {@link ProjectMultiJoinTransposeRuleConfig#DEFAULT}.
+     */
+    public static final ProjectMultiJoinTransposeRule INSTANCE =
+            ProjectMultiJoinTransposeRuleConfig.DEFAULT.toRule();
+
+    /**
+     * Creates the rule from the given configuration.
+     *
+     * @param config configuration passed unchanged to {@link RelRule}
+     */
+    public ProjectMultiJoinTransposeRule(ProjectMultiJoinTransposeRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final Project project = call.rel(0);
+        final MultiJoin join = call.rel(1);
+
+        // Nothing to do when the projection keeps every field of the multi join,
+        // or when some input is already a projection (i.e. the pushdown was
+        // presumably applied before and must not fire again).
+        final boolean keepsAllFields =
+                RexUtil.isIdentity(project.getProjects(), join.getRowType());
+        if (keepsAllFields
+                || join.getInputs().stream().anyMatch(input -> isProject(input))) {
+            return false;
+        }
+
+        return super.matches(call);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final Project originalProject = call.rel(0);
+        final MultiJoin multiJoin = call.rel(1);
+        final RelBuilder relBuilder = call.builder();
+
+        // Gather all referenced fields in projection and join conditions
+        final ImmutableBitSet referencedFields =
+                collectReferencedFields(originalProject, multiJoin);
+
+        // Create new inputs with projections pushed down
+        final TransformedInputs transformedInputs =
+                createTransformedInputs(multiJoin, referencedFields, 
relBuilder);
+
+        // Create field mapping from old to new field positions in the multi 
join
+        final Mappings.TargetMapping fieldMapping =
+                createFieldMapping(multiJoin, transformedInputs);
+
+        final MultiJoin newMultiJoin =
+                createMultiJoinWithAdjustedParams(multiJoin, 
transformedInputs, fieldMapping);
+
+        // Update the projection on top of the multi join with the new field 
mapping
+        final List<RexNode> newProjects =
+                RexUtil.apply(fieldMapping, originalProject.getProjects());
+
+        relBuilder.push(newMultiJoin);
+        relBuilder.project(newProjects, 
originalProject.getRowType().getFieldNames());
+        call.transformTo(relBuilder.build());
+    }
+
+    /**
+     * Returns the indexes of all multi-join fields referenced by the projection, the join
+     * filter, the optional post-join filter, or any outer-join condition.
+     */
+    private ImmutableBitSet collectReferencedFields(Project project, MultiJoin multiJoin) {
+        final ImmutableBitSet.Builder bits = ImmutableBitSet.builder();
+        // Used only for its side effect: every visited input reference is recorded in 'bits'.
+        final RexShuttle refRecorder =
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitInputRef(RexInputRef ref) {
+                        bits.set(ref.getIndex());
+                        return ref;
+                    }
+                };
+
+        // Expressions evaluated on top of the join output.
+        refRecorder.apply(project.getProjects());
+
+        // Conditions evaluated by the join itself.
+        refRecorder.apply(multiJoin.getJoinFilter());
+        final RexNode postJoinFilter = multiJoin.getPostJoinFilter();
+        if (postJoinFilter != null) {
+            refRecorder.apply(postJoinFilter);
+        }
+        for (RexNode outerJoinCondition : multiJoin.getOuterJoinConditions()) {
+            // RexShuttle.apply(RexNode) tolerates null entries (inputs without an outer join).
+            refRecorder.apply(outerJoinCondition);
+        }
+
+        return bits.build();
+    }
+
+    /** Creates transformed inputs with projections pushed down to individual 
inputs. */
+    private TransformedInputs createTransformedInputs(

Review Comment:
   Because we have to consider the columns used not only in the projection but
also in filters and join conditions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to