This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 37fe83751c8 Fix bug with 'useLeafServerForIntermediateStage' when used 
with empty tables (#17634)
37fe83751c8 is described below

commit 37fe83751c8a43897d81a035a93e3b1772d12aed
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Feb 4 16:07:29 2026 -0800

    Fix bug with 'useLeafServerForIntermediateStage' when used with empty 
tables (#17634)
---
 .../planner/physical/DispatchablePlanContext.java  |   3 +
 .../apache/pinot/query/routing/WorkerManager.java  |  18 +-
 .../pinot/query/routing/WorkerManagerTest.java     | 195 +++++++++++++++++++++
 3 files changed, 214 insertions(+), 2 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 61f2bcd9ed6..fe89e51a151 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.calcite.runtime.PairList;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.query.context.PlannerContext;
@@ -52,7 +53,9 @@ public class DispatchablePlanContext {
   private final PairList<Integer, String> _resultFields;
   private final Set<String> _tableNames;
 
+  @Nullable
   private final Set<String> _nonLookupTables;
+  @Nullable
   private final Set<QueryServerInstance> _leafServerInstances;
 
   private final Map<Integer, DispatchablePlanMetadata> 
_dispatchablePlanMetadataMap = new HashMap<>();
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 34867ef2aff..7446bb2e88f 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -361,8 +361,22 @@ public class WorkerManager {
     List<QueryServerInstance> candidateServers;
     if (context.isUseLeafServerForIntermediateStage()) {
       Set<QueryServerInstance> leafServerInstances = 
context.getLeafServerInstances();
-      assert !leafServerInstances.isEmpty();
-      candidateServers = new ArrayList<>(leafServerInstances);
+      if (leafServerInstances.isEmpty()) {
+        // Fall back to use all enabled servers if no leaf server is found 
(e.g., when querying an empty table).
+        LOGGER.warn("[RequestId: {}] No leaf server found with 
useLeafServerForIntermediateStage enabled, "
+            + "falling back to all enabled servers", context.getRequestId());
+        Map<String, ServerInstance> enabledServerInstanceMap = 
_routingManager.getEnabledServerInstanceMap();
+        candidateServers = new ArrayList<>(enabledServerInstanceMap.size());
+        for (ServerInstance serverInstance : 
enabledServerInstanceMap.values()) {
+          candidateServers.add(new QueryServerInstance(serverInstance));
+        }
+        if (candidateServers.isEmpty()) {
+          LOGGER.error("[RequestId: {}] No server instance found for 
intermediate stage", context.getRequestId());
+          throw new IllegalStateException("No server instance found for 
intermediate stage");
+        }
+      } else {
+        candidateServers = new ArrayList<>(leafServerInstances);
+      }
     } else {
       candidateServers = getCandidateServersPerTables(context);
     }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java
new file mode 100644
index 00000000000..45c072a1ada
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Tests for {@link WorkerManager}.
+ */
+public class WorkerManagerTest {
+
+  private static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
+    return new Schema.SchemaBuilder()
+        .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
+        .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
+        .addMetric("col3", FieldSpec.DataType.INT, 0)
+        .setSchemaName(schemaName);
+  }
+
+  private static ServerInstance getServerInstance(String hostname, int port) {
+    String server = String.format("%s%s_%d", 
CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE, hostname, port);
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server);
+    ZNRecord znRecord = instanceConfig.getRecord();
+    Map<String, String> simpleFields = znRecord.getSimpleFields();
+    simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, 
String.valueOf(port));
+    
simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY,
 String.valueOf(port));
+    
simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
 String.valueOf(port));
+    return new ServerInstance(instanceConfig);
+  }
+
+  /**
+   * Tests that when useLeafServerForIntermediateStage is enabled and querying 
an empty table
+   * (which results in no leaf servers), the query planner falls back to using 
all enabled servers
+   * instead of failing.
+   *
+   * This test simulates the scenario where a table exists with routing but 
has no segments,
+   * resulting in an empty RoutingTable (no server instances with segments).
+   */
+  @Test
+  public void testSingletonWorkerWithEmptyTableAndUseLeafServerEnabled() {
+    Schema emptyTableSchema = getSchemaBuilder("emptyTable").build();
+
+    // Create server instances
+    ServerInstance server1 = getServerInstance("localhost", 1);
+    ServerInstance server2 = getServerInstance("localhost", 2);
+    Map<String, ServerInstance> serverInstanceMap = new HashMap<>();
+    serverInstanceMap.put(server1.getInstanceId(), server1);
+    serverInstanceMap.put(server2.getInstanceId(), server2);
+
+    // Create a routing table with no segments (empty table scenario)
+    RoutingTable emptyRoutingTable = new RoutingTable(Collections.emptyMap(), 
List.of(), 0);
+
+    // Create mock routing manager
+    RoutingManager routingManager = new 
EmptyTableRoutingManager(serverInstanceMap, emptyRoutingTable);
+
+    // Create mock table cache
+    Map<String, String> tableNameMap = new HashMap<>();
+    tableNameMap.put("emptyTable_OFFLINE", "emptyTable_OFFLINE");
+    tableNameMap.put("emptyTable", "emptyTable");
+
+    TableCache tableCache = mock(TableCache.class);
+    when(tableCache.getTableNameMap()).thenReturn(tableNameMap);
+    when(tableCache.getActualTableName(anyString())).thenAnswer(inv -> 
tableNameMap.get(inv.getArgument(0)));
+    when(tableCache.getSchema(anyString())).thenReturn(emptyTableSchema);
+    when(tableCache.getTableConfig("emptyTable_OFFLINE"))
+        .thenReturn(mock(org.apache.pinot.spi.config.table.TableConfig.class));
+
+    WorkerManager workerManager = new WorkerManager("Broker_localhost", 
"localhost", 3, routingManager);
+    QueryEnvironment queryEnvironment = new 
QueryEnvironment(CommonConstants.DEFAULT_DATABASE, tableCache,
+        workerManager);
+
+    // This query requires a singleton worker (due to LIMIT) and uses 
useLeafServerForIntermediateStage
+    // When querying an empty table, there are no leaf servers, so we need to 
fall back to enabled servers
+    String query = "SET useLeafServerForIntermediateStage=true; SELECT * FROM 
emptyTable LIMIT 10";
+
+    // This should not throw "bound must be positive" error anymore
+    @SuppressWarnings("deprecation")
+    DispatchableSubPlan dispatchableSubPlan = 
queryEnvironment.planQuery(query);
+    assertNotNull(dispatchableSubPlan);
+  }
+
+  /**
+   * A custom RoutingManager implementation that simulates a table with 
routing but no segments.
+   * This is used to test the empty leaf server fallback logic.
+   */
+  private static class EmptyTableRoutingManager implements RoutingManager {
+    private final Map<String, ServerInstance> _serverInstanceMap;
+    private final RoutingTable _emptyRoutingTable;
+
+    public EmptyTableRoutingManager(Map<String, ServerInstance> 
serverInstanceMap, RoutingTable emptyRoutingTable) {
+      _serverInstanceMap = serverInstanceMap;
+      _emptyRoutingTable = emptyRoutingTable;
+    }
+
+    @Override
+    public Map<String, ServerInstance> getEnabledServerInstanceMap() {
+      return _serverInstanceMap;
+    }
+
+    @Nullable
+    @Override
+    public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
+      return _emptyRoutingTable;
+    }
+
+    @Nullable
+    @Override
+    public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String 
tableNameWithType, long requestId) {
+      return _emptyRoutingTable;
+    }
+
+    @Nullable
+    @Override
+    public List<String> getSegments(BrokerRequest brokerRequest) {
+      return List.of();
+    }
+
+    @Override
+    public boolean routingExists(String tableNameWithType) {
+      return true;
+    }
+
+    @Nullable
+    @Override
+    public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public TablePartitionReplicatedServersInfo 
getTablePartitionReplicatedServersInfo(String tableNameWithType) {
+      return null;
+    }
+
+    @Override
+    public Set<String> getServingInstances(String tableNameWithType) {
+      return new HashSet<>(_serverInstanceMap.keySet());
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableNameWithType) {
+      return false;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to