This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f77dfd37 [multistage] add support for stage parallelism (#10166)
e6f77dfd37 is described below

commit e6f77dfd3747d385fd2e13103c2fd893522f5b8a
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Jan 26 12:11:13 2023 -0800

    [multistage] add support for stage parallelism (#10166)
    
    * [multistage] add support for stage parallelism
    
    * remove unnecessary change
---
 .../pinot/query/planner/logical/StagePlanner.java  |  2 +-
 .../apache/pinot/query/routing/WorkerManager.java  | 16 ++++--
 .../runtime/operator/MailboxSendOperator.java      |  5 ++
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    | 23 ++++++--
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  5 ++
 .../plan/serde/QueryPlanSerDeUtilsTest.java        | 66 ++++++++++++++++++++++
 .../src/test/resources/queries/Parallelism.json    | 47 +++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 8 files changed, 153 insertions(+), 12 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 0af5497ee2..1081459dd7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -85,7 +85,7 @@ public class StagePlanner {
 
     // assign workers to each stage.
     for (Map.Entry<Integer, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
-      _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId);
+      _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId, _plannerContext.getOptions());
     }
 
     // Run physical optimizations
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 0e3750dc5d..60930b6dce 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -59,7 +59,8 @@ public class WorkerManager {
     _routingManager = routingManager;
   }
 
-  public void assignWorkerToStage(int stageId, StageMetadata stageMetadata, long requestId) {
+  public void assignWorkerToStage(int stageId, StageMetadata stageMetadata, long requestId,
+      Map<String, String> options) {
     List<String> scannedTables = stageMetadata.getScannedTables();
     if (scannedTables.size() == 1) {
       // table scan stage, need to attach server as well as segment info for each physical table type.
@@ -97,7 +98,7 @@ public class WorkerManager {
       stageMetadata.setServerInstances(new ArrayList<>(
           serverInstanceToSegmentsMap.keySet()
               .stream()
-              .map(server -> new VirtualServer(server, 0)) // for now, only use single virtual server
+              .map(server -> new VirtualServer(server, 0)) // the leaf stage only has one server, so always use 0 here
               .collect(Collectors.toList())));
       stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
     } else if (PlannerUtils.isRootStage(stageId)) {
@@ -106,11 +107,14 @@ public class WorkerManager {
       stageMetadata.setServerInstances(Lists.newArrayList(
           new VirtualServer(new WorkerInstance(_hostName, _port, _port, _port, _port), 0)));
     } else {
-      stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
+      stageMetadata.setServerInstances(assignServers(_routingManager.getEnabledServerInstanceMap().values(), options));
     }
   }
 
-  private static List<VirtualServer> filterServers(Collection<ServerInstance> servers) {
+  private static List<VirtualServer> assignServers(Collection<ServerInstance> servers, Map<String, String> options) {
+    int stageParallelism = Integer.parseInt(
+        options.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.STAGE_PARALLELISM, "1"));
+
     List<VirtualServer> serverInstances = new ArrayList<>();
     for (ServerInstance server : servers) {
       String hostname = server.getHostname();
@@ -118,7 +122,9 @@ public class WorkerManager {
           && !hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
           && !hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
           && !hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
-        serverInstances.add(new VirtualServer(server, 0));
+        for (int virtualId = 0; virtualId < stageParallelism; virtualId++) {
+          serverInstances.add(new VirtualServer(server, virtualId));
+        }
       }
     }
     return serverInstances;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 32ac1247c5..e37fa15133 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -90,6 +90,11 @@ public class MailboxSendOperator extends MultiStageOperator {
       for (VirtualServer serverInstance : receivingStageInstances) {
         if (serverInstance.getHostname().equals(mailboxService.getHostname())
             && serverInstance.getQueryMailboxPort() == mailboxService.getMailboxPort()) {
+          if (singletonInstance != null && singletonInstance.getServer().equals(serverInstance.getServer())) {
+            throw new IllegalArgumentException("Cannot issue query with stageParallelism > 1 for queries that "
+                + "use SINGLETON exchange. This is an internal limitation that is being worked on - reissue "
+                + "your query again without stageParallelism.");
+          }
           Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
           singletonInstance = serverInstance;
         }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 918d459312..36f7e483cd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -21,7 +21,8 @@ package org.apache.pinot.query.runtime.plan.serde;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -59,16 +60,26 @@ public class QueryPlanSerDeUtils {
         .putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
   }
 
+  private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
+      "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)\\((?<grpc>[0-9]+):(?<service>[0-9]+):(?<mailbox>[0-9]+)\\)");
+
   public static VirtualServer stringToInstance(String serverInstanceString) {
-    String[] s = StringUtils.split(serverInstanceString, '_');
+    Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(serverInstanceString);
+    if (!matcher.matches()) {
+      throw new IllegalArgumentException("Unexpected serverInstanceString '" + serverInstanceString + "'. This might "
+          + "happen if you are upgrading from an old version of the multistage engine to the current one in a rolling "
+          + "fashion.");
+    }
+
     // Skipped netty and grpc port as they are not used in worker instance.
-    return new VirtualServer(new WorkerInstance(s[0], Integer.parseInt(s[1]), Integer.parseInt(s[2]),
-        Integer.parseInt(s[3]), Integer.parseInt(s[4])), 0);
+    return new VirtualServer(new WorkerInstance(matcher.group("host"), Integer.parseInt(matcher.group("port")),
+        Integer.parseInt(matcher.group("grpc")), Integer.parseInt(matcher.group("service")),
+        Integer.parseInt(matcher.group("mailbox"))), Integer.parseInt(matcher.group("virtualid")));
   }
 
   public static String instanceToString(VirtualServer serverInstance) {
-    return StringUtils.join(serverInstance.getHostname(), '_', serverInstance.getPort(), '_',
-        serverInstance.getGrpcPort(), '_', serverInstance.getQueryServicePort(), '_',
+    return String.format("%s@%s:%s(%s:%s:%s)", serverInstance.getVirtualId(), serverInstance.getHostname(),
+        serverInstance.getPort(), serverInstance.getGrpcPort(), serverInstance.getQueryServicePort(),
         serverInstance.getQueryMailboxPort());
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index b921c8fdd8..d58c187447 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -112,6 +112,11 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
 
   protected List<Object[]> queryH2(String sql)
       throws Exception {
+    int firstSemi = sql.indexOf(';');
+    if (firstSemi > 0 && firstSemi != sql.length() - 1) {
+      // trim off any SET statements for H2
+      sql = sql.substring(firstSemi + 1);
+    }
     Statement h2statement = _h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     h2statement.execute(sql);
     ResultSet h2ResultSet = h2statement.getResultSet();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
new file mode 100644
index 0000000000..62cd9b6e18
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.runtime.plan.serde;
+
+import org.apache.pinot.query.routing.VirtualServer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class QueryPlanSerDeUtilsTest {
+
+  @Test
+  public void shouldSerializeServer() {
+    // Given:
+    VirtualServer server = Mockito.mock(VirtualServer.class);
+    Mockito.when(server.getVirtualId()).thenReturn(1);
+    Mockito.when(server.getHostname()).thenReturn("Server_192.987.1.123");
+    Mockito.when(server.getPort()).thenReturn(80);
+    Mockito.when(server.getGrpcPort()).thenReturn(10);
+    Mockito.when(server.getQueryServicePort()).thenReturn(20);
+    Mockito.when(server.getQueryMailboxPort()).thenReturn(30);
+
+    // When:
+    String serialized = QueryPlanSerDeUtils.instanceToString(server);
+
+    // Then:
+    Assert.assertEquals(serialized, "1@Server_192.987.1.123:80(10:20:30)");
+  }
+
+  @Test
+  public void shouldDeserializeServerString() {
+    // Given:
+    String serverString = "1@Server_192.987.1.123:80(10:20:30)";
+
+    // When:
+    VirtualServer server = QueryPlanSerDeUtils.stringToInstance(serverString);
+
+    // Then:
+    Assert.assertEquals(server.getVirtualId(), 1);
+    Assert.assertEquals(server.getHostname(), "Server_192.987.1.123");
+    Assert.assertEquals(server.getPort(), 80);
+    Assert.assertEquals(server.getGrpcPort(), 10);
+    Assert.assertEquals(server.getQueryServicePort(), 20);
+    Assert.assertEquals(server.getQueryMailboxPort(), 30);
+  }
+}
diff --git a/pinot-query-runtime/src/test/resources/queries/Parallelism.json b/pinot-query-runtime/src/test/resources/queries/Parallelism.json
new file mode 100644
index 0000000000..5f5c3f6f55
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/Parallelism.json
@@ -0,0 +1,47 @@
+{
+  "test_parallelism": {
+    "tables": {
+      "l": {
+        "schema": [
+          {"name": "key", "type": "STRING"},
+          {"name": "lval", "type": "INT"}
+        ],
+        "inputs": [
+          ["foo", 1],
+          ["foo", 3],
+          ["foo", 5],
+          ["bar", 2],
+          ["bar", 4],
+          ["bar", 6]
+        ]
+      },
+      "r": {
+        "schema": [
+          {"name": "key", "type": "STRING"},
+          {"name": "rval", "type": "INT"}
+        ],
+        "inputs": [
+          ["foo", 1],
+          ["foo", 3],
+          ["foo", 7],
+          ["bar", 2],
+          ["bar", 4],
+          ["bar", 8]
+        ]
+      }
+    },
+    "queries": [
+      {"sql": "SET stageParallelism=2; SELECT * FROM {l} WHERE lval > 3"},
+      {"sql": "SET stageParallelism=2; SELECT * FROM {l} WHERE lval > 3 ORDER BY lval LIMIT 1"},
+      {"sql": "SET stageParallelism=2; SELECT key, SUM(lval) FROM {l} GROUP BY key"},
+      {"sql": "SET stageParallelism=2; SELECT {l}.key, {l}.lval, {r}.rval FROM {l} JOIN {r} ON {l}.key = {r}.key"},
+      {"sql": "SET stageParallelism=2; SELECT {l}.key, SUM({l}.lval + {r}.rval) FROM {l} JOIN {r} ON {l}.key = {r}.key GROUP BY {l}.key"},
+      {"sql": "SET stageParallelism=2; SELECT * FROM {l} WHERE lval NOT IN (SELECT rval FROM {r} WHERE rval > 2)"},
+      {
+        "description": "current stage parallelism doesn't work with broadcast join",
+        "sql": "SET stageParallelism=2; SELECT * FROM {l}, {r}",
+        "expectedException": ".*Cannot issue query with stageParallelism > 1 for queries that use SINGLETON exchange.*"
+      }
+      }
+    ]
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0abc692fbb..2537a9f5ab 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -299,6 +299,7 @@ public class CommonConstants {
         public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
         public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity";
         public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
+        public static final String STAGE_PARALLELISM = "stageParallelism";
 
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to