This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e6f77dfd37 [multistage] add support for stage parallelism (#10166)
e6f77dfd37 is described below
commit e6f77dfd3747d385fd2e13103c2fd893522f5b8a
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Jan 26 12:11:13 2023 -0800
[multistage] add support for stage parallelism (#10166)
* [multistage] add support for stage parallelism
* remove unnecessary change
---
.../pinot/query/planner/logical/StagePlanner.java | 2 +-
.../apache/pinot/query/routing/WorkerManager.java | 16 ++++--
.../runtime/operator/MailboxSendOperator.java | 5 ++
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 23 ++++++--
.../pinot/query/runtime/QueryRunnerTestBase.java | 5 ++
.../plan/serde/QueryPlanSerDeUtilsTest.java | 66 ++++++++++++++++++++++
.../src/test/resources/queries/Parallelism.json | 47 +++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
8 files changed, 153 insertions(+), 12 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 0af5497ee2..1081459dd7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -85,7 +85,7 @@ public class StagePlanner {
// assign workers to each stage.
for (Map.Entry<Integer, StageMetadata> e :
queryPlan.getStageMetadataMap().entrySet()) {
- _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId);
+ _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId,
_plannerContext.getOptions());
}
// Run physical optimizations
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 0e3750dc5d..60930b6dce 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -59,7 +59,8 @@ public class WorkerManager {
_routingManager = routingManager;
}
- public void assignWorkerToStage(int stageId, StageMetadata stageMetadata,
long requestId) {
+ public void assignWorkerToStage(int stageId, StageMetadata stageMetadata,
long requestId,
+ Map<String, String> options) {
List<String> scannedTables = stageMetadata.getScannedTables();
if (scannedTables.size() == 1) {
// table scan stage, need to attach server as well as segment info for
each physical table type.
@@ -97,7 +98,7 @@ public class WorkerManager {
stageMetadata.setServerInstances(new ArrayList<>(
serverInstanceToSegmentsMap.keySet()
.stream()
- .map(server -> new VirtualServer(server, 0)) // for now, only
use single virtual server
+ .map(server -> new VirtualServer(server, 0)) // the leaf stage
only has one server, so always use 0 here
.collect(Collectors.toList())));
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
} else if (PlannerUtils.isRootStage(stageId)) {
@@ -106,11 +107,14 @@ public class WorkerManager {
stageMetadata.setServerInstances(Lists.newArrayList(
new VirtualServer(new WorkerInstance(_hostName, _port, _port, _port,
_port), 0)));
} else {
-
stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
+
stageMetadata.setServerInstances(assignServers(_routingManager.getEnabledServerInstanceMap().values(),
options));
}
}
- private static List<VirtualServer> filterServers(Collection<ServerInstance>
servers) {
+ private static List<VirtualServer> assignServers(Collection<ServerInstance>
servers, Map<String, String> options) {
+ int stageParallelism = Integer.parseInt(
+
options.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.STAGE_PARALLELISM,
"1"));
+
List<VirtualServer> serverInstances = new ArrayList<>();
for (ServerInstance server : servers) {
String hostname = server.getHostname();
@@ -118,7 +122,9 @@ public class WorkerManager {
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
- serverInstances.add(new VirtualServer(server, 0));
+ for (int virtualId = 0; virtualId < stageParallelism; virtualId++) {
+ serverInstances.add(new VirtualServer(server, virtualId));
+ }
}
}
return serverInstances;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 32ac1247c5..e37fa15133 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -90,6 +90,11 @@ public class MailboxSendOperator extends MultiStageOperator {
for (VirtualServer serverInstance : receivingStageInstances) {
if (serverInstance.getHostname().equals(mailboxService.getHostname())
&& serverInstance.getQueryMailboxPort() ==
mailboxService.getMailboxPort()) {
+ if (singletonInstance != null &&
singletonInstance.getServer().equals(serverInstance.getServer())) {
+ throw new IllegalArgumentException("Cannot issue query with
stageParallelism > 1 for queries that "
+ + "use SINGLETON exchange. This is an internal limitation that
is being worked on - reissue "
+ + "your query again without stageParallelism.");
+ }
Preconditions.checkState(singletonInstance == null, "multiple
instance found for singleton exchange type!");
singletonInstance = serverInstance;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 918d459312..36f7e483cd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -21,7 +21,8 @@ package org.apache.pinot.query.runtime.plan.serde;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
@@ -59,16 +60,26 @@ public class QueryPlanSerDeUtils {
.putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
}
+ private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
+
"(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)\\((?<grpc>[0-9]+):(?<service>[0-9]+):(?<mailbox>[0-9]+)\\)");
+
public static VirtualServer stringToInstance(String serverInstanceString) {
- String[] s = StringUtils.split(serverInstanceString, '_');
+ Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(serverInstanceString);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Unexpected serverInstanceString '" +
serverInstanceString + "'. This might "
+ + "happen if you are upgrading from an old version of the multistage
engine to the current one in a rolling "
+ + "fashion.");
+ }
+
// Skipped netty and grpc port as they are not used in worker instance.
- return new VirtualServer(new WorkerInstance(s[0], Integer.parseInt(s[1]),
Integer.parseInt(s[2]),
- Integer.parseInt(s[3]), Integer.parseInt(s[4])), 0);
+ return new VirtualServer(new WorkerInstance(matcher.group("host"),
Integer.parseInt(matcher.group("port")),
+ Integer.parseInt(matcher.group("grpc")),
Integer.parseInt(matcher.group("service")),
+ Integer.parseInt(matcher.group("mailbox"))),
Integer.parseInt(matcher.group("virtualid")));
}
public static String instanceToString(VirtualServer serverInstance) {
- return StringUtils.join(serverInstance.getHostname(), '_',
serverInstance.getPort(), '_',
- serverInstance.getGrpcPort(), '_',
serverInstance.getQueryServicePort(), '_',
+ return String.format("%s@%s:%s(%s:%s:%s)", serverInstance.getVirtualId(),
serverInstance.getHostname(),
+ serverInstance.getPort(), serverInstance.getGrpcPort(),
serverInstance.getQueryServicePort(),
serverInstance.getQueryMailboxPort());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index b921c8fdd8..d58c187447 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -112,6 +112,11 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
protected List<Object[]> queryH2(String sql)
throws Exception {
+ int firstSemi = sql.indexOf(';');
+ if (firstSemi > 0 && firstSemi != sql.length() - 1) {
+ // trim off any SET statements for H2
+ sql = sql.substring(firstSemi + 1);
+ }
Statement h2statement =
_h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
h2statement.execute(sql);
ResultSet h2ResultSet = h2statement.getResultSet();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
new file mode 100644
index 0000000000..62cd9b6e18
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.runtime.plan.serde;
+
+import org.apache.pinot.query.routing.VirtualServer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class QueryPlanSerDeUtilsTest {
+
+ @Test
+ public void shouldSerializeServer() {
+ // Given:
+ VirtualServer server = Mockito.mock(VirtualServer.class);
+ Mockito.when(server.getVirtualId()).thenReturn(1);
+ Mockito.when(server.getHostname()).thenReturn("Server_192.987.1.123");
+ Mockito.when(server.getPort()).thenReturn(80);
+ Mockito.when(server.getGrpcPort()).thenReturn(10);
+ Mockito.when(server.getQueryServicePort()).thenReturn(20);
+ Mockito.when(server.getQueryMailboxPort()).thenReturn(30);
+
+ // When:
+ String serialized = QueryPlanSerDeUtils.instanceToString(server);
+
+ // Then:
+ Assert.assertEquals(serialized, "1@Server_192.987.1.123:80(10:20:30)");
+ }
+
+ @Test
+ public void shouldDeserializeServerString() {
+ // Given:
+ String serverString = "1@Server_192.987.1.123:80(10:20:30)";
+
+ // When:
+ VirtualServer server = QueryPlanSerDeUtils.stringToInstance(serverString);
+
+ // Then:
+ Assert.assertEquals(server.getVirtualId(), 1);
+ Assert.assertEquals(server.getHostname(), "Server_192.987.1.123");
+ Assert.assertEquals(server.getPort(), 80);
+ Assert.assertEquals(server.getGrpcPort(), 10);
+ Assert.assertEquals(server.getQueryServicePort(), 20);
+ Assert.assertEquals(server.getQueryMailboxPort(), 30);
+ }
+}
diff --git a/pinot-query-runtime/src/test/resources/queries/Parallelism.json b/pinot-query-runtime/src/test/resources/queries/Parallelism.json
new file mode 100644
index 0000000000..5f5c3f6f55
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/Parallelism.json
@@ -0,0 +1,47 @@
+{
+ "test_parallelism": {
+ "tables": {
+ "l": {
+ "schema": [
+ {"name": "key", "type": "STRING"},
+ {"name": "lval", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["foo", 3],
+ ["foo", 5],
+ ["bar", 2],
+ ["bar", 4],
+ ["bar", 6]
+ ]
+ },
+ "r": {
+ "schema": [
+ {"name": "key", "type": "STRING"},
+ {"name": "rval", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["foo", 3],
+ ["foo", 7],
+ ["bar", 2],
+ ["bar", 4],
+ ["bar", 8]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SET stageParallelism=2; SELECT * FROM {l} WHERE lval > 3"},
+ {"sql": "SET stageParallelism=2; SELECT * FROM {l} WHERE lval > 3 ORDER
BY lval LIMIT 1"},
+ {"sql": "SET stageParallelism=2; SELECT key, SUM(lval) FROM {l} GROUP BY
key"},
+ {"sql": "SET stageParallelism=2; SELECT {l}.key, {l}.lval, {r}.rval FROM
{l} JOIN {r} ON {l}.key = {r}.key"},
+ {"sql": "SET stageParallelism=2; SELECT {l}.key, SUM({l}.lval +
{r}.rval) FROM {l} JOIN {r} ON {l}.key = {r}.key GROUP BY {l}.key"},
+ {"sql": "SET stageParallelism=2; SELECT * FROM {l} WHERE lval NOT IN
(SELECT rval FROM {r} WHERE rval > 2)"},
+ {
+ "description": "current stage parallelism doesn't work with broadcast
join",
+ "sql": "SET stageParallelism=2; SELECT * FROM {l}, {r}",
+ "expectedException": ".*Cannot issue query with stageParallelism > 1
for queries that use SINGLETON exchange.*"
+ }
+ ]
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0abc692fbb..2537a9f5ab 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -299,6 +299,7 @@ public class CommonConstants {
public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"maxInitialResultHolderCapacity";
public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
+ public static final String STAGE_PARALLELISM = "stageParallelism";
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]