This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ff0a3530fc [multistage] hybrid routing support (#9379)
ff0a3530fc is described below
commit ff0a3530fc0b4b92191c7598829e220994c0e68f
Author: Rong Rong <[email protected]>
AuthorDate: Sun Sep 11 15:28:41 2022 -0700
[multistage] hybrid routing support (#9379)
Preliminary support for hybrid routing on the multistage engine.
It copies some of the logic from the broker request side, but not all of it,
and only has basic testing.
- copied some time boundary logic (manager and associated logic) into
either the core module or replicated in the multi-stage planner
- created a hybrid routing dispatchable StageMetadata
- indexed each segment with table types
- added time boundary info if dispatching multiple table types, otherwise
ignore
- reconstructed server requests multiple times.
Co-authored-by: Rong Rong <[email protected]>
---
.../broker/api/resources/PinotBrokerDebug.java | 2 +-
.../requesthandler/BaseBrokerRequestHandler.java | 2 +-
.../pinot/broker/routing/BrokerRoutingManager.java | 3 +-
.../routing/timeboundary/TimeBoundaryManager.java | 1 +
.../broker/broker/HelixBrokerStarterTest.java | 2 +-
.../timeboundary/TimeBoundaryManagerTest.java | 1 +
pinot-common/src/main/proto/worker.proto | 8 ++-
.../apache/pinot/core/routing/RoutingManager.java | 8 +++
.../pinot/core/routing}/TimeBoundaryInfo.java | 2 +-
.../apache/pinot/query/planner/StageMetadata.java | 21 +++++-
.../apache/pinot/query/routing/WorkerManager.java | 69 ++++++++++++++----
.../apache/pinot/query/QueryCompilationTest.java | 11 ++-
.../pinot/query/QueryEnvironmentTestBase.java | 1 +
.../pinot/query/QueryEnvironmentTestUtils.java | 17 +++--
.../apache/pinot/query/runtime/QueryRunner.java | 84 ++++++++++++++--------
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 37 ++++++++--
.../query/runtime/utils/ServerRequestUtils.java | 82 +++++++++++++++++----
.../apache/pinot/query/QueryServerEnclosure.java | 4 +-
.../query/runtime/QueryRunnerExceptionTest.java | 4 +-
.../pinot/query/runtime/QueryRunnerTest.java | 3 +
.../pinot/query/runtime/QueryRunnerTestBase.java | 13 ++--
21 files changed, 287 insertions(+), 88 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 67d69a1ba2..6252ae5630 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -41,8 +41,8 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 07d9b321f7..27f7f32202 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -50,7 +50,6 @@ import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.http.MultiHttpRequest;
@@ -74,6 +73,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.core.util.QueryOptionsUtils;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 2f5cc95173..04ddf7f37a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -47,7 +47,6 @@ import
org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
import org.apache.pinot.broker.routing.segmentpruner.SegmentPrunerFactory;
import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -56,6 +55,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -596,6 +596,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
* <p>NOTE: Time boundary info is only available for the offline part of the
hybrid table.
*/
@Nullable
+ @Override
public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
RoutingEntry routingEntry = _routingEntryMap.get(offlineTableName);
if (routingEntry == null) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 17100de6df..3aeed3e35a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -33,6 +33,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index ddab2f7bf9..0fc2ddcd2e 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -25,7 +25,6 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -33,6 +32,7 @@ import
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index fb0d664874..34271815f4 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-common/src/main/proto/worker.proto
b/pinot-common/src/main/proto/worker.proto
index 87aecc8391..8f780bd260 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -68,7 +68,13 @@ message StagePlan {
message StageMetadata {
repeated string instances = 1;
repeated string dataSources = 2;
- map<string, SegmentList> instanceToSegmentList = 3;
+ map<string, SegmentMap> instanceToSegmentMap = 3;
+ string timeColumn = 4;
+ string timeValue = 5;
+}
+
+message SegmentMap {
+ map<string, SegmentList> tableTypeToSegmentList = 1;
}
message SegmentList {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index fbb3f5bf00..db535dcaa6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -59,4 +59,12 @@ public interface RoutingManager {
* @return true if the route table exists.
*/
boolean routingExists(String tableNameWithType);
+
+ /**
+ * Acquire the time boundary info. Useful for hybrid logical table queries
that needs to split between
+ * realtime and offline.
+ * @param offlineTableName offline table name
+ * @return time boundary info.
+ */
+ TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryInfo.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java
similarity index 95%
rename from
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryInfo.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java
index 5efbafadea..f6d804f5f0 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryInfo.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.broker.routing.timeboundary;
+package org.apache.pinot.core.routing;
public class TimeBoundaryInfo {
private final String _timeColumn;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index 8e691003c9..fe2531f9b6 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
@@ -45,12 +46,17 @@ public class StageMetadata implements Serializable {
private List<ServerInstance> _serverInstances;
// used for table scan stage.
- private Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+ private Map<ServerInstance, Map<String, List<String>>>
_serverInstanceToSegmentsMap;
+
+ // time boundary info
+ private TimeBoundaryInfo _timeBoundaryInfo;
+
public StageMetadata() {
_scannedTables = new ArrayList<>();
_serverInstances = new ArrayList<>();
_serverInstanceToSegmentsMap = new HashMap<>();
+ _timeBoundaryInfo = null;
}
public void attach(StageNode stageNode) {
@@ -67,11 +73,12 @@ public class StageMetadata implements Serializable {
// attached physical plan context.
// -----------------------------------------------
- public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+ public Map<ServerInstance, Map<String, List<String>>>
getServerInstanceToSegmentsMap() {
return _serverInstanceToSegmentsMap;
}
- public void setServerInstanceToSegmentsMap(Map<ServerInstance, List<String>>
serverInstanceToSegmentsMap) {
+ public void setServerInstanceToSegmentsMap(
+ Map<ServerInstance, Map<String, List<String>>>
serverInstanceToSegmentsMap) {
_serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
}
@@ -82,4 +89,12 @@ public class StageMetadata implements Serializable {
public void setServerInstances(List<ServerInstance> serverInstances) {
_serverInstances = serverInstances;
}
+
+ public TimeBoundaryInfo getTimeBoundaryInfo() {
+ return _timeBoundaryInfo;
+ }
+
+ public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
+ _timeBoundaryInfo = timeBoundaryInfo;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 03f4ddcd9c..9238c33ca4 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.routing;
+import com.clearspring.analytics.util.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
@@ -26,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.StageMetadata;
@@ -58,11 +60,38 @@ public class WorkerManager {
public void assignWorkerToStage(int stageId, StageMetadata stageMetadata) {
List<String> scannedTables = stageMetadata.getScannedTables();
- if (scannedTables.size() == 1) { // table scan stage, need to attach
server as well as segment info.
- RoutingTable routingTable = getRoutingTable(scannedTables.get(0));
- Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
+ if (scannedTables.size() == 1) {
+ // table scan stage, need to attach server as well as segment info for
each physical table type.
+ String logicalTableName = scannedTables.get(0);
+ Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName);
+ // acquire time boundary info if it is a hybrid table.
+ if (routingTableMap.size() > 1) {
+ TimeBoundaryInfo timeBoundaryInfo =
_routingManager.getTimeBoundaryInfo(TableNameBuilder
+
.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(logicalTableName)));
+ if (timeBoundaryInfo != null) {
+ stageMetadata.setTimeBoundaryInfo(timeBoundaryInfo);
+ } else {
+ // remove offline table routing if no time boundary info is acquired.
+ routingTableMap.remove(TableType.OFFLINE.name());
+ }
+ }
+
+ // extract all the instances associated to each table type
+ Map<ServerInstance, Map<String, List<String>>>
serverInstanceToSegmentsMap = new HashMap<>();
+ for (Map.Entry<String, RoutingTable> routingEntry :
routingTableMap.entrySet()) {
+ String tableType = routingEntry.getKey();
+ RoutingTable routingTable = routingEntry.getValue();
+ // for each server instance, attach all table types and their
associated segment list.
+ for (Map.Entry<ServerInstance, List<String>> serverEntry
+ : routingTable.getServerInstanceToSegmentsMap().entrySet()) {
+ serverInstanceToSegmentsMap.putIfAbsent(serverEntry.getKey(), new
HashMap<>());
+ Map<String, List<String>> tableTypeToSegmentListMap =
serverInstanceToSegmentsMap.get(serverEntry.getKey());
+ Preconditions.checkState(tableTypeToSegmentListMap.put(tableType,
serverEntry.getValue()) == null,
+ "Entry for server {} and table type: {} already exist!",
serverEntry.getKey(), tableType);
+ }
+ }
stageMetadata.setServerInstances(new
ArrayList<>(serverInstanceToSegmentsMap.keySet()));
- stageMetadata.setServerInstanceToSegmentsMap(new
HashMap<>(serverInstanceToSegmentsMap));
+
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
} else if (PlannerUtils.isRootStage(stageId)) {
// ROOT stage doesn't have a QueryServer as it is strictly only reducing
results.
// here we simply assign the worker instance with identical
server/mailbox port number.
@@ -86,13 +115,29 @@ public class WorkerManager {
return serverInstances;
}
- private RoutingTable getRoutingTable(String tableName) {
- String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- // TODO: support both offline and realtime, now default only query the
OFFLINE table.
- tableType = tableType == null ? TableType.OFFLINE : tableType;
- String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
- return
_routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(
- "SELECT * FROM " + tableNameWithType));
+ /**
+ * Acquire routing table for items listed in {@link
org.apache.pinot.query.planner.stage.TableScanNode}.
+ *
+ * @param logicalTableName it can either be a hybrid table name or a
physical table name with table type.
+ * @return keyed-map from table type(s) to routing table(s).
+ */
+ private Map<String, RoutingTable> getRoutingTable(String logicalTableName) {
+ String rawTableName =
TableNameBuilder.extractRawTableName(logicalTableName);
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(logicalTableName);
+ Map<String, RoutingTable> routingTableMap = new HashMap<>();
+ if (tableType == null) {
+ routingTableMap.put(TableType.OFFLINE.name(),
getRoutingTable(rawTableName, TableType.OFFLINE));
+ routingTableMap.put(TableType.REALTIME.name(),
getRoutingTable(rawTableName, TableType.REALTIME));
+ } else {
+ routingTableMap.put(tableType.name(), getRoutingTable(logicalTableName,
tableType));
+ }
+ return routingTableMap;
+ }
+
+ private RoutingTable getRoutingTable(String tableName, TableType tableType) {
+ String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(
+ TableNameBuilder.extractRawTableName(tableName));
+ return _routingManager.getRoutingTable(
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
tableNameWithType));
}
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index dcd7c5c11a..94c62c856e 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -122,7 +122,7 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
// table scan stages; for tableA it should have 2 hosts, for tableB it
should have only 1
Assert.assertEquals(
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
- tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_1",
"Server_localhost_2")
+ tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_2",
"Server_localhost_1")
: ImmutableList.of("Server_localhost_1"));
} else if (!PlannerUtils.isRootStage(e.getKey())) {
// join stage should have both servers used.
@@ -159,8 +159,8 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
List<StageMetadata> tableScanMetadataList =
queryPlan.getStageMetadataMap().values().stream()
.filter(stageMetadata -> stageMetadata.getScannedTables().size() !=
0).collect(Collectors.toList());
Assert.assertEquals(tableScanMetadataList.size(), 1);
-
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(),
1);
-
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
"Server_localhost_1");
+
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(),
2);
+
query = "SELECT * FROM d_REALTIME";
queryPlan = _queryEnvironment.planQuery(query);
tableScanMetadataList = queryPlan.getStageMetadataMap().values().stream()
@@ -169,15 +169,12 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(),
1);
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
"Server_localhost_2");
- // Default routing to OFFLINE table.
- // TODO: change this test to assert actual time-boundary routing once we
support this.
query = "SELECT * FROM d";
queryPlan = _queryEnvironment.planQuery(query);
tableScanMetadataList = queryPlan.getStageMetadataMap().values().stream()
.filter(stageMetadata -> stageMetadata.getScannedTables().size() !=
0).collect(Collectors.toList());
Assert.assertEquals(tableScanMetadataList.size(), 1);
-
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(),
1);
-
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
"Server_localhost_1");
+
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(),
2);
}
// Test that plan query can be run as multi-thread.
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index f83eea0fb3..65ea646e9c 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -45,6 +45,7 @@ public class QueryEnvironmentTestBase {
return new Object[][] {
new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 10"},
new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 10"},
+ new Object[]{"SELECT * FROM d"},
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"},
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3
>= 0"},
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3
>= 0 AND a.col3 > b.col3"},
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
index e882d09db7..e8c29aeb02 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
@@ -24,12 +24,14 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.planner.QueryPlan;
@@ -42,6 +44,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -54,10 +57,10 @@ public class QueryEnvironmentTestUtils {
public static final Schema.SchemaBuilder SCHEMA_BUILDER;
public static final Map<String, List<String>> SERVER1_SEGMENTS =
ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b",
Lists.newArrayList("b1"), "c",
- Lists.newArrayList("c1"), "d_O", Lists.newArrayList("d1", "d2"));
+ Lists.newArrayList("c1"), "d_O", Lists.newArrayList("d1"));
public static final Map<String, List<String>> SERVER2_SEGMENTS =
ImmutableMap.of("a", Lists.newArrayList("a3"), "c",
Lists.newArrayList("c2", "c3"),
- "d_R", Lists.newArrayList("d3", "d4"));
+ "d_R", Lists.newArrayList("d2"), "d_O", Lists.newArrayList("d3"));
static {
SCHEMA_BUILDER = new
Schema.SchemaBuilder().addSingleValueDimension("col1",
FieldSpec.DataType.STRING, "")
@@ -72,7 +75,7 @@ public class QueryEnvironmentTestUtils {
public static TableCache mockTableCache() {
TableCache mock = mock(TableCache.class);
- when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a", "a", "b",
"b", "c", "c",
+ when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a_REALTIME", "a",
"b_REALTIME", "b", "c_REALTIME", "c",
"d_OFFLINE", "d", "d_REALTIME", "d"));
when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build());
when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build());
@@ -108,7 +111,8 @@ public class QueryEnvironmentTestUtils {
// hybrid table
RoutingTable rtDOffline = mock(RoutingTable.class);
RoutingTable rtDRealtime = mock(RoutingTable.class);
-
when(rtDOffline.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1,
SERVER1_SEGMENTS.get("d_O")));
+ when(rtDOffline.getServerInstanceToSegmentsMap()).thenReturn(
+ ImmutableMap.of(host1, SERVER1_SEGMENTS.get("d_O"), host2,
SERVER2_SEGMENTS.get("d_O")));
when(rtDRealtime.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host2,
SERVER2_SEGMENTS.get("d_R")));
Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA,
"b", rtB, "c", rtC,
"d_OFFLINE", rtDOffline, "d_REALTIME", rtDRealtime);
@@ -121,6 +125,11 @@ public class QueryEnvironmentTestUtils {
mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
});
when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1,
host1, server2, host2));
+ when(mock.getTimeBoundaryInfo(anyString())).thenAnswer(invocation -> {
+ String offlineTableName = invocation.getArgument(0);
+ return "d_OFFLINE".equals(offlineTableName) ? new TimeBoundaryInfo("ts",
+ String.valueOf(System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(1))) : null;
+ });
return mock;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index bf53684d32..9419195a50 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -41,6 +42,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -48,12 +50,15 @@ import
org.apache.pinot.query.runtime.utils.ServerRequestUtils;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link QueryRunner} accepts a {@link DistributedStagePlan} and runs it.
*/
public class QueryRunner {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryRunner.class);
// This is a temporary before merging the 2 type of executor.
private ServerQueryExecutorV1Impl _serverExecutor;
private WorkerQueryExecutor _workerExecutor;
@@ -99,42 +104,51 @@ public class QueryRunner {
// TODO: make server query request return via mailbox, this is a hack to
gather the non-streaming data table
// and package it here for return. But we should really use a
MailboxSendOperator directly put into the
// server executor.
- ServerQueryRequest serverQueryRequest =
+ List<ServerQueryRequest> serverQueryRequests =
ServerRequestUtils.constructServerQueryRequest(distributedStagePlan,
requestMetadataMap);
// send the data table via mailbox in one-off fashion (e.g. no
block-level split, one data table/partition key)
- BaseDataBlock dataBlock;
- try {
- DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest,
executorService, null);
- if (!dataTable.getExceptions().isEmpty()) {
- // if contains exception, directly return a metadata block with the
exceptions.
- dataBlock =
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
- } else {
- // this works because default DataTableImplV3 will have a version
number at beginning:
- // the new DataBlock encodes lower 16 bits as version and upper 16
bits as type (ROW, COLUMNAR, METADATA)
- dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
- }
- } catch (Exception e) {
- dataBlock = DataBlockUtils.getErrorDataBlock(e);
+ List<BaseDataBlock> serverQueryResults = new
ArrayList<>(serverQueryRequests.size());
+ for (ServerQueryRequest request : serverQueryRequests) {
+ serverQueryResults.add(processServerQuery(request, executorService));
}
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
StageMetadata receivingStageMetadata =
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
MailboxSendOperator mailboxSendOperator =
new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
- new LeafStageTransferableBlockOperator(dataBlock,
sendNode.getDataSchema()),
+ new LeafStageTransferableBlockOperator(serverQueryResults,
sendNode.getDataSchema()),
receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(),
- sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequest.getRequestId(),
+ sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequests.get(0).getRequestId(),
sendNode.getStageId());
- mailboxSendOperator.nextBlock();
- if (dataBlock.getExceptions().isEmpty()) {
- mailboxSendOperator.nextBlock();
+ int blockCounter = 0;
+ while
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
+ LOGGER.debug("Acquired transferable block: {}", blockCounter++);
}
} else {
_workerExecutor.processQuery(distributedStagePlan, requestMetadataMap,
executorService);
}
}
+ private BaseDataBlock processServerQuery(ServerQueryRequest
serverQueryRequest,
+ ExecutorService executorService) {
+ BaseDataBlock dataBlock;
+ try {
+ DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest,
executorService, null);
+ if (!dataTable.getExceptions().isEmpty()) {
+ // if contains exception, directly return a metadata block with the
exceptions.
+ dataBlock =
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+ } else {
+ // this works because default DataTableImplV3 will have a version
number at beginning:
+ // the new DataBlock encodes lower 16 bits as version and upper 16
bits as type (ROW, COLUMNAR, METADATA)
+ dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ }
+ } catch (Exception e) {
+ dataBlock = DataBlockUtils.getErrorDataBlock(e);
+ }
+ return dataBlock;
+ }
+
/**
* Leaf-stage transfer block opreator is used to wrap around the leaf stage
process results. They are passed to the
* Pinot server to execute query thus only one {@link DataTable} were
returned. However, to conform with the
@@ -149,17 +163,17 @@ public class QueryRunner {
private static class LeafStageTransferableBlockOperator extends
BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
- private final MetadataBlock _endOfStreamBlock;
- private final BaseDataBlock _baseDataBlock;
+ private final BaseDataBlock _errorBlock;
+ private final List<BaseDataBlock> _baseDataBlocks;
private final DataSchema _dataSchema;
private boolean _hasTransferred;
+ private int _currentIndex;
- private LeafStageTransferableBlockOperator(BaseDataBlock baseDataBlock,
DataSchema dataSchema) {
- _baseDataBlock = baseDataBlock;
+ private LeafStageTransferableBlockOperator(List<BaseDataBlock>
baseDataBlocks, DataSchema dataSchema) {
+ _baseDataBlocks = baseDataBlocks;
_dataSchema = dataSchema;
- _endOfStreamBlock = baseDataBlock.getExceptions().isEmpty()
- ? DataBlockUtils.getEndOfStreamDataBlock(dataSchema) : null;
- _hasTransferred = false;
+ _errorBlock = baseDataBlocks.stream().filter(e ->
!e.getExceptions().isEmpty()).findFirst().orElse(null);
+ _currentIndex = 0;
}
@Override
@@ -175,11 +189,19 @@ public class QueryRunner {
@Override
protected TransferableBlock getNextBlock() {
- if (!_hasTransferred) {
- _hasTransferred = true;
- return new TransferableBlock(_baseDataBlock);
+ if (_currentIndex < 0) {
+ throw new RuntimeException("Leaf transfer terminated. next block
should no longer be called.");
+ }
+ if (_errorBlock != null) {
+ _currentIndex = -1;
+ return new TransferableBlock(_errorBlock);
} else {
- return new TransferableBlock(_endOfStreamBlock);
+ if (_currentIndex < _baseDataBlocks.size()) {
+ return new TransferableBlock(_baseDataBlocks.get(_currentIndex++));
+ } else {
+ _currentIndex = -1;
+ return new
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_dataSchema));
+ }
}
}
}
@@ -188,7 +210,7 @@ public class QueryRunner {
int stageId = distributedStagePlan.getStageId();
ServerInstance serverInstance = distributedStagePlan.getServerInstance();
StageMetadata stageMetadata =
distributedStagePlan.getMetadataMap().get(stageId);
- List<String> segments =
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
+ Map<String, List<String>> segments =
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
return segments != null && segments.size() > 0;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 49c2d62dcb..776e9f4e22 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.AbstractStageNode;
@@ -80,12 +81,25 @@ public class QueryPlanSerDeUtils {
private static StageMetadata fromWorkerStageMetadata(Worker.StageMetadata
workerStageMetadata) {
StageMetadata stageMetadata = new StageMetadata();
+ // scanned table
stageMetadata.getScannedTables().addAll(workerStageMetadata.getDataSourcesList());
+ // server instance to table-segments mapping
for (String serverInstanceString : workerStageMetadata.getInstancesList())
{
stageMetadata.getServerInstances().add(stringToInstance(serverInstanceString));
}
- for (Map.Entry<String, Worker.SegmentList> e :
workerStageMetadata.getInstanceToSegmentListMap().entrySet()) {
-
stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(e.getKey()),
e.getValue().getSegmentsList());
+ for (Map.Entry<String, Worker.SegmentMap> instanceEntry
+ : workerStageMetadata.getInstanceToSegmentMapMap().entrySet()) {
+ Map<String, List<String>> tableToSegmentMap = new HashMap<>();
+ for (Map.Entry<String, Worker.SegmentList> tableEntry
+ :
instanceEntry.getValue().getTableTypeToSegmentListMap().entrySet()) {
+ tableToSegmentMap.put(tableEntry.getKey(),
tableEntry.getValue().getSegmentsList());
+ }
+
stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(instanceEntry.getKey()),
tableToSegmentMap);
+ }
+ // time boundary info
+ if (!workerStageMetadata.getTimeColumn().isEmpty()) {
+ stageMetadata.setTimeBoundaryInfo(new
TimeBoundaryInfo(workerStageMetadata.getTimeColumn(),
+ workerStageMetadata.getTimeValue()));
}
return stageMetadata;
}
@@ -100,13 +114,26 @@ public class QueryPlanSerDeUtils {
private static Worker.StageMetadata toWorkerStageMetadata(StageMetadata
stageMetadata) {
Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
+ // scanned table
builder.addAllDataSources(stageMetadata.getScannedTables());
+ // server instance to table-segments mapping
for (ServerInstance serverInstance : stageMetadata.getServerInstances()) {
builder.addInstances(instanceToString(serverInstance));
}
- for (Map.Entry<ServerInstance, List<String>> e :
stageMetadata.getServerInstanceToSegmentsMap().entrySet()) {
- builder.putInstanceToSegmentList(instanceToString(e.getKey()),
-
Worker.SegmentList.newBuilder().addAllSegments(e.getValue()).build());
+ for (Map.Entry<ServerInstance, Map<String, List<String>>> instanceEntry
+ : stageMetadata.getServerInstanceToSegmentsMap().entrySet()) {
+ Map<String, Worker.SegmentList> tableToSegmentMap = new HashMap<>();
+ for (Map.Entry<String, List<String>> tableEntry :
instanceEntry.getValue().entrySet()) {
+ tableToSegmentMap.put(tableEntry.getKey(),
+
Worker.SegmentList.newBuilder().addAllSegments(tableEntry.getValue()).build());
+ }
+ builder.putInstanceToSegmentMap(instanceToString(instanceEntry.getKey()),
+
Worker.SegmentMap.newBuilder().putAllTableTypeToSegmentList(tableToSegmentMap).build());
+ }
+ // time boundary info
+ if (stageMetadata.getTimeBoundaryInfo() != null) {
+
builder.setTimeColumn(stageMetadata.getTimeBoundaryInfo().getTimeColumn());
+ builder.setTimeValue(stageMetadata.getTimeBoundaryInfo().getTimeValue());
}
return builder.build();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 07e02bdb8b..b14a35e1a5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -18,17 +18,23 @@
*/
package org.apache.pinot.query.runtime.utils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.QuerySource;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
@@ -37,7 +43,10 @@ import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
/**
@@ -54,24 +63,45 @@ public class ServerRequestUtils {
}
// TODO: This is a hack, make an actual ServerQueryRequest converter.
- public static ServerQueryRequest
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
+ public static List<ServerQueryRequest>
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap) {
+ StageMetadata stageMetadata =
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
+ Map<String, List<String>> tableToSegmentListMap =
stageMetadata.getServerInstanceToSegmentsMap()
+ .get(distributedStagePlan.getServerInstance());
+ List<ServerQueryRequest> requests = new ArrayList<>();
+ for (Map.Entry<String, List<String>> tableEntry :
tableToSegmentListMap.entrySet()) {
+ String tableType = tableEntry.getKey();
+ if (TableType.OFFLINE.name().equals(tableType)) {
+ requests.add(constructServerQueryRequest(distributedStagePlan,
requestMetadataMap,
+ stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE,
tableEntry.getValue()));
+ } else if (TableType.REALTIME.name().equals(tableType)) {
+ requests.add(constructServerQueryRequest(distributedStagePlan,
requestMetadataMap,
+ stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME,
tableEntry.getValue()));
+ } else {
+ throw new IllegalArgumentException("Unsupported table type key: " +
tableType);
+ }
+ }
+ return requests;
+ }
+
+ public static ServerQueryRequest
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
+ Map<String, String> requestMetadataMap, TimeBoundaryInfo
timeBoundaryInfo, TableType tableType,
+ List<String> segmentList) {
InstanceRequest instanceRequest = new InstanceRequest();
instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID")));
instanceRequest.setBrokerId("unknown");
instanceRequest.setEnableTrace(false);
- instanceRequest.setSearchSegments(
-
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()).getServerInstanceToSegmentsMap()
- .get(distributedStagePlan.getServerInstance()));
- instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan));
+ instanceRequest.setSearchSegments(segmentList);
+ instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan,
tableType, timeBoundaryInfo));
return new ServerQueryRequest(instanceRequest, new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
System.currentTimeMillis());
}
// TODO: this is a hack, creating a broker request object should not be needed
because we rewrite the entire
// query into stages already.
- public static BrokerRequest constructBrokerRequest(DistributedStagePlan
distributedStagePlan) {
- PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan);
+ public static BrokerRequest constructBrokerRequest(DistributedStagePlan
distributedStagePlan, TableType tableType,
+ TimeBoundaryInfo timeBoundaryInfo) {
+ PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan,
tableType, timeBoundaryInfo);
BrokerRequest brokerRequest = new BrokerRequest();
brokerRequest.setPinotQuery(pinotQuery);
// Set table name in broker request because it is used for access control,
query routing etc.
@@ -84,23 +114,29 @@ public class ServerRequestUtils {
return brokerRequest;
}
- public static PinotQuery constructPinotQuery(DistributedStagePlan
distributedStagePlan) {
+ public static PinotQuery constructPinotQuery(DistributedStagePlan
distributedStagePlan, TableType tableType,
+ TimeBoundaryInfo timeBoundaryInfo) {
PinotQuery pinotQuery = new PinotQuery();
pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
pinotQuery.setExplain(false);
- walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery);
+ walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery, tableType);
+ if (timeBoundaryInfo != null) {
+ attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType ==
TableType.OFFLINE);
+ }
return pinotQuery;
}
- private static void walkStageTree(StageNode node, PinotQuery pinotQuery) {
+ private static void walkStageTree(StageNode node, PinotQuery pinotQuery,
TableType tableType) {
// this walkStageTree should only be a sequential walk.
for (StageNode child : node.getInputs()) {
- walkStageTree(child, pinotQuery);
+ walkStageTree(child, pinotQuery, tableType);
}
if (node instanceof TableScanNode) {
TableScanNode tableScanNode = (TableScanNode) node;
DataSource dataSource = new DataSource();
- dataSource.setTableName(tableScanNode.getTableName());
+ String tableNameWithType = TableNameBuilder.forType(tableType)
+
.tableNameWithType(TableNameBuilder.extractRawTableName(tableScanNode.getTableName()));
+ dataSource.setTableName(tableNameWithType);
pinotQuery.setDataSource(dataSource);
pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression)
.collect(Collectors.toList()));
@@ -135,4 +171,26 @@ public class ServerRequestUtils {
throw new UnsupportedOperationException("Unsupported logical plan node:
" + node);
}
}
+
+ /**
+ * Helper method to attach the time boundary to the given PinotQuery.
+ */
+ private static void attachTimeBoundary(PinotQuery pinotQuery,
TimeBoundaryInfo timeBoundaryInfo,
+ boolean isOfflineRequest) {
+ String timeColumn = timeBoundaryInfo.getTimeColumn();
+ String timeValue = timeBoundaryInfo.getTimeValue();
+ Expression timeFilterExpression = RequestUtils.getFunctionExpression(
+ isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() :
FilterKind.GREATER_THAN.name());
+ timeFilterExpression.getFunctionCall().setOperands(
+ Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn),
RequestUtils.getLiteralExpression(timeValue)));
+
+ Expression filterExpression = pinotQuery.getFilterExpression();
+ if (filterExpression != null) {
+ Expression andFilterExpression =
RequestUtils.getFunctionExpression(FilterKind.AND.name());
+
andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression,
timeFilterExpression));
+ pinotQuery.setFilterExpression(andFilterExpression);
+ } else {
+ pinotQuery.setFilterExpression(timeFilterExpression);
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 22f4eed5ca..fa564e4bea 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -144,7 +145,8 @@ public class QueryServerEnclosure {
row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length -
2)]);
row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
- row.putValue("ts", System.currentTimeMillis());
+ row.putValue("ts", tableName.endsWith("_O")
+ ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2) :
System.currentTimeMillis());
rows.add(row);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
index 3b5a94d560..de70190b7f 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
public class QueryRunnerExceptionTest extends QueryRunnerTestBase {
@Test(dataProvider = "testDataWithSqlExecutionExceptions")
- public void testSqlWithFinalRowCountChecker(String sql, String exeptionMsg) {
+ public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
Map<String, String> requestMetadataMap =
ImmutableMap.of("REQUEST_ID",
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
@@ -61,7 +61,7 @@ public class QueryRunnerExceptionTest extends
QueryRunnerTestBase {
QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator);
} catch (RuntimeException rte) {
Assert.assertTrue(rte.getMessage().contains("Received error query
execution result block"));
- Assert.assertTrue(rte.getMessage().contains(exeptionMsg));
+ Assert.assertTrue(rte.getMessage().contains(exceptionMsg));
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 48823ea18d..670710590d 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -72,6 +72,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
// No match filter
new Object[]{"SELECT * FROM b WHERE col3 < 0", 0},
+ // Hybrid table
+ new Object[]{"SELECT * FROM d", 15},
+
// Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
// thus the final JOIN result will be 15 x 1 = 15.
// Next join with table C which has (5 on server1 and 10 on server2),
since data is identical. each of the row
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 22cf24e60f..9842501f00 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -46,9 +46,11 @@ import static
org.apache.pinot.core.query.selection.SelectionOperatorUtils.extra
public class QueryRunnerTestBase {
private static final File INDEX_DIR_S1_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
private static final File INDEX_DIR_S1_B = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
- private static final File INDEX_DIR_S2_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
private static final File INDEX_DIR_S1_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
+ private static final File INDEX_DIR_S1_D = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableD");
+ private static final File INDEX_DIR_S2_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
private static final File INDEX_DIR_S2_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
+ private static final File INDEX_DIR_S2_D = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableD");
protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
@@ -73,11 +75,12 @@ public class QueryRunnerTestBase {
public void setUp()
throws Exception {
DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- QueryServerEnclosure server1 = new
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
- ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c",
INDEX_DIR_S1_C),
+ QueryServerEnclosure server1 = new
QueryServerEnclosure(Lists.newArrayList("a", "b", "c", "d_O"),
+ ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c",
INDEX_DIR_S1_C, "d_O", INDEX_DIR_S1_D),
QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
- QueryServerEnclosure server2 = new
QueryServerEnclosure(Lists.newArrayList("a", "c"),
- ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C),
QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+ QueryServerEnclosure server2 = new
QueryServerEnclosure(Lists.newArrayList("a", "c", "d_R", "d_O"),
+ ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C, "d_R",
INDEX_DIR_S2_D, "d_O", INDEX_DIR_S1_D),
+ QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
_reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
_reducerHostname = String.format("Broker_%s",
QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]