gortiz commented on code in PR #16123:
URL: https://github.com/apache/pinot/pull/16123#discussion_r2256717303


##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/PinotLogicalEnrichedJoin.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+public class PinotLogicalEnrichedJoin extends Join {
+
+  private final RelDataType _joinRowType;
+  private final RelDataType _outputRowType;
+  private final List<FilterProjectRexNode> _filterProjectRexNodes;
+  /// currently variableSet of Project Rel is ignored since
+  /// We don't support nested expressions in execution
+  private final Set<CorrelationId> _projectVariableSet;
+  @Nullable
+  private final List<RexNode> _squashedProjects;
+  @Nullable
+  private final RexNode _fetch;
+  @Nullable
+  private final RexNode _offset;
+
+  public PinotLogicalEnrichedJoin(RelOptCluster cluster, RelTraitSet traitSet,
+      List<RelHint> hints, RelNode left, RelNode right, RexNode joinCondition,
+      Set<CorrelationId> variablesSet, JoinRelType joinType,
+      List<FilterProjectRexNode> filterProjectRexNodes,
+      @Nullable RelDataType outputRowType, @Nullable Set<CorrelationId> 
projectVariableSet,
+      @Nullable RexNode fetch, @Nullable RexNode offset) {
+    super(cluster, traitSet, hints, left, right, joinCondition, variablesSet, 
joinType);
+    _filterProjectRexNodes = filterProjectRexNodes;
+    _squashedProjects = squashProjects();
+    _joinRowType = getJoinRowType();
+    // TODO: make sure this aligns with filterProjectRexNodes
+    // if there's projection, getRowType() should return the final projected 
row type as output row type
+    //   otherwise it's the same as _joinRowType
+    _outputRowType = outputRowType == null ? _joinRowType : outputRowType;
+    _projectVariableSet = projectVariableSet;
+    _offset = offset;
+    _fetch = fetch;
+  }
+
+  @Override
+  public PinotLogicalEnrichedJoin copy(RelTraitSet traitSet, RexNode 
conditionExpr,
+      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) 
{
+    return new PinotLogicalEnrichedJoin(getCluster(), traitSet, getHints(), 
left, right,
+        conditionExpr, getVariablesSet(), getJoinType(),
+        _filterProjectRexNodes, _outputRowType, _projectVariableSet,
+        _fetch, _offset);
+  }
+
+  public PinotLogicalEnrichedJoin withNewProject(FilterProjectRexNode project, 
RelDataType outputRowType,
+      Set<CorrelationId> projectVariableSet) {
+    List<FilterProjectRexNode> filterProjectRexNodes = new 
ArrayList<>(_filterProjectRexNodes);
+    filterProjectRexNodes.add(project);

Review Comment:
   nit: this is not very efficient: `new ArrayList<>(_filterProjectRexNodes)` 
creates a list whose capacity is exactly `_filterProjectRexNodes.size()`, so 
adding one more element right away forces the backing array to grow.
   
   It is uglier to write, but something like:
   ```suggestion
       List<FilterProjectRexNode> filterProjectRexNodes = new 
ArrayList<>(_filterProjectRexNodes.size() + 1);
       filterProjectRexNodes.addAll(_filterProjectRexNodes);
       filterProjectRexNodes.add(project);
   ```
   
   is cheaper to evaluate, because the list is allocated once with its final capacity.



##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/PinotLogicalEnrichedJoin.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+public class PinotLogicalEnrichedJoin extends Join {
+
+  private final RelDataType _joinRowType;
+  private final RelDataType _outputRowType;
+  private final List<FilterProjectRexNode> _filterProjectRexNodes;
+  /// currently variableSet of Project Rel is ignored since
+  /// We don't support nested expressions in execution
+  private final Set<CorrelationId> _projectVariableSet;
+  @Nullable
+  private final List<RexNode> _squashedProjects;
+  @Nullable
+  private final RexNode _fetch;
+  @Nullable
+  private final RexNode _offset;
+
+  public PinotLogicalEnrichedJoin(RelOptCluster cluster, RelTraitSet traitSet,
+      List<RelHint> hints, RelNode left, RelNode right, RexNode joinCondition,
+      Set<CorrelationId> variablesSet, JoinRelType joinType,
+      List<FilterProjectRexNode> filterProjectRexNodes,
+      @Nullable RelDataType outputRowType, @Nullable Set<CorrelationId> 
projectVariableSet,
+      @Nullable RexNode fetch, @Nullable RexNode offset) {
+    super(cluster, traitSet, hints, left, right, joinCondition, variablesSet, 
joinType);
+    _filterProjectRexNodes = filterProjectRexNodes;
+    _squashedProjects = squashProjects();
+    _joinRowType = getJoinRowType();
+    // TODO: make sure this aligns with filterProjectRexNodes
+    // if there's projection, getRowType() should return the final projected 
row type as output row type
+    //   otherwise it's the same as _joinRowType
+    _outputRowType = outputRowType == null ? _joinRowType : outputRowType;
+    _projectVariableSet = projectVariableSet;
+    _offset = offset;
+    _fetch = fetch;
+  }
+
+  @Override
+  public PinotLogicalEnrichedJoin copy(RelTraitSet traitSet, RexNode 
conditionExpr,
+      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) 
{
+    return new PinotLogicalEnrichedJoin(getCluster(), traitSet, getHints(), 
left, right,
+        conditionExpr, getVariablesSet(), getJoinType(),
+        _filterProjectRexNodes, _outputRowType, _projectVariableSet,
+        _fetch, _offset);
+  }
+
+  public PinotLogicalEnrichedJoin withNewProject(FilterProjectRexNode project, 
RelDataType outputRowType,
+      Set<CorrelationId> projectVariableSet) {
+    List<FilterProjectRexNode> filterProjectRexNodes = new 
ArrayList<>(_filterProjectRexNodes);
+    filterProjectRexNodes.add(project);
+    return new PinotLogicalEnrichedJoin(getCluster(), getTraitSet(), 
getHints(), left, right,
+        getCondition(), getVariablesSet(), getJoinType(),
+        filterProjectRexNodes, outputRowType, projectVariableSet,
+        _fetch, _offset);
+  }
+
+  public PinotLogicalEnrichedJoin withNewFilter(FilterProjectRexNode filter) {
+    List<FilterProjectRexNode> filterProjectRexNodes = new 
ArrayList<>(_filterProjectRexNodes);
+    filterProjectRexNodes.add(filter);

Review Comment:
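   nit: the same presizing trick applies here: create the list with 
`new ArrayList<>(_filterProjectRexNodes.size() + 1)`, then `addAll` the existing 
nodes and `add` the new filter.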



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java:
##########
@@ -181,6 +182,74 @@ public Void visitJoin(JoinNode node, 
ServerPlanRequestContext context) {
     return null;
   }
 
+  @Override
+  public Void visitEnrichedJoin(EnrichedJoinNode node, 
ServerPlanRequestContext context) {
+    // We can reach here for dynamic broadcast SEMI join and lookup join.
+    List<PlanNode> inputs = node.getInputs();
+    PlanNode left = inputs.get(0);
+    PlanNode right = inputs.get(1);
+
+    if (right instanceof MailboxReceiveNode
+        && ((MailboxReceiveNode) right).getExchangeType() == 
PinotRelExchangeType.PIPELINE_BREAKER) {
+      // For dynamic broadcast SEMI join, right child should be a 
PIPELINE_BREAKER exchange. Visit the left child and
+      // attach the dynamic filter to the query.
+      if (visit(left, context)) {
+        // semi join to dynamic filter logic
+        PipelineBreakerResult pipelineBreakerResult = 
context.getPipelineBreakerResult();
+        int resultMapId = pipelineBreakerResult.getNodeIdMap().get(right);
+        List<MseBlock> blocks = 
pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, 
Collections.emptyList());
+        List<Object[]> resultDataContainer = new ArrayList<>();
+        DataSchema dataSchema = right.getDataSchema();
+        for (MseBlock block : blocks) {
+          if (block.isData()) {
+            resultDataContainer.addAll(((MseBlock.Data) 
block).asRowHeap().getRows());
+          }
+        }
+        // TODO: we should keep query stats here as well
+        ServerPlanRequestUtils.attachDynamicFilter(context.getPinotQuery(), 
node.getLeftKeys(), node.getRightKeys(),
+            resultDataContainer, dataSchema);
+
+        // TODO: check whether this, when multiple filter and projects are 
present, is correct

Review Comment:
   Do we still need to verify this TODO?



##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/PinotLogicalEnrichedJoin.java:
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+public class PinotLogicalEnrichedJoin extends Join {
+
+  private final RelDataType _joinRowType;
+  private final RelDataType _outputRowType;
+  private final List<FilterProjectRexNode> _filterProjectRexNodes;
+  /// currently variableSet of Project Rel is ignored since
+  /// We don't support nested expressions in execution
+  private final Set<CorrelationId> _projectVariableSet;
+  @Nullable
+  private final List<RexNode> _squashedProjects;
+  @Nullable
+  private final RexNode _fetch;
+  @Nullable
+  private final RexNode _offset;
+
+  public PinotLogicalEnrichedJoin(RelOptCluster cluster, RelTraitSet traitSet,
+      List<RelHint> hints, RelNode left, RelNode right, RexNode joinCondition,
+      Set<CorrelationId> variablesSet, JoinRelType joinType,
+      List<FilterProjectRexNode> filterProjectRexNodes,
+      @Nullable RelDataType outputRowType, @Nullable Set<CorrelationId> 
projectVariableSet,
+      @Nullable RexNode fetch, @Nullable RexNode offset) {
+    super(cluster, traitSet, hints, left, right, joinCondition, variablesSet, 
joinType);
+    _filterProjectRexNodes = filterProjectRexNodes;
+    _squashedProjects = squashProjects();
+    _joinRowType = getJoinRowType();
+    // TODO: make sure this aligns with filterProjectRexNodes
+    // if there's projection, getRowType() should return the final projected 
row type as output row type
+    //   otherwise it's the same as _joinRowType
+    _outputRowType = outputRowType == null ? _joinRowType : outputRowType;
+    _projectVariableSet = projectVariableSet;
+    _offset = offset;
+    _fetch = fetch;
+  }
+
+  @Override
+  public PinotLogicalEnrichedJoin copy(RelTraitSet traitSet, RexNode 
conditionExpr,
+      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) 
{
+    return new PinotLogicalEnrichedJoin(getCluster(), traitSet, getHints(), 
left, right,
+        conditionExpr, getVariablesSet(), getJoinType(),
+        _filterProjectRexNodes, _outputRowType, _projectVariableSet,
+        _fetch, _offset);
+  }
+
+  public PinotLogicalEnrichedJoin withNewProject(FilterProjectRexNode project, 
RelDataType outputRowType,
+      Set<CorrelationId> projectVariableSet) {
+    List<FilterProjectRexNode> filterProjectRexNodes = new 
ArrayList<>(_filterProjectRexNodes);
+    filterProjectRexNodes.add(project);
+    return new PinotLogicalEnrichedJoin(getCluster(), getTraitSet(), 
getHints(), left, right,
+        getCondition(), getVariablesSet(), getJoinType(),
+        filterProjectRexNodes, outputRowType, projectVariableSet,
+        _fetch, _offset);
+  }
+
+  public PinotLogicalEnrichedJoin withNewFilter(FilterProjectRexNode filter) {
+    List<FilterProjectRexNode> filterProjectRexNodes = new 
ArrayList<>(_filterProjectRexNodes);
+    filterProjectRexNodes.add(filter);
+    return new PinotLogicalEnrichedJoin(getCluster(), getTraitSet(), 
getHints(), left, right,
+        getCondition(), getVariablesSet(), getJoinType(),
+        filterProjectRexNodes, _outputRowType, _projectVariableSet,
+        _fetch, _offset);
+  }
+
+  public PinotLogicalEnrichedJoin withNewFetchOffset(@Nullable RexNode fetch, 
@Nullable RexNode offset) {
+    return new PinotLogicalEnrichedJoin(getCluster(), getTraitSet(), 
getHints(), left, right,
+        getCondition(), getVariablesSet(), getJoinType(),
+        _filterProjectRexNodes, _outputRowType, _projectVariableSet,
+        fetch, offset);
+  }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    return checkNotNull(_outputRowType);
+  }
+
+  public final RelDataType getJoinRowType() {
+    return SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
+        right.getRowType(), joinType, getCluster().getTypeFactory(), null,
+        getSystemFieldList());
+  }
+
+  public List<FilterProjectRexNode> getFilterProjectRexNodes() {
+    return _filterProjectRexNodes;
+  }
+
+  public List<RexNode> getProjects() {
+    return _squashedProjects == null ? Collections.emptyList() : 
_squashedProjects;
+  }
+
+  /** combine all projects in _filterProjectRexNodes into a single project */
+  @Nullable
+  private List<RexNode> squashProjects() {
+    List<RexNode> prevProject = null;
+    for (FilterProjectRexNode node : _filterProjectRexNodes) {
+      if (node.getType() == FilterProjectRexNodeType.FILTER) {
+        continue;
+      }
+      List<RexNode> project = node.getProjectAndResultRowType().getProject();
+      if (prevProject == null) {
+        prevProject = project;
+        continue;
+      }
+      // combine project
+      prevProject = combineProjects(project, prevProject);
+    }
+    return prevProject;
+  }
+
+  /** adopted from @link{RelOptUtil.pushPastProject} */
+  private List<RexNode> combineProjects(List<RexNode> upper, List<RexNode> 
lower) {

Review Comment:
   nit: `combineProjects` can be declared `static`.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -591,6 +593,10 @@ private static HepProgram getTraitProgram(@Nullable 
WorkerManager workerManager,
           hepProgramBuilder.addRuleInstance(relOptRule);
         }
       }
+      if 
(!isRuleSkipped(CommonConstants.Broker.PlannerRuleNames.JOIN_TO_ENRICHED_JOIN, 
Set.of(), useRuleSet)) {
+        // push filter and project above join to enrichedJoin, does not work 
with physical optimizer

Review Comment:
   Why doesn't it work with the physical optimizer?



##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEnrichedJoinRule.java:
##########
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
+import org.apache.pinot.calcite.rel.logical.PinotLogicalEnrichedJoin;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.immutables.value.Value;
+
+
+/**
+ * Rules for fusing filter, projection, and limit operators into join and 
enriched joins.
+ * This collection of rules will be fired bottom-up and fuse operators into 
the enriched join greedily.
+ */
+@Value.Enclosing
+public class PinotEnrichedJoinRule {
+
+  private PinotEnrichedJoinRule() {
+  }
+
+  /**
+   * Rule that fuses a LogicalProject into a PinotLogicalEnrichedJoin
+   */
+  public static class ProjectEnrichedJoin extends 
RelRule<ProjectEnrichedJoin.ProjectEnrichedJoinConfig> {
+
+    @Value.Immutable
+    public interface ProjectEnrichedJoinConfig extends RelRule.Config {
+      ProjectEnrichedJoinConfig DEFAULT =
+          ImmutablePinotEnrichedJoinRule.ProjectEnrichedJoinConfig.builder()
+              .operandSupplier(b0 ->
+                  b0.operand(LogicalProject.class).oneInput(b1 ->
+                      b1.operand(PinotLogicalEnrichedJoin.class).anyInputs()
+                  )
+              ).build();
+
+      @Override
+      default ProjectEnrichedJoin toRule() {
+        return new ProjectEnrichedJoin(this);
+      }
+    }
+
+    private ProjectEnrichedJoin(ProjectEnrichedJoinConfig config) {
+      super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      LogicalProject project = call.rel(0);
+      PinotLogicalEnrichedJoin enrichedJoin = call.rel(1);
+
+      // Add projection to the enriched join
+      PinotLogicalEnrichedJoin newEnrichedJoin = enrichedJoin.withNewProject(
+          new 
PinotLogicalEnrichedJoin.FilterProjectRexNode(project.getProjects(), 
project.getRowType()),
+          project.getRowType(),
+          project.getVariablesSet());
+
+      call.transformTo(newEnrichedJoin);
+    }
+  }
+
+  /**
+   * Rule that fuses a LogicalFilter into a PinotLogicalEnrichedJoin
+   */
+  public static class FilterEnrichedJoin extends 
RelRule<FilterEnrichedJoin.FilterEnrichedJoinConfig> {
+
+    @Value.Immutable
+    public interface FilterEnrichedJoinConfig extends RelRule.Config {
+      FilterEnrichedJoinConfig DEFAULT =
+          ImmutablePinotEnrichedJoinRule.FilterEnrichedJoinConfig.builder()
+              .operandSupplier(b0 ->
+                  b0.operand(LogicalFilter.class).oneInput(b1 ->
+                      b1.operand(PinotLogicalEnrichedJoin.class).anyInputs()
+                  )
+              ).build();
+
+      @Override
+      default FilterEnrichedJoin toRule() {
+        return new FilterEnrichedJoin(this);
+      }
+    }
+
+    private FilterEnrichedJoin(FilterEnrichedJoinConfig config) {
+      super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      LogicalFilter filter = call.rel(0);
+      PinotLogicalEnrichedJoin enrichedJoin = call.rel(1);
+
+      // Add filter to the enriched join
+      PinotLogicalEnrichedJoin newEnrichedJoin = enrichedJoin.withNewFilter(
+          new 
PinotLogicalEnrichedJoin.FilterProjectRexNode(filter.getCondition()));
+
+      call.transformTo(newEnrichedJoin);
+    }
+  }
+
+  /**
+   * Rule that fuses a LogicalSort into a PinotLogicalEnrichedJoin
+   */
+  public static class SortEnrichedJoin extends 
RelRule<SortEnrichedJoin.SortEnrichedJoinConfig> {
+
+    @Value.Immutable
+    public interface SortEnrichedJoinConfig extends RelRule.Config {
+      SortEnrichedJoinConfig DEFAULT = 
ImmutablePinotEnrichedJoinRule.SortEnrichedJoinConfig.builder()
+          .operandSupplier(b0 ->
+              b0.operand(LogicalSort.class).oneInput(b1 ->
+                  b1.operand(PinotLogicalEnrichedJoin.class).anyInputs()
+              )
+          ).build();
+
+      @Override
+      default SortEnrichedJoin toRule() {
+        return new SortEnrichedJoin(this);
+      }
+    }
+
+    private SortEnrichedJoin(SortEnrichedJoinConfig config) {
+      super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      LogicalSort sort = call.rel(0);
+      PinotLogicalEnrichedJoin enrichedJoin = call.rel(1);
+
+      // If enriched join already had a sort operator merged, return
+      if (enrichedJoin.getOffset() != null || enrichedJoin.getFetch() != null) 
{
+        return;
+      }
+
+      // Enriched join does not support sort collation, only fetch and offset
+      if (sort.getCollation() != null && 
!sort.getCollation().equals(RelCollations.EMPTY)) {
+        return;
+      }
+
+      // Add sort limit to the enriched join
+      PinotLogicalEnrichedJoin newEnrichedJoin = 
enrichedJoin.withNewFetchOffset(sort.fetch, sort.offset);
+      call.transformTo(newEnrichedJoin);
+    }
+  }
+
+  /**
+   * Rule that converts LogicalProject + LogicalJoin into 
PinotLogicalEnrichedJoin
+   */
+  public static class ProjectJoin extends 
RelRule<ProjectJoin.ProjectJoinConfig> {

Review Comment:
   Do we have tests verifying this is applied _after_ project pushdown?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to