This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 180823d674 [multistage] Add Support for Values in Physical Optimizer
(#16221)
180823d674 is described below
commit 180823d6743293b20d298eff7b4a0e7b2085faae
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Jul 1 19:53:28 2025 -0500
[multistage] Add Support for Values in Physical Optimizer (#16221)
---
.../query/context/PhysicalPlannerContext.java | 33 +++++++-
.../v2/opt/rules/LiteModeWorkerAssignmentRule.java | 42 +---------
.../v2/opt/rules/WorkerExchangeAssignmentRule.java | 9 +-
.../query/context/PhysicalPlannerContextTest.java | 98 ++++++++++++++++++++++
.../rules/LiteModeWorkerAssignmentRuleTest.java | 24 ------
.../resources/queries/PhysicalOptimizerPlans.json | 57 +++++++++++++
.../query/service/dispatch/QueryDispatcher.java | 2 +-
.../src/test/resources/queries/BasicQuery.json | 4 +
8 files changed, 201 insertions(+), 68 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index b6c9d2e2e8..e8e46b3940 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -18,9 +18,13 @@
*/
package org.apache.pinot.query.context;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
@@ -80,9 +84,16 @@ public class PhysicalPlannerContext {
_liteModeServerStageLimit =
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
}
+ /**
+  * Convenience constructor that uses the broker-level defaults from {@code CommonConstants.Broker} for lite mode,
+  * run-in-broker, broker pruning and the lite-mode leaf stage limit.
+  * <p>
+  * These are only defaults. The delegated constructor resolves the effective values through
+  * {@code QueryOptionsUtils}, so {@code queryOptions} can still override them. This is visible at least for
+  * run-in-broker, broker pruning and the leaf stage limit.
+  * </p>
+  */
+ public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
+ String instanceId, Map<String, String> queryOptions) {
+ this(routingManager, hostName, port, requestId, instanceId, queryOptions,
+ CommonConstants.Broker.DEFAULT_USE_LITE_MODE,
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
+ CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
+ }
+
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions, boolean
defaultUseLiteMode, boolean defaultRunInBroker,
- boolean defaultUseBrokerPruning, int defaultLiteModeServerStageLimit) {
+ boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit) {
_routingManager = routingManager;
_hostName = hostName;
_port = port;
@@ -93,7 +104,7 @@ public class PhysicalPlannerContext {
_runInBroker = QueryOptionsUtils.isRunInBroker(_queryOptions,
defaultRunInBroker);
_useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions,
defaultUseBrokerPruning);
_liteModeServerStageLimit =
QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions,
- defaultLiteModeServerStageLimit);
+ defaultLiteModeLeafStageLimit);
_instanceIdToQueryServerInstance.put(instanceId,
getBrokerQueryServerInstance());
}
@@ -146,6 +157,24 @@ public class PhysicalPlannerContext {
return _liteModeServerStageLimit;
}
+  /**
+   * Gets a random instance id from the registered instances in the context.
+   * <p>
+   * <b>Important:</b> When any instance other than the current (broker) instance is registered, the broker's own
+   * instanceId is never returned. Each of the other registered instances is chosen with equal probability. In
+   * practice this means a server instanceId is returned unless no server has yet been registered with the context.
+   * That can happen for queries which don't consist of any table-scans. If exactly one instance is registered,
+   * that instance is returned.
+   * </p>
+   *
+   * @return a registered instance id
+   * @throws IllegalStateException if no instances are registered with the context
+   */
+  public String getRandomInstanceId() {
+    Preconditions.checkState(!_instanceIdToQueryServerInstance.isEmpty(), "No instances present in context");
+    if (_instanceIdToQueryServerInstance.size() == 1) {
+      return _instanceIdToQueryServerInstance.keySet().iterator().next();
+    }
+    Random random = ThreadLocalRandom.current();
+    // Random#nextInt's bound is exclusive, so it must equal the candidate count for every candidate to be
+    // reachable. Sizing from the filtered list (rather than map size - 1) keeps the pick uniform and in-bounds
+    // even if the broker's own id is not present in the map.
+    return _instanceIdToQueryServerInstance.keySet().stream()
+        .filter(instanceId -> !_instanceId.equals(instanceId))
+        .collect(Collectors.collectingAndThen(Collectors.toList(),
+            candidates -> candidates.get(random.nextInt(candidates.size()))));
+  }
+
private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
index 2ebaa7a837..e3cf4379bb 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
@@ -18,15 +18,10 @@
*/
package org.apache.pinot.query.planner.physical.v2.opt.rules;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
@@ -50,7 +45,6 @@ import
org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
* plan nodes.
*/
public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
- private static final Random RANDOM = new Random();
private final PhysicalPlannerContext _context;
private final boolean _runInBroker;
@@ -61,13 +55,11 @@ public class LiteModeWorkerAssignmentRule implements
PRelNodeTransformer {
@Override
public PRelNode execute(PRelNode currentNode) {
- Set<String> workerSet = new HashSet<>();
List<String> workers;
if (_runInBroker) {
- workers = List.of(String.format("0@%s", _context.getInstanceId()));
+ workers = List.of("0@" + _context.getInstanceId());
} else {
- accumulateWorkers(currentNode, workerSet);
- workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
+ workers = List.of("0@" + _context.getRandomInstanceId());
}
return addExchangeAndWorkers(currentNode, null, workers);
}
@@ -98,36 +90,6 @@ public class LiteModeWorkerAssignmentRule implements
PRelNodeTransformer {
return currentNode;
}
- /**
- * Stores workers assigned to the leaf stage nodes into the provided Set.
Note that each worker has an integer prefix
- * which denotes the "workerId". We remove that prefix before storing them
in the set.
- */
- @VisibleForTesting
- static void accumulateWorkers(PRelNode currentNode, Set<String> workerSink) {
- if (currentNode.isLeafStage()) {
-
workerSink.addAll(currentNode.getPinotDataDistributionOrThrow().getWorkers().stream()
-
.map(LiteModeWorkerAssignmentRule::stripIdPrefixFromWorker).collect(Collectors.toList()));
- return;
- }
- for (PRelNode input : currentNode.getPRelInputs()) {
- accumulateWorkers(input, workerSink);
- }
- }
-
- /**
- * Samples a worker from the given list.
- */
- @VisibleForTesting
- static String sampleWorker(List<String> instanceIds) {
- Preconditions.checkState(!instanceIds.isEmpty(), "No workers in leaf
stage");
- return String.format("0@%s",
instanceIds.get(RANDOM.nextInt(instanceIds.size())));
- }
-
- @VisibleForTesting
- static String stripIdPrefixFromWorker(String worker) {
- return worker.split("@")[1];
- }
-
/**
* Infers Exchange to be added on top of the leaf stage.
*/
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 0ec60011df..8bffea3093 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Values;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
@@ -394,12 +395,18 @@ public class WorkerExchangeAssignmentRule implements
PRelNodeTransformer {
* Computes the PinotDataDistribution of the given node from the input node.
This assumes that all traits of the
* input node are already satisfied.
*/
- private static PinotDataDistribution computeCurrentNodeDistribution(PRelNode
currentNode, @Nullable PRelNode parent) {
+ private PinotDataDistribution computeCurrentNodeDistribution(PRelNode
currentNode, @Nullable PRelNode parent) {
if (currentNode.getPinotDataDistribution() != null) {
Preconditions.checkState(isLeafStageBoundary(currentNode, parent),
"current node should not have assigned data distribution unless it's
a boundary");
return currentNode.getPinotDataDistributionOrThrow();
}
+ if (currentNode.getPRelInputs().isEmpty()) {
+ Preconditions.checkState(currentNode.unwrap() instanceof Values,
"Expected Values node. Found: %s",
+ currentNode.unwrap());
+ List<String> workers = List.of(String.format("0@%s",
_physicalPlannerContext.getRandomInstanceId()));
+ return new PinotDataDistribution(RelDistribution.Type.SINGLETON,
workers, workers.hashCode(), null, null);
+ }
PinotDataDistribution inputDistribution =
currentNode.getPRelInput(0).getPinotDataDistributionOrThrow();
PinotDataDistribution newDistribution =
inputDistribution.apply(DistMappingGenerator.compute(
currentNode.unwrap().getInput(0), currentNode.unwrap(), null),
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/context/PhysicalPlannerContextTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/context/PhysicalPlannerContextTest.java
new file mode 100644
index 0000000000..427ce8ade1
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/context/PhysicalPlannerContextTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.context;
+
+import java.util.Map;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class PhysicalPlannerContextTest {
+
+  @Test
+  public void testGetRandomInstanceIdWithNoInstances() {
+    // Test case: No instances present in context (should throw IllegalStateException)
+    PhysicalPlannerContext context = createPhysicalPlannerContext();
+
+    // Clear the instances map to simulate no instances
+    context.getInstanceIdToQueryServerInstance().clear();
+
+    try {
+      context.getRandomInstanceId();
+      Assert.fail("Expected IllegalStateException when no instances are present");
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "No instances present in context");
+    }
+  }
+
+  @Test
+  public void testGetRandomInstanceIdWithSingleInstance() {
+    // Test case: Only one instance present (should return that instance)
+    PhysicalPlannerContext context = createPhysicalPlannerContext();
+
+    // The constructor automatically adds the broker instance, so we should have exactly one
+    String randomInstanceId = context.getRandomInstanceId();
+    Assert.assertEquals(randomInstanceId, "broker_instance_1");
+  }
+
+  @Test
+  public void testGetRandomInstanceIdWithMultipleInstances() {
+    // Test case: Multiple instances present. The result should never be the current instance, and every
+    // server instance must be selectable (guards against an off-by-one in the random bound).
+    PhysicalPlannerContext context = createPhysicalPlannerContext();
+
+    // Add additional server instances
+    QueryServerInstance serverInstance2 = new QueryServerInstance("server_instance_2", "host2", 8081, 8081);
+    QueryServerInstance serverInstance3 = new QueryServerInstance("server_instance_3", "host3", 8082, 8082);
+
+    context.getInstanceIdToQueryServerInstance().put("server_instance_2", serverInstance2);
+    context.getInstanceIdToQueryServerInstance().put("server_instance_3", serverInstance3);
+
+    // With a uniform pick over two candidates, the chance of never observing one of them in 100 draws is ~2^-99,
+    // so this loop is effectively deterministic for a correct implementation.
+    boolean sawServer2 = false;
+    boolean sawServer3 = false;
+    for (int i = 0; i < 100; i++) {
+      String randomInstanceId = context.getRandomInstanceId();
+      Assert.assertNotEquals(randomInstanceId, "broker_instance_1",
+          "Random instance should not be the current broker instance");
+      Assert.assertTrue(randomInstanceId.equals("server_instance_2") || randomInstanceId.equals("server_instance_3"),
+          "Random instance should be one of the server instances");
+      sawServer2 |= randomInstanceId.equals("server_instance_2");
+      sawServer3 |= randomInstanceId.equals("server_instance_3");
+    }
+    Assert.assertTrue(sawServer2, "server_instance_2 should be selectable");
+    Assert.assertTrue(sawServer3, "server_instance_3 should be selectable");
+  }
+
+  @Test
+  public void testGetRandomInstanceIdWithTwoInstances() {
+    // Test case: Two instances (broker + one server) - should return the server
+    PhysicalPlannerContext context = createPhysicalPlannerContext();
+
+    // Add one server instance
+    QueryServerInstance serverInstance = new QueryServerInstance("server_instance_1", "host1", 8081, 8081);
+    context.getInstanceIdToQueryServerInstance().put("server_instance_1", serverInstance);
+
+    String randomInstanceId = context.getRandomInstanceId();
+    Assert.assertEquals(randomInstanceId, "server_instance_1",
+        "With two instances, should return the non-broker instance");
+  }
+
+  /**
+   * Creates a context for a broker with instance id {@code broker_instance_1}, a mocked routing manager and no
+   * query options.
+   */
+  private PhysicalPlannerContext createPhysicalPlannerContext() {
+    RoutingManager mockRoutingManager = mock(RoutingManager.class);
+    return new PhysicalPlannerContext(mockRoutingManager, "localhost", 8080, 12345L, "broker_instance_1", Map.of());
+  }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java
index dd70f415d8..7557d5bb07 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java
@@ -18,39 +18,15 @@
*/
package org.apache.pinot.query.planner.physical.v2.opt.rules;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.mockito.Mockito;
-import org.testng.annotations.Test;
import static org.mockito.Mockito.doReturn;
-import static org.testng.Assert.*;
public class LiteModeWorkerAssignmentRuleTest {
- @Test
- public void testAccumulateWorkers() {
- PRelNode leafOne = create(List.of(), true, List.of("0@server-1",
"1@server-2"));
- PRelNode leafTwo = create(List.of(), true, List.of("0@server-2",
"1@server-1"));
- PRelNode intermediateNode = create(List.of(leafOne, leafTwo), false,
List.of("0@server-3", "1@server-4"));
- Set<String> workers = new HashSet<>();
- LiteModeWorkerAssignmentRule.accumulateWorkers(intermediateNode, workers);
- assertEquals(workers, Set.of("server-1", "server-2"));
- }
-
- @Test
- public void testSampleWorker() {
- List<String> workers = List.of("worker-0", "worker-1", "worker-2");
- Set<String> selectionCandidates = Set.of("0@worker-0", "0@worker-1",
"0@worker-2");
- Set<String> selectedWorkers = new HashSet<>();
- for (int iteration = 0; iteration < 1000; iteration++) {
- selectedWorkers.add(LiteModeWorkerAssignmentRule.sampleWorker(workers));
- }
- assertEquals(selectedWorkers, selectionCandidates);
- }
private PRelNode create(List<PRelNode> inputs, boolean isLeafStage,
List<String> workers) {
// Setup mock pinot data distribution.
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index 96c95f4498..bd5137c837 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -1,4 +1,61 @@
{
+ "physical_opt_constant_queries": {
+ "queries": [
+ {
+ "description": "Select 1",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 1",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalValues(tuples=[[{ 1 }]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Constant only join query",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR WITH tmp1(id,
name) AS (VALUES(1, 'foo')), tmp2(user_id, nm2) AS (VALUES(1, 'bar')) SELECT *
FROM tmp1 JOIN tmp2 ON 1=1",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalJoin(condition=[true], joinType=[inner])",
+ "\n PhysicalValues(tuples=[[{ 1, _UTF-8'foo' }]])",
+ "\n PhysicalValues(tuples=[[{ 1, _UTF-8'bar' }]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Query that gets optimized to a Values node",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM
a WHERE col1 IS NULL LIMIT 1",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalSort(fetch=[1])",
+ "\n PhysicalValues(tuples=[[]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Semi-join (IN subquery) against a constant only join",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT
COUNT(*) FROM a WHERE col1 IN (WITH tmp1(id, name) AS (VALUES(1, 'foo')),
tmp2(user_id, nm) AS (VALUES(2, 'bar')) SELECT A.name FROM tmp1 AS A JOIN tmp2
AS B ON A.id = B.user_id)",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{}], agg#0=[COUNT($0)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{}], agg#0=[COUNT()],
aggType=[LEAF])",
+ "\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalProject(col1=[$0])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalProject(name=[$1])",
+ "\n PhysicalJoin(condition=[=($0, $2)],
joinType=[inner])",
+ "\n PhysicalValues(tuples=[[]])",
+ "\n PhysicalProject(EXPR$0=[$0])",
+ "\n PhysicalValues(tuples=[[]])",
+ "\n"
+ ]
+ }
+ ]
+ },
"physical_opt_chained_subqueries": {
"queries": [
{
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index d7ef6790d2..68cac7e9ce 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -320,7 +320,7 @@ public class QueryDispatcher {
serializePlanFragments(stagePlans, serverInstancesOut, deadline);
if (serverInstancesOut.isEmpty()) {
- throw new RuntimeException("No server instances to dispatch query to");
+ return;
}
Map<String, String> requestMetadata =
diff --git a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
index eb8e8d53d6..71f239db5e 100644
--- a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
+++ b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
@@ -20,6 +20,10 @@
{
"description": "basic test with literal",
"sql": "SELECT 1 AS int, CAST(2 AS DOUBLE) AS double"
+ },
+ {
+ "description": "select 1 but alias to a reserved column name",
+ "sql": "SELECT 1 as \"timestamp\""
}
]
},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]