gortiz commented on code in PR #18237:
URL: https://github.com/apache/pinot/pull/18237#discussion_r3131265831


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -551,6 +554,7 @@ private void 
assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata
       if (!routingTable.getUnavailableSegments().isEmpty()) {
         metadata.addUnavailableSegments(tableName, 
routingTable.getUnavailableSegments());
       }
+      
context.addNumSegmentsPrunedByBroker(routingTable.getNumPrunedSegments());

Review Comment:
   `extractRoutingQuery` is nullable. Shouldn't this produce an NPE whenever 
this is called with `useBrokerPruning = false`? The same happens with filter.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanNodeRoutingQueryBuilder.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.logical.LeafStageToPinotQuery;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+
+
+/**
+ * Converts an MSE leaf stage {@link PlanNode} tree to a {@link PinotQuery} 
for routing purposes. Used by the broker
+ * pruning path in {@link WorkerManager} to build a filter-bearing routing 
query that enables segment pruning at the
+ * broker.
+ */
+public class PlanNodeRoutingQueryBuilder {
+  private PlanNodeRoutingQueryBuilder() {
+  }
+
+  /**
+   * Converts a PlanNode leaf stage root to a {@link PinotQuery} for routing 
purposes. Only handles Project, Filter
+   * and TableScan nodes — other node types in the chain are silently skipped. 
Callers should expect this method
+   * to throw on malformed trees (e.g., missing TableScanNode, multi-input 
nodes) and handle failures gracefully.
+   *
+   * @param tableName the table name (with or without type suffix). Passed 
explicitly because PlanNode trees
+   *                  don't carry the resolved table name.
+   * @param leafStageRoot the root of the leaf stage
+   * @param skipFilter whether to skip the filter in the query
+   * @return a {@link PinotQuery} representing the leaf stage
+   */
+  public static PinotQuery createPinotQueryForRouting(String tableName, 
PlanNode leafStageRoot, boolean skipFilter) {
+    List<PlanNode> bottomToTopNodes = new ArrayList<>();
+    accumulateBottomToTop(leafStageRoot, bottomToTopNodes);
+    Preconditions.checkState(!bottomToTopNodes.isEmpty() && 
bottomToTopNodes.get(0) instanceof TableScanNode,
+        "Could not find table scan");
+    TableScanNode tableScan = (TableScanNode) bottomToTopNodes.get(0);
+    PinotQuery pinotQuery = initializePinotQueryForTableScan(tableName, 
tableScan);
+    for (PlanNode parentNode : bottomToTopNodes) {
+      if (parentNode instanceof FilterNode) {
+        if (!skipFilter) {
+          handleFilter((FilterNode) parentNode, pinotQuery);
+        }
+      } else if (parentNode instanceof ProjectNode) {
+        handleProject((ProjectNode) parentNode, pinotQuery);
+      }
+    }
+    return pinotQuery;
+  }
+
+  private static void accumulateBottomToTop(PlanNode root, List<PlanNode> 
parentNodes) {
+    Preconditions.checkState(root.getInputs().size() <= 1,
+        "Leaf stage nodes should have at most one input, found: %s", 
root.getInputs().size());
+    for (PlanNode input : root.getInputs()) {
+      accumulateBottomToTop(input, parentNodes);
+    }
+    parentNodes.add(root);
+  }
+
+  private static PinotQuery initializePinotQueryForTableScan(String tableName, 
TableScanNode tableScan) {
+    PinotQuery pinotQuery = new PinotQuery();
+    pinotQuery.setDataSource(new DataSource());
+    pinotQuery.getDataSource().setTableName(tableName);
+    pinotQuery.setSelectList(tableScan.getColumns().stream().map(
+        RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+    return pinotQuery;
+  }
+
+  private static void handleProject(ProjectNode project, PinotQuery 
pinotQuery) {
+    if (project != null) {

Review Comment:
   nit: this check is not needed (right now this method is only called when 
project is not null). If you do want to support null projects, we should 
mark the argument as `@Nullable`.



##########
pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java:
##########
@@ -123,9 +127,215 @@ public void 
testSingletonWorkerWithEmptyTableAndUseLeafServerEnabled() {
     String query = "SET useLeafServerForIntermediateStage=true; SELECT * FROM 
emptyTable LIMIT 10";
 
     // This should not throw "bound must be positive" error anymore
-    @SuppressWarnings("deprecation")
-    DispatchableSubPlan dispatchableSubPlan = 
queryEnvironment.planQuery(query);
-    assertNotNull(dispatchableSubPlan);
+    try (QueryEnvironment.CompiledQuery compiledQuery = 
queryEnvironment.compile(query)) {
+      DispatchableSubPlan dispatchableSubPlan = 
compiledQuery.planQuery(0).getQueryPlan();
+      assertNotNull(dispatchableSubPlan);
+    }
+  }
+
+  @Test
+  public void testBrokerPruningUsesFilteredRoutingQueryOnThisPath() {
+    Schema schema = getSchemaBuilder("testTable").build();
+    ServerInstance server = getServerInstance("localhost", 1);
+    Map<String, ServerInstance> serverInstanceMap = 
Map.of(server.getInstanceId(), server);
+    RoutingTable routingTable = new RoutingTable(Map.of(server, new 
SegmentsToQuery(List.of("segment1"), List.of())),
+        List.of(), 0);
+    CapturingRoutingManager routingManager = new 
CapturingRoutingManager(serverInstanceMap,
+        Map.of("testTable_OFFLINE", routingTable));
+
+    Map<String, String> tableNameMap = new HashMap<>();
+    tableNameMap.put("testTable_OFFLINE", "testTable_OFFLINE");
+    tableNameMap.put("testTable", "testTable");
+
+    TableCache tableCache = mock(TableCache.class);
+    when(tableCache.getTableNameMap()).thenReturn(tableNameMap);
+    when(tableCache.getActualTableName(anyString())).thenAnswer(inv -> 
tableNameMap.get(inv.getArgument(0)));
+    when(tableCache.getSchema(anyString())).thenReturn(schema);
+    when(tableCache.getTableConfig("testTable_OFFLINE"))
+        .thenReturn(mock(org.apache.pinot.spi.config.table.TableConfig.class));

Review Comment:
   nit: You are referencing `TableConfig` by its fully qualified name several 
times instead of importing it. We usually prefer an import in these cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to