This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5d312e5a3f [Multi-stage] Support partition based leaf stage processing
(#11234)
5d312e5a3f is described below
commit 5d312e5a3f9e7c6edf152e60506c3c7a222c75d2
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Aug 2 10:03:49 2023 -0700
[Multi-stage] Support partition based leaf stage processing (#11234)
---
.../apache/calcite/rel/hint/PinotHintOptions.java | 1 -
.../rel/rules/PinotJoinExchangeNodeInsertRule.java | 11 +-
.../rel/rules/PinotJoinToDynamicBroadcastRule.java | 15 +-
.../planner/logical/RelToPlanNodeConverter.java | 11 +-
.../planner/physical/DispatchablePlanMetadata.java | 28 ++-
.../planner/physical/DispatchablePlanVisitor.java | 2 +
.../planner/physical/MailboxAssignmentVisitor.java | 32 ++-
.../pinot/query/planner/plannode/JoinNode.java | 9 +-
.../apache/pinot/query/routing/WorkerManager.java | 275 ++++++++++-----------
.../pinot/query/QueryEnvironmentTestBase.java | 44 +++-
.../query/testutils/MockRoutingManagerFactory.java | 96 +++----
.../test/resources/queries/PinotHintablePlans.json | 53 +---
.../pinot/query/runtime/QueryRunnerTest.java | 47 ++--
.../runtime/operator/HashJoinOperatorTest.java | 32 +--
.../plan/pipeline/PipelineBreakerExecutorTest.java | 6 +-
.../runtime/queries/ResourceBasedQueriesTest.java | 80 +++---
.../service/dispatch/QueryDispatcherTest.java | 2 +-
.../query/service/server/QueryServerTest.java | 2 +-
.../testutils/MockInstanceDataManagerFactory.java | 70 ++++--
.../src/test/resources/queries/Aggregates.json | 4 +-
.../src/test/resources/queries/CountDistinct.json | 12 +-
.../src/test/resources/queries/QueryHints.json | 34 ++-
.../src/test/resources/queries/SelectHaving.json | 4 +-
23 files changed, 454 insertions(+), 416 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 1c613074cf..9ffeaf8f8c 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -65,7 +65,6 @@ public class PinotHintOptions {
public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
- public static final String IS_COLOCATED_BY_JOIN_KEYS =
"is_colocated_by_join_keys";
}
public static class TableHintOptions {
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 465aa8b12d..a4b64b9800 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -25,8 +25,6 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.hint.PinotHintOptions;
-import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -65,14 +63,7 @@ public class PinotJoinExchangeNodeInsertRule extends
RelOptRule {
RelNode rightExchange;
JoinInfo joinInfo = join.analyzeCondition();
- boolean isColocatedJoin =
- PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS,
- PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
- if (isColocatedJoin) {
- // join exchange are colocated, we should directly pass through via join
key
- leftExchange = PinotLogicalExchange.create(leftInput,
RelDistributions.SINGLETON);
- rightExchange = PinotLogicalExchange.create(rightInput,
RelDistributions.SINGLETON);
- } else if (joinInfo.leftKeys.isEmpty()) {
+ if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
leftExchange = PinotLogicalExchange.create(leftInput,
RelDistributions.RANDOM_DISTRIBUTED);
rightExchange = PinotLogicalExchange.create(rightInput,
RelDistributions.BROADCAST_DISTRIBUTED);
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index b7b2f7f1a6..0c17a32d05 100644
---
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -155,23 +155,16 @@ public class PinotJoinToDynamicBroadcastRule extends
RelOptRule {
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight()
instanceof HepRelVertex
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
- boolean isColocatedJoin =
PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
- PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
- PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
- ? PinotLogicalExchange.create(right.getInput(),
RelDistributions.SINGLETON,
- PinotRelExchangeType.PIPELINE_BREAKER)
- : PinotLogicalExchange.create(right.getInput(),
RelDistributions.BROADCAST_DISTRIBUTED,
+ PinotLogicalExchange dynamicBroadcastExchange =
+ PinotLogicalExchange.create(right.getInput(),
RelDistributions.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.PIPELINE_BREAKER);
Join dynamicFilterJoin =
new LogicalJoin(join.getCluster(), join.getTraitSet(),
left.getInput(), dynamicBroadcastExchange,
join.getCondition(), join.getVariablesSet(), join.getJoinType(),
join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList()));
// adding pass-through exchange after join b/c currently leaf-stage
doesn't support chaining operator(s) after JOIN
- // TODO: support pass-through for singleton again when non-colocated.
- // TODO: this is b/c #10886 alters the singleton exchange and it no longer
works if join is not colocated.
- PinotLogicalExchange passThroughAfterJoinExchange = isColocatedJoin
- ? PinotLogicalExchange.create(dynamicFilterJoin,
RelDistributions.SINGLETON)
- : PinotLogicalExchange.create(dynamicFilterJoin,
RelDistributions.hash(join.analyzeCondition().leftKeys));
+ PinotLogicalExchange passThroughAfterJoinExchange =
+ PinotLogicalExchange.create(dynamicFilterJoin,
RelDistributions.hash(join.analyzeCondition().leftKeys));
call.transformTo(passThroughAfterJoinExchange);
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index d5097e7ed7..a4e6be355a 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -31,8 +31,6 @@ import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.SortExchange;
-import org.apache.calcite.rel.hint.PinotHintOptions;
-import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -134,8 +132,8 @@ public final class RelToPlanNodeConverter {
// Compute all the tables involved under this exchange node
Set<String> tableNames = getTableNamesFromRelRoot(node);
- return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()),
exchangeType,
- tableNames, node.getDistribution(), fieldCollations, isSortOnSender,
isSortOnReceiver);
+ return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()),
exchangeType, tableNames,
+ node.getDistribution(), fieldCollations, isSortOnSender,
isSortOnReceiver);
}
private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
@@ -187,11 +185,8 @@ public final class RelToPlanNodeConverter {
new FieldSelectionKeySelector(joinInfo.rightKeys));
List<RexExpression> joinClause =
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
- boolean isColocatedJoin =
- PinotHintStrategyTable.isHintOptionTrue(node.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS,
- PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
return new JoinNode(currentStageId, toDataSchema(node.getRowType()),
toDataSchema(node.getLeft().getRowType()),
- toDataSchema(node.getRight().getRowType()), joinType, joinKeys,
joinClause, isColocatedJoin);
+ toDataSchema(node.getRight().getRowType()), joinType, joinKeys,
joinClause);
}
private static DataSchema toDataSchema(RelDataType rowType) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 7e0e1cdab1..f53cc008f8 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -41,7 +42,9 @@ import org.apache.pinot.query.routing.QueryServerInstance;
* </ul>
*/
public class DispatchablePlanMetadata implements Serializable {
+ // These 2 fields are extracted from TableScanNode
private final List<String> _scannedTables;
+ private Map<String, String> _tableOptions;
// used for assigning server/worker nodes.
private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
@@ -64,6 +67,9 @@ public class DispatchablePlanMetadata implements Serializable
{
// whether a stage requires singleton instance to execute, e.g. stage
contains global reduce (sort/agg) operator.
private boolean _requiresSingletonInstance;
+ // whether a stage is partitioned table scan
+ private boolean _isPartitionedTableScan;
+
// Total worker count of this stage.
private int _totalWorkerCount;
@@ -72,8 +78,6 @@ public class DispatchablePlanMetadata implements Serializable
{
_serverInstanceToWorkerIdMap = new HashMap<>();
_workerIdToSegmentsMap = new HashMap<>();
_workerIdToMailboxesMap = new HashMap<>();
- _timeBoundaryInfo = null;
- _requiresSingletonInstance = false;
_tableToUnavailableSegmentsMap = new HashMap<>();
}
@@ -85,6 +89,15 @@ public class DispatchablePlanMetadata implements
Serializable {
_scannedTables.add(tableName);
}
+ @Nullable
+ public Map<String, String> getTableOptions() {
+ return _tableOptions;
+ }
+
+ public void setTableOptions(Map<String, String> tableOptions) {
+ _tableOptions = tableOptions;
+ }
+
// -----------------------------------------------
// attached physical plan context.
// -----------------------------------------------
@@ -93,8 +106,7 @@ public class DispatchablePlanMetadata implements
Serializable {
return _workerIdToSegmentsMap;
}
- public void setWorkerIdToSegmentsMap(
- Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap) {
+ public void setWorkerIdToSegmentsMap(Map<Integer, Map<String, List<String>>>
workerIdToSegmentsMap) {
_workerIdToSegmentsMap = workerIdToSegmentsMap;
}
@@ -135,6 +147,14 @@ public class DispatchablePlanMetadata implements
Serializable {
_requiresSingletonInstance = _requiresSingletonInstance ||
newRequireInstance;
}
+ public boolean isPartitionedTableScan() {
+ return _isPartitionedTableScan;
+ }
+
+ public void setPartitionedTableScan(boolean isPartitionedTableScan) {
+ _isPartitionedTableScan = isPartitionedTableScan;
+ }
+
public int getTotalWorkerCount() {
return _totalWorkerCount;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index ccb4622b8b..d177a1d892 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.planner.physical;
+import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -127,6 +128,7 @@ public class DispatchablePlanVisitor implements
PlanNodeVisitor<Void, Dispatchab
public Void visitTableScan(TableScanNode node, DispatchablePlanContext
context) {
DispatchablePlanMetadata dispatchablePlanMetadata =
getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.addScannedTable(node.getTableName());
+
dispatchablePlanMetadata.setTableOptions(node.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS));
return null;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 180f5a413a..bc6cb7c83e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -59,9 +59,39 @@ public class MailboxAssignmentVisitor extends
DefaultPostOrderTraversalVisitor<V
receiverMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
});
+ } else if (senderMetadata.isPartitionedTableScan()) {
+ // For partitioned table scan, send the data to the worker with the
same worker id (not necessarily the same
+ // instance)
+          // TODO: Support further splitting the single partition into multiple
workers
+ senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
+ for (int workerId : senderWorkerIds) {
+ receiverWorkerIdsMap.forEach((receiverServerInstance,
receiverWorkerIds) -> {
+ for (int receiverWorkerId : receiverWorkerIds) {
+ if (receiverWorkerId == workerId) {
+ String mailboxId =
+ MailboxIdUtils.toPlanMailboxId(senderFragmentId,
workerId, receiverFragmentId, workerId);
+ MailboxMetadata serderMailboxMetadata = new
MailboxMetadata(Collections.singletonList(mailboxId),
+ Collections.singletonList(new
VirtualServerAddress(receiverServerInstance, workerId)),
+ Collections.emptyMap());
+ MailboxMetadata receiverMailboxMetadata = new
MailboxMetadata(Collections.singletonList(mailboxId),
+ Collections.singletonList(new
VirtualServerAddress(senderServerInstance, workerId)),
+ Collections.emptyMap());
+ senderMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>())
+ .put(receiverFragmentId, serderMailboxMetadata);
+ receiverMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>())
+ .put(senderFragmentId, receiverMailboxMetadata);
+ break;
+ }
+ }
+ });
+ }
+ });
} else {
// For other exchange types, send the data to all the instances in the
receiver fragment
- // TODO: Add support for more exchange types
+ // TODO:
+ // 1. Add support for more exchange types
+ // 2. Keep the receiver worker id sequential in the
senderMailboxMetadata so that the partitionId aligns with
+ // the workerId. It is useful for JOIN query when only left table
is partitioned.
senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
for (int senderWorkerId : senderWorkerIds) {
Map<Integer, MailboxMetadata> senderMailboxMetadataMap =
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index b0f576258c..6d089c6239 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -36,8 +36,6 @@ public class JoinNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _joinClause;
@ProtoProperties
- private boolean _isColocatedJoin;
- @ProtoProperties
private List<String> _leftColumnNames;
@ProtoProperties
private List<String> _rightColumnNames;
@@ -47,14 +45,13 @@ public class JoinNode extends AbstractPlanNode {
}
public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema
leftSchema, DataSchema rightSchema,
- JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression>
joinClause, boolean isColocatedJoin) {
+ JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression>
joinClause) {
super(planFragmentId, dataSchema);
_leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
_rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
_joinRelType = joinRelType;
_joinKeys = joinKeys;
_joinClause = joinClause;
- _isColocatedJoin = isColocatedJoin;
}
public JoinRelType getJoinRelType() {
@@ -69,10 +66,6 @@ public class JoinNode extends AbstractPlanNode {
return _joinClause;
}
- public boolean isColocatedJoin() {
- return _isColocatedJoin;
- }
-
public List<String> getLeftColumnNames() {
return _leftColumnNames;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 48506ecfab..5fdb67c7c8 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
@@ -39,8 +40,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.spi.config.table.TableType;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -96,19 +96,42 @@ public class WorkerManager {
}
private void assignWorkersToLeafFragment(PlanFragment fragment,
DispatchablePlanContext context) {
+ // NOTE: For pipeline breaker, leaf fragment can also have children
+ for (PlanFragment child : fragment.getChildren()) {
+ assignWorkersToNonRootFragment(child, context);
+ }
+
DispatchablePlanMetadata metadata =
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
- // table scan stage, need to attach server as well as segment info for
each physical table type.
- List<String> scannedTables = metadata.getScannedTables();
- String logicalTableName = scannedTables.get(0);
- Map<String, RoutingTable> routingTableMap =
getRoutingTable(logicalTableName, context.getRequestId());
- if (routingTableMap.size() == 0) {
- throw new IllegalArgumentException("Unable to find routing entries for
table: " + logicalTableName);
+ Map<String, String> tableOptions = metadata.getTableOptions();
+ String partitionKey = null;
+ int numPartitions = 0;
+ if (tableOptions != null) {
+ partitionKey =
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
+ String partitionSize =
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+ if (partitionSize != null) {
+ numPartitions = Integer.parseInt(partitionSize);
+ }
+ }
+ if (partitionKey == null) {
+ assignWorkersToNonPartitionedLeafFragment(metadata, context);
+ } else {
+ Preconditions.checkState(numPartitions > 0, "'%s' must be provided for
partition key: %s",
+ PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+ assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey,
numPartitions);
}
+ }
+
+ private void
assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata metadata,
+ DispatchablePlanContext context) {
+ String tableName = metadata.getScannedTables().get(0);
+ Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName,
context.getRequestId());
+ Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find
routing entries for table: %s", tableName);
+
// acquire time boundary info if it is a hybrid table.
if (routingTableMap.size() > 1) {
TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(
TableNameBuilder.forType(TableType.OFFLINE)
-
.tableNameWithType(TableNameBuilder.extractRawTableName(logicalTableName)));
+
.tableNameWithType(TableNameBuilder.extractRawTableName(tableName)));
if (timeBoundaryInfo != null) {
metadata.setTimeBoundaryInfo(timeBoundaryInfo);
} else {
@@ -133,7 +156,7 @@ public class WorkerManager {
// attach unavailable segments to metadata
if (!routingTable.getUnavailableSegments().isEmpty()) {
- metadata.addTableToUnavailableSegmentsMap(logicalTableName,
routingTable.getUnavailableSegments());
+ metadata.addTableToUnavailableSegmentsMap(tableName,
routingTable.getUnavailableSegments());
}
}
int globalIdx = 0;
@@ -148,22 +171,17 @@ public class WorkerManager {
metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
metadata.setTotalWorkerCount(globalIdx);
-
- // NOTE: For pipeline breaker, leaf fragment can also have children
- for (PlanFragment child : fragment.getChildren()) {
- assignWorkersToNonRootFragment(child, context);
- }
}
/**
- * Acquire routing table for items listed in {@link
org.apache.pinot.query.planner.plannode.TableScanNode}.
+ * Acquire routing table for items listed in {@link TableScanNode}.
*
- * @param logicalTableName it can either be a hybrid table name or a
physical table name with table type.
+ * @param tableName table name with or without type suffix.
* @return keyed-map from table type(s) to routing table(s).
*/
- private Map<String, RoutingTable> getRoutingTable(String logicalTableName,
long requestId) {
- String rawTableName =
TableNameBuilder.extractRawTableName(logicalTableName);
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(logicalTableName);
+ private Map<String, RoutingTable> getRoutingTable(String tableName, long
requestId) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
Map<String, RoutingTable> routingTableMap = new HashMap<>();
RoutingTable routingTable;
if (tableType == null) {
@@ -176,7 +194,7 @@ public class WorkerManager {
routingTableMap.put(TableType.REALTIME.name(), routingTable);
}
} else {
- routingTable = getRoutingTable(logicalTableName, tableType, requestId);
+ routingTable = getRoutingTable(tableName, tableType, requestId);
if (routingTable != null) {
routingTableMap.put(tableType.name(), routingTable);
}
@@ -191,15 +209,60 @@ public class WorkerManager {
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " +
tableNameWithType), requestId);
}
+ private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata
metadata,
+ DispatchablePlanContext context, String partitionKey, int numPartitions)
{
+ String tableName = metadata.getScannedTables().get(0);
+ ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName,
partitionKey, numPartitions);
+
+ // Pick one server per partition
+ // NOTE: Pick server based on the request id so that the same server is
picked across different table scan when the
+ // segments for the same partition is colocated
+ long indexToPick = context.getRequestId();
+ ColocatedPartitionInfo[] partitionInfoMap =
colocatedTableInfo._partitionInfoMap;
+ int nextWorkerId = 0;
+ Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new
HashMap<>();
+ Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new
HashMap<>();
+ Map<String, ServerInstance> enabledServerInstanceMap =
_routingManager.getEnabledServerInstanceMap();
+ for (int i = 0; i < numPartitions; i++) {
+ ColocatedPartitionInfo partitionInfo = partitionInfoMap[i];
+ // TODO: Currently we don't support the case when a partition doesn't
contain any segment. The reason is that the
+ // leaf stage won't be able to directly return empty response.
+ Preconditions.checkState(partitionInfo != null, "Failed to find any
segment for table: %s, partition: %s",
+ tableName, i);
+ ServerInstance serverInstance =
+ pickEnabledServer(partitionInfo._fullyReplicatedServers,
enabledServerInstanceMap, indexToPick++);
+ Preconditions.checkState(serverInstance != null,
+ "Failed to find enabled fully replicated server for table: %s,
partition: %s in table: %s", tableName, i);
+ QueryServerInstance queryServerInstance = new
QueryServerInstance(serverInstance);
+ int workerId = nextWorkerId++;
+ serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k ->
new ArrayList<>()).add(workerId);
+ workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo));
+ }
+
+ metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+ metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
+ metadata.setTotalWorkerCount(nextWorkerId);
+ metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
+ metadata.setPartitionedTableScan(true);
+ }
+
private void assignWorkersToIntermediateFragment(PlanFragment fragment,
DispatchablePlanContext context) {
- if (isColocatedJoin(fragment.getFragmentRoot())) {
- // TODO: Make it more general so that it can be used for other
partitioned cases (e.g. group-by, window function)
- try {
- assignWorkersForColocatedJoin(fragment, context);
+ List<PlanFragment> children = fragment.getChildren();
+ for (PlanFragment child : children) {
+ assignWorkersToNonRootFragment(child, context);
+ }
+
+ Map<Integer, DispatchablePlanMetadata> metadataMap =
context.getDispatchablePlanMetadataMap();
+ DispatchablePlanMetadata metadata =
metadataMap.get(fragment.getFragmentId());
+
+ // If the first child is partitioned table scan, use the same worker
assignment to avoid shuffling data
+ // TODO: Introduce a hint to control this
+ if (children.size() > 0) {
+ DispatchablePlanMetadata firstChildMetadata =
metadataMap.get(children.get(0).getFragmentId());
+ if (firstChildMetadata.isPartitionedTableScan()) {
+
metadata.setServerInstanceToWorkerIdMap(firstChildMetadata.getServerInstanceToWorkerIdMap());
+ metadata.setTotalWorkerCount(firstChildMetadata.getTotalWorkerCount());
return;
- } catch (Exception e) {
- LOGGER.warn("[RequestId: {}] Caught exception while assigning workers
for colocated join, "
- + "falling back to regular worker assignment",
context.getRequestId(), e);
}
}
@@ -224,7 +287,6 @@ public class WorkerManager {
throw new IllegalStateException(
"No server instance found for intermediate stage for tables: " +
Arrays.toString(tableNames.toArray()));
}
- DispatchablePlanMetadata metadata =
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
Map<String, String> options = context.getPlannerContext().getOptions();
int stageParallelism =
Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
if (metadata.isRequiresSingletonInstance()) {
@@ -246,101 +308,9 @@ public class WorkerManager {
metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
metadata.setTotalWorkerCount(nextWorkerId);
}
-
- for (PlanFragment child : fragment.getChildren()) {
- assignWorkersToNonRootFragment(child, context);
- }
- }
-
- private boolean isColocatedJoin(PlanNode planNode) {
- if (planNode instanceof JoinNode) {
- return ((JoinNode) planNode).isColocatedJoin();
- }
- for (PlanNode child : planNode.getInputs()) {
- if (isColocatedJoin(child)) {
- return true;
- }
- }
- return false;
- }
-
- private void assignWorkersForColocatedJoin(PlanFragment fragment,
DispatchablePlanContext context) {
- List<PlanFragment> children = fragment.getChildren();
- Preconditions.checkArgument(children.size() == 2, "Expecting 2 children,
find: %s", children.size());
- PlanFragment leftFragment = children.get(0);
- PlanFragment rightFragment = children.get(1);
- Map<Integer, DispatchablePlanMetadata> metadataMap =
context.getDispatchablePlanMetadataMap();
- // TODO: Support multi-level colocated join (more than 2 tables colocated)
- DispatchablePlanMetadata leftMetadata =
metadataMap.get(leftFragment.getFragmentId());
- Preconditions.checkArgument(isLeafPlan(leftMetadata), "Left side is not
leaf");
- DispatchablePlanMetadata rightMetadata =
metadataMap.get(rightFragment.getFragmentId());
- Preconditions.checkArgument(isLeafPlan(rightMetadata), "Right side is not
leaf");
-
- String leftTable = leftMetadata.getScannedTables().get(0);
- String rightTable = rightMetadata.getScannedTables().get(0);
- ColocatedTableInfo leftColocatedTableInfo =
getColocatedTableInfo(leftTable);
- ColocatedTableInfo rightColocatedTableInfo =
getColocatedTableInfo(rightTable);
- ColocatedPartitionInfo[] leftPartitionInfoMap =
leftColocatedTableInfo._partitionInfoMap;
- ColocatedPartitionInfo[] rightPartitionInfoMap =
rightColocatedTableInfo._partitionInfoMap;
- // TODO: Support colocated join when both side have different number of
partitions (e.g. left: 8, right: 16)
- int numPartitions = leftPartitionInfoMap.length;
- Preconditions.checkState(numPartitions == rightPartitionInfoMap.length,
- "Got different number of partitions in left table: %s (%s) and right
table: %s (%s)", leftTable, numPartitions,
- rightTable, rightPartitionInfoMap.length);
-
- // Pick one server per partition
- int nextWorkerId = 0;
- Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new
HashMap<>();
- Map<Integer, Map<String, List<String>>> leftWorkerIdToSegmentsMap = new
HashMap<>();
- Map<Integer, Map<String, List<String>>> rightWorkerIdToSegmentsMap = new
HashMap<>();
- Map<String, ServerInstance> enabledServerInstanceMap =
_routingManager.getEnabledServerInstanceMap();
- for (int i = 0; i < numPartitions; i++) {
- ColocatedPartitionInfo leftPartitionInfo = leftPartitionInfoMap[i];
- ColocatedPartitionInfo rightPartitionInfo = rightPartitionInfoMap[i];
- if (leftPartitionInfo == null && rightPartitionInfo == null) {
- continue;
- }
- // TODO: Currently we don't support the case when for a partition only
one side has segments. The reason is that
- // the leaf stage won't be able to directly return empty response.
- Preconditions.checkState(leftPartitionInfo != null && rightPartitionInfo
!= null,
- "One side doesn't have any segment for partition: %s", i);
- Set<String> candidates = new
HashSet<>(leftPartitionInfo._fullyReplicatedServers);
- candidates.retainAll(rightPartitionInfo._fullyReplicatedServers);
- ServerInstance serverInstance = pickRandomEnabledServer(candidates,
enabledServerInstanceMap);
- Preconditions.checkState(serverInstance != null,
- "Failed to find enabled fully replicated server for partition: %s in
table: %s and %s", i, leftTable,
- rightTable);
- QueryServerInstance queryServerInstance = new
QueryServerInstance(serverInstance);
- int workerId = nextWorkerId++;
- serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k ->
new ArrayList<>()).add(workerId);
- leftWorkerIdToSegmentsMap.put(workerId,
getSegmentsMap(leftPartitionInfo));
- rightWorkerIdToSegmentsMap.put(workerId,
getSegmentsMap(rightPartitionInfo));
- }
-
- DispatchablePlanMetadata joinMetadata =
metadataMap.get(fragment.getFragmentId());
- joinMetadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
- joinMetadata.setTotalWorkerCount(nextWorkerId);
-
- leftMetadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
- leftMetadata.setWorkerIdToSegmentsMap(leftWorkerIdToSegmentsMap);
- leftMetadata.setTotalWorkerCount(nextWorkerId);
- leftMetadata.setTimeBoundaryInfo(leftColocatedTableInfo._timeBoundaryInfo);
-
- rightMetadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
- rightMetadata.setWorkerIdToSegmentsMap(rightWorkerIdToSegmentsMap);
- rightMetadata.setTotalWorkerCount(nextWorkerId);
-
rightMetadata.setTimeBoundaryInfo(rightColocatedTableInfo._timeBoundaryInfo);
-
- // NOTE: For pipeline breaker, leaf fragment can also have children
- for (PlanFragment child : leftFragment.getChildren()) {
- assignWorkersToNonRootFragment(child, context);
- }
- for (PlanFragment child : rightFragment.getChildren()) {
- assignWorkersToNonRootFragment(child, context);
- }
}
- private ColocatedTableInfo getColocatedTableInfo(String tableName) {
+ private ColocatedTableInfo getColocatedTableInfo(String tableName, String
partitionKey, int numPartitions) {
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == null) {
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
@@ -354,14 +324,12 @@ public class WorkerManager {
TimeBoundaryInfo timeBoundaryInfo =
_routingManager.getTimeBoundaryInfo(offlineTableName);
// Ignore OFFLINE side when time boundary info is unavailable
if (timeBoundaryInfo == null) {
- return getRealtimeColocatedTableInfo(realtimeTableName);
+ return getRealtimeColocatedTableInfo(realtimeTableName,
partitionKey, numPartitions);
}
- PartitionInfo[] offlinePartitionInfoMap =
getTablePartitionInfo(offlineTableName).getPartitionInfoMap();
- PartitionInfo[] realtimePartitionInfoMap =
getTablePartitionInfo(realtimeTableName).getPartitionInfoMap();
- int numPartitions = offlinePartitionInfoMap.length;
- Preconditions.checkState(numPartitions ==
realtimePartitionInfoMap.length,
- "Got different number of partitions in OFFLINE side: %s and
REALTIME side: %s of table: %s", numPartitions,
- realtimePartitionInfoMap.length, tableName);
+ PartitionInfo[] offlinePartitionInfoMap =
+ getTablePartitionInfo(offlineTableName, partitionKey,
numPartitions).getPartitionInfoMap();
+ PartitionInfo[] realtimePartitionInfoMap =
+ getTablePartitionInfo(realtimeTableName, partitionKey,
numPartitions).getPartitionInfoMap();
ColocatedPartitionInfo[] colocatedPartitionInfoMap = new
ColocatedPartitionInfo[numPartitions];
for (int i = 0; i < numPartitions; i++) {
PartitionInfo offlinePartitionInfo = offlinePartitionInfoMap[i];
@@ -391,32 +359,39 @@ public class WorkerManager {
}
return new ColocatedTableInfo(colocatedPartitionInfoMap,
timeBoundaryInfo);
} else if (offlineRoutingExists) {
- return getOfflineColocatedTableInfo(offlineTableName);
+ return getOfflineColocatedTableInfo(offlineTableName, partitionKey,
numPartitions);
} else {
- return getRealtimeColocatedTableInfo(realtimeTableName);
+ return getRealtimeColocatedTableInfo(realtimeTableName, partitionKey,
numPartitions);
}
} else {
if (tableType == TableType.OFFLINE) {
- return getOfflineColocatedTableInfo(tableName);
+ return getOfflineColocatedTableInfo(tableName, partitionKey,
numPartitions);
} else {
- return getRealtimeColocatedTableInfo(tableName);
+ return getRealtimeColocatedTableInfo(tableName, partitionKey,
numPartitions);
}
}
}
- private TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
+ private TablePartitionInfo getTablePartitionInfo(String tableNameWithType,
String partitionKey, int numPartitions) {
TablePartitionInfo tablePartitionInfo =
_routingManager.getTablePartitionInfo(tableNameWithType);
Preconditions.checkState(tablePartitionInfo != null, "Failed to find table
partition info for table: %s",
tableNameWithType);
+
Preconditions.checkState(tablePartitionInfo.getPartitionColumn().equals(partitionKey),
+ "Partition key: %s does not match partition column: %s for table: %s",
partitionKey,
+ tablePartitionInfo.getPartitionColumn(), tableNameWithType);
+ Preconditions.checkState(tablePartitionInfo.getNumPartitions() ==
numPartitions,
+ "Partition size mismatch (hint: %s, table: %s) for table: %s",
numPartitions,
+ tablePartitionInfo.getNumPartitions(), tableNameWithType);
Preconditions.checkState(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty(),
"Find %s segments with invalid partition for table: %s",
tablePartitionInfo.getSegmentsWithInvalidPartition().size(),
tableNameWithType);
return tablePartitionInfo;
}
- private ColocatedTableInfo getOfflineColocatedTableInfo(String
offlineTableName) {
- PartitionInfo[] partitionInfoMap =
getTablePartitionInfo(offlineTableName).getPartitionInfoMap();
- int numPartitions = partitionInfoMap.length;
+ private ColocatedTableInfo getOfflineColocatedTableInfo(String
offlineTableName, String partitionKey,
+ int numPartitions) {
+ PartitionInfo[] partitionInfoMap =
+ getTablePartitionInfo(offlineTableName, partitionKey,
numPartitions).getPartitionInfoMap();
ColocatedPartitionInfo[] colocatedPartitionInfoMap = new
ColocatedPartitionInfo[numPartitions];
for (int i = 0; i < numPartitions; i++) {
PartitionInfo partitionInfo = partitionInfoMap[i];
@@ -428,9 +403,10 @@ public class WorkerManager {
return new ColocatedTableInfo(colocatedPartitionInfoMap, null);
}
- private ColocatedTableInfo getRealtimeColocatedTableInfo(String
realtimeTableName) {
- PartitionInfo[] partitionInfoMap =
getTablePartitionInfo(realtimeTableName).getPartitionInfoMap();
- int numPartitions = partitionInfoMap.length;
+ private ColocatedTableInfo getRealtimeColocatedTableInfo(String
realtimeTableName, String partitionKey,
+ int numPartitions) {
+ PartitionInfo[] partitionInfoMap =
+ getTablePartitionInfo(realtimeTableName, partitionKey,
numPartitions).getPartitionInfoMap();
ColocatedPartitionInfo[] colocatedPartitionInfoMap = new
ColocatedPartitionInfo[numPartitions];
for (int i = 0; i < numPartitions; i++) {
PartitionInfo partitionInfo = partitionInfoMap[i];
@@ -465,15 +441,26 @@ public class WorkerManager {
}
}
+ /**
+ * Picks an enabled server deterministically based on the given index to
pick.
+ */
@Nullable
- private static ServerInstance pickRandomEnabledServer(Set<String> candidates,
- Map<String, ServerInstance> enabledServerInstanceMap) {
- if (candidates.isEmpty()) {
+ private static ServerInstance pickEnabledServer(Set<String> candidates,
+ Map<String, ServerInstance> enabledServerInstanceMap, long indexToPick) {
+ int numCandidates = candidates.size();
+ if (numCandidates == 0) {
return null;
}
+ if (numCandidates == 1) {
+ return enabledServerInstanceMap.get(candidates.iterator().next());
+ }
+ List<String> candidateList = new ArrayList<>(candidates);
+ candidateList.sort(null);
+ int startIndex = (int) ((indexToPick & Long.MAX_VALUE) % numCandidates);
String[] servers = candidates.toArray(new String[0]);
ArrayUtils.shuffle(servers, RANDOM);
- for (String server : servers) {
+ for (int i = 0; i < numCandidates; i++) {
+ String server = candidateList.get((startIndex + i) % numCandidates);
ServerInstance serverInstance = enabledServerInstanceMap.get(server);
if (serverInstance != null) {
return serverInstance;
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index c6308d0854..59aa44553d 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -22,13 +22,19 @@ import
com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
@@ -72,7 +78,7 @@ public class QueryEnvironmentTestBase {
@BeforeClass
public void setUp() {
// the port doesn't matter as we are not actually making a server call.
- _queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS,
SERVER1_SEGMENTS, SERVER2_SEGMENTS);
+ _queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS,
SERVER1_SEGMENTS, SERVER2_SEGMENTS, null);
}
@DataProvider(name = "testQueryDataProvider")
@@ -96,9 +102,8 @@ public class QueryEnvironmentTestBase {
new Object[]{"SELECT SUM(a.col3), COUNT(*) FROM a WHERE a.col3 >= 0
AND a.col2 = 'a'"},
new Object[]{"SELECT AVG(a.col3), SUM(a.col3), COUNT(a.col3) FROM a"},
new Object[]{"SELECT a.col1, AVG(a.col3), SUM(a.col3), COUNT(a.col3)
FROM a GROUP BY a.col1"},
- // TODO: support BOOL_AND and BOOL_OR as MIN/MAX
-// new Object[]{"SELECT BOOL_AND(a.col5), BOOL_OR(a.col5) FROM a"},
-// new Object[]{"SELECT a.col3, BOOL_AND(a.col5), BOOL_OR(a.col5) FROM
a GROUP BY a.col3"},
+ new Object[]{"SELECT BOOL_AND(a.col5), BOOL_OR(a.col5) FROM a"},
+ new Object[]{"SELECT a.col3, BOOL_AND(a.col5), BOOL_OR(a.col5) FROM a
GROUP BY a.col3"},
new Object[]{"SELECT KURTOSIS(a.col2), COUNT(DISTINCT a.col3),
SKEWNESS(a.col3) FROM a"},
new Object[]{"SELECT a.col1, KURTOSIS(a.col2), SKEWNESS(a.col3) FROM a
GROUP BY a.col1"},
new Object[]{"SELECT COUNT(a.col3), AVG(a.col3), SUM(a.col3),
MIN(a.col3), MAX(a.col3) FROM a"},
@@ -140,9 +145,8 @@ public class QueryEnvironmentTestBase {
new Object[]{"SELECT RANK() OVER(PARTITION BY a.col2 ORDER BY a.col1)
FROM a"},
new Object[]{"SELECT DENSE_RANK() OVER(ORDER BY a.col1) FROM a"},
new Object[]{"SELECT a.col1, SUM(a.col3) OVER (ORDER BY a.col2),
MIN(a.col3) OVER (ORDER BY a.col2) FROM a"},
- new Object[]{"SELECT /*+
joinOptions(is_colocated_by_join_keys='true'), "
- + "aggOptions(is_partitioned_by_group_by_keys='true') */ a.col3,
a.col1, SUM(b.col3) FROM a JOIN b "
- + "ON a.col3 = b.col3 GROUP BY a.col3, a.col1"},
+ new Object[]{"SELECT /*+
aggOptions(is_partitioned_by_group_by_keys='true') */ a.col3, a.col1,
SUM(b.col3) "
+ + "FROM a JOIN b ON a.col3 = b.col3 GROUP BY a.col3, a.col1"},
new Object[]{"SELECT /*+
aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, COUNT(*),
SUM(a.col3), "
+ "SUM(a.col1) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY
a.col2 HAVING COUNT(*) > 10 "
+ "AND MAX(a.col3) >= 0 AND MIN(a.col3) < 20 AND SUM(a.col3) <= 10
AND AVG(a.col3) = 5"},
@@ -165,7 +169,8 @@ public class QueryEnvironmentTestBase {
}
public static QueryEnvironment getQueryEnvironment(int reducerPort, int
port1, int port2,
- Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1,
Map<String, List<String>> segmentMap2) {
+ Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1,
Map<String, List<String>> segmentMap2,
+ @Nullable Map<String, Pair<String, List<List<String>>>>
partitionedSegmentsMap) {
MockRoutingManagerFactory factory = new MockRoutingManagerFactory(port1,
port2);
for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
factory.registerTable(entry.getValue(), entry.getKey());
@@ -180,7 +185,28 @@ public class QueryEnvironmentTestBase {
factory.registerSegment(port2, entry.getKey(), segment);
}
}
- RoutingManager routingManager = factory.buildRoutingManager();
+ Map<String, TablePartitionInfo> partitionInfoMap = null;
+ if (MapUtils.isNotEmpty(partitionedSegmentsMap)) {
+ partitionInfoMap = new HashMap<>();
+ for (Map.Entry<String, Pair<String, List<List<String>>>> entry :
partitionedSegmentsMap.entrySet()) {
+ String tableNameWithType = entry.getKey();
+ String partitionColumn = entry.getValue().getLeft();
+ List<List<String>> partitionIdToSegmentsMap =
entry.getValue().getRight();
+ int numPartitions = partitionIdToSegmentsMap.size();
+ String hostname1 = MockRoutingManagerFactory.toHostname(port1);
+ String hostname2 = MockRoutingManagerFactory.toHostname(port2);
+ PartitionInfo[] partitionIdToInfoMap = new
PartitionInfo[numPartitions];
+ for (int i = 0; i < numPartitions; i++) {
+ String hostname = i < (numPartitions / 2) ? hostname1 : hostname2;
+ partitionIdToInfoMap[i] = new
PartitionInfo(Collections.singleton(hostname), partitionIdToSegmentsMap.get(i));
+ }
+ TablePartitionInfo tablePartitionInfo =
+ new TablePartitionInfo(tableNameWithType, partitionColumn,
"hashCode", numPartitions, partitionIdToInfoMap,
+ Collections.emptySet());
+ partitionInfoMap.put(tableNameWithType, tablePartitionInfo);
+ }
+ }
+ RoutingManager routingManager =
factory.buildRoutingManager(partitionInfoMap);
TableCache tableCache = factory.buildTableCache();
return new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 58db9c7f41..7442ade6e2 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -21,8 +21,10 @@ package org.apache.pinot.query.testutils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
@@ -34,7 +36,6 @@ import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -51,62 +52,52 @@ public class MockRoutingManagerFactory {
private static final String TIME_BOUNDARY_COLUMN = "ts";
private static final String HOST_NAME = "localhost";
- private final HashMap<String, String> _tableNameMap;
+ private final Map<String, String> _tableNameMap;
private final Map<String, Schema> _schemaMap;
-
+ private final Set<String> _hybridTables;
private final Map<String, ServerInstance> _serverInstances;
- private final Map<String, RoutingTable> _routingTableMap;
- private final List<String> _hybridTables;
-
- private final Map<String, Map<ServerInstance, List<String>>>
_tableServerSegmentMap;
+ private final Map<String, Map<ServerInstance, List<String>>>
_tableServerSegmentsMap;
public MockRoutingManagerFactory(int... ports) {
- _hybridTables = new ArrayList<>();
- _serverInstances = new HashMap<>();
- _schemaMap = new HashMap<>();
_tableNameMap = new HashMap<>();
- _routingTableMap = new HashMap<>();
-
- _tableServerSegmentMap = new HashMap<>();
+ _schemaMap = new HashMap<>();
+ _hybridTables = new HashSet<>();
+ _serverInstances = new HashMap<>();
+ _tableServerSegmentsMap = new HashMap<>();
for (int port : ports) {
_serverInstances.put(toHostname(port), getServerInstance(HOST_NAME,
port, port, port, port));
}
}
- public MockRoutingManagerFactory registerTable(Schema schema, String
tableName) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType == null) {
- registerTableNameWithType(schema,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
- registerTableNameWithType(schema,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
- _hybridTables.add(tableName);
+ public void registerTable(Schema schema, String tableName) {
+ if (TableNameBuilder.isTableResource(tableName)) {
+ registerTableNameWithType(schema, tableName);
} else {
- registerTableNameWithType(schema,
TableNameBuilder.forType(tableType).tableNameWithType(tableName));
+ registerTableNameWithType(schema,
TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+ registerTableNameWithType(schema,
TableNameBuilder.REALTIME.tableNameWithType(tableName));
+ _hybridTables.add(tableName);
}
- return this;
}
- public MockRoutingManagerFactory registerSegment(int insertToServerPort,
String tableNameWithType,
- String segmentName) {
- Map<ServerInstance, List<String>> serverSegmentMap =
- _tableServerSegmentMap.getOrDefault(tableNameWithType, new
HashMap<>());
- ServerInstance serverInstance =
_serverInstances.get(toHostname(insertToServerPort));
+ private void registerTableNameWithType(Schema schema, String
tableNameWithType) {
+ _tableNameMap.put(tableNameWithType, tableNameWithType);
+ _schemaMap.put(TableNameBuilder.extractRawTableName(tableNameWithType),
schema);
+ }
- List<String> sSegments = serverSegmentMap.getOrDefault(serverInstance, new
ArrayList<>());
- sSegments.add(segmentName);
- serverSegmentMap.put(serverInstance, sSegments);
- _tableServerSegmentMap.put(tableNameWithType, serverSegmentMap);
- return this;
+ public void registerSegment(int insertToServerPort, String
tableNameWithType, String segmentName) {
+ ServerInstance serverInstance =
_serverInstances.get(toHostname(insertToServerPort));
+ _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new
HashMap<>())
+ .computeIfAbsent(serverInstance, k -> new
ArrayList<>()).add(segmentName);
}
- public RoutingManager buildRoutingManager() {
- // create all the fake routing tables
- _routingTableMap.clear();
- for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry :
_tableServerSegmentMap.entrySet()) {
+ public RoutingManager buildRoutingManager(@Nullable Map<String,
TablePartitionInfo> partitionInfoMap) {
+ Map<String, RoutingTable> routingTableMap = new HashMap<>();
+ for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry :
_tableServerSegmentsMap.entrySet()) {
String tableNameWithType = tableEntry.getKey();
RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(),
Collections.emptyList(), 0);
- _routingTableMap.put(tableNameWithType, fakeRoutingTable);
+ routingTableMap.put(tableNameWithType, fakeRoutingTable);
}
- return new FakeRoutingManager(_routingTableMap, _serverInstances,
_hybridTables);
+ return new FakeRoutingManager(routingTableMap, _hybridTables,
partitionInfoMap, _serverInstances);
}
public TableCache buildTableCache() {
@@ -119,7 +110,7 @@ public class MockRoutingManagerFactory {
return mock;
}
- private static String toHostname(int port) {
+ public static String toHostname(int port) {
return String.format("%s_%d", HOST_NAME, port);
}
@@ -137,23 +128,18 @@ public class MockRoutingManagerFactory {
return new ServerInstance(instanceConfig);
}
- private void registerTableNameWithType(Schema schema, String
tableNameWithType) {
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- _tableNameMap.put(tableNameWithType, rawTableName);
- _schemaMap.put(rawTableName, schema);
- _schemaMap.put(tableNameWithType, schema);
- }
-
private static class FakeRoutingManager implements RoutingManager {
private final Map<String, RoutingTable> _routingTableMap;
+ private final Set<String> _hybridTables;
+ private final Map<String, TablePartitionInfo> _partitionInfoMap;
private final Map<String, ServerInstance> _serverInstances;
- private final List<String> _hybridTables;
- public FakeRoutingManager(Map<String, RoutingTable> routingTableMap,
Map<String, ServerInstance> serverInstances,
- List<String> hybridTables) {
+ public FakeRoutingManager(Map<String, RoutingTable> routingTableMap,
Set<String> hybridTables,
+ @Nullable Map<String, TablePartitionInfo> partitionInfoMap,
Map<String, ServerInstance> serverInstances) {
_routingTableMap = routingTableMap;
- _serverInstances = serverInstances;
_hybridTables = hybridTables;
+ _partitionInfoMap = partitionInfoMap;
+ _serverInstances = serverInstances;
}
@Override
@@ -163,9 +149,8 @@ public class MockRoutingManagerFactory {
@Override
public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long
requestId) {
- String tableName =
brokerRequest.getPinotQuery().getDataSource().getTableName();
- return _routingTableMap.getOrDefault(tableName,
-
_routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
+ String tableNameWithType =
brokerRequest.getPinotQuery().getDataSource().getTableName();
+ return _routingTableMap.get(tableNameWithType);
}
@Override
@@ -173,9 +158,10 @@ public class MockRoutingManagerFactory {
return _routingTableMap.containsKey(tableNameWithType);
}
+ @Nullable
@Override
- public TimeBoundaryInfo getTimeBoundaryInfo(String tableName) {
- String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
+ String rawTableName =
TableNameBuilder.extractRawTableName(offlineTableName);
return _hybridTables.contains(rawTableName) ? new
TimeBoundaryInfo(TIME_BOUNDARY_COLUMN,
String.valueOf(System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(1))) : null;
}
@@ -183,7 +169,7 @@ public class MockRoutingManagerFactory {
@Nullable
@Override
public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
- return null;
+ return _partitionInfoMap != null ?
_partitionInfoMap.get(tableNameWithType) : null;
}
@Override
diff --git
a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index f5ba9d3182..892b4f060b 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -1,33 +1,19 @@
{
"pinot_hint_option_tests": {
"queries": [
- {
- "description": "SELECT * inner join with filter on one table",
- "sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(is_colocated_by_join_keys='true') */ * FROM a JOIN b ON a.col1 =
b.col2 WHERE a.col3 >= 0",
- "output": [
- "Execution Plan",
- "\nLogicalJoin(condition=[=($0, $8)], joinType=[inner])",
- "\n PinotLogicalExchange(distribution=[single])",
- "\n LogicalFilter(condition=[>=($2, 0)])",
- "\n LogicalTableScan(table=[[a]])",
- "\n PinotLogicalExchange(distribution=[single])",
- "\n LogicalTableScan(table=[[b]])",
- "\n"
- ]
- },
{
"description": "Inner join with group by",
- "sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='true') */a.col1, AVG(b.col3) FROM a
JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0
GROUP BY a.col1",
+ "sql": "EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_partitioned_by_group_by_keys='true') */ a.col1, AVG(b.col3) FROM
a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0
GROUP BY a.col1",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL,
$2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)],
agg#1=[COUNT()])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
- "\n PinotLogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
- "\n PinotLogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[<($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -36,7 +22,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy",
- "sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(join_strategy='dynamic_broadcast',is_colocated_by_join_keys='false')
*/ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 >
0)",
+ "sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(join_strategy='dynamic_broadcast') */ a.col1, a.col2 FROM a WHERE
a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"output": [
"Execution Plan",
"\nPinotLogicalExchange(distribution=[hash[0]])",
@@ -50,37 +36,6 @@
"\n"
]
},
- {
- "description": "semi-join with colocated join key",
- "sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(is_colocated_by_join_keys) */ * FROM a WHERE a.col1 IN (SELECT col2
FROM b WHERE b.col3 > 0)",
- "output": [
- "Execution Plan",
- "\nLogicalJoin(condition=[=($0, $7)], joinType=[semi])",
- "\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n LogicalTableScan(table=[[a]])",
- "\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n LogicalProject(col2=[$1], col3=[$2])",
- "\n LogicalFilter(condition=[>($2, 0)])",
- "\n LogicalTableScan(table=[[b]])",
- "\n"
- ]
- },
- {
- "description": "semi-join with colocated join key and
dynamic_broadcast join strategy",
- "sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ a.col1, a.col2 FROM a WHERE a.col1 IN
(SELECT col2 FROM b WHERE b.col3 > 0)",
- "output": [
- "Execution Plan",
- "\nPinotLogicalExchange(distribution=[single])",
- "\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[a]])",
- "\n PinotLogicalExchange(distribution=[single],
relExchangeType=[PIPELINE_BREAKER])",
- "\n LogicalProject(col2=[$1], col3=[$2])",
- "\n LogicalFilter(condition=[>($2, 0)])",
- "\n LogicalTableScan(table=[[b]])",
- "\n"
- ]
- },
{
"description": "semi-join with dynamic_broadcast join strategy then
group-by on same key",
"sql": "EXPLAIN PLAN FOR SELECT /*+
joinOptions(join_strategy='dynamic_broadcast'),
aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE
a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 149fc39587..1731f34fe9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -93,25 +93,27 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@BeforeClass
public void setUp()
throws Exception {
- MockInstanceDataManagerFactory factory1 = new
MockInstanceDataManagerFactory("server1")
- .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
- .registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
- .registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
- .registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d")
- .addSegment("a_REALTIME", buildRows("a_REALTIME"))
- .addSegment("a_REALTIME", buildRows("a_REALTIME"))
- .addSegment("b_REALTIME", buildRows("b_REALTIME"))
- .addSegment("c_OFFLINE", buildRows("c_OFFLINE"))
- .addSegment("d_OFFLINE", buildRows("d_OFFLINE"));
- MockInstanceDataManagerFactory factory2 = new
MockInstanceDataManagerFactory("server2")
- .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
- .registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
- .registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d")
- .addSegment("a_REALTIME", buildRows("a_REALTIME"))
- .addSegment("c_OFFLINE", buildRows("c_OFFLINE"))
- .addSegment("c_OFFLINE", buildRows("c_OFFLINE"))
- .addSegment("d_OFFLINE", buildRows("d_OFFLINE"))
- .addSegment("d_REALTIME", buildRows("d_REALTIME"));
+ MockInstanceDataManagerFactory factory1 = new
MockInstanceDataManagerFactory("server1");
+ factory1.registerTable(SCHEMA_BUILDER.setSchemaName("a").build(),
"a_REALTIME");
+ factory1.registerTable(SCHEMA_BUILDER.setSchemaName("b").build(),
"b_REALTIME");
+ factory1.registerTable(SCHEMA_BUILDER.setSchemaName("c").build(),
"c_OFFLINE");
+ factory1.registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d");
+ factory1.addSegment("a_REALTIME", buildRows("a_REALTIME"));
+ factory1.addSegment("a_REALTIME", buildRows("a_REALTIME"));
+ factory1.addSegment("b_REALTIME", buildRows("b_REALTIME"));
+ factory1.addSegment("c_OFFLINE", buildRows("c_OFFLINE"));
+ factory1.addSegment("d_OFFLINE", buildRows("d_OFFLINE"));
+
+ MockInstanceDataManagerFactory factory2 = new
MockInstanceDataManagerFactory("server2");
+ factory2.registerTable(SCHEMA_BUILDER.setSchemaName("a").build(),
"a_REALTIME");
+ factory2.registerTable(SCHEMA_BUILDER.setSchemaName("c").build(),
"c_OFFLINE");
+ factory2.registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d");
+ factory2.addSegment("a_REALTIME", buildRows("a_REALTIME"));
+ factory2.addSegment("c_OFFLINE", buildRows("c_OFFLINE"));
+ factory2.addSegment("c_OFFLINE", buildRows("c_OFFLINE"));
+ factory2.addSegment("d_OFFLINE", buildRows("d_OFFLINE"));
+ factory2.addSegment("d_REALTIME", buildRows("d_REALTIME"));
+
QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
@@ -135,9 +137,10 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
_reducerScheduler.startAsync();
_mailboxService.start();
- _queryEnvironment =
QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort,
server1.getPort(),
- server2.getPort(), factory1.buildSchemaMap(),
factory1.buildTableSegmentNameMap(),
- factory2.buildTableSegmentNameMap());
+ _queryEnvironment =
+ QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort,
server1.getPort(), server2.getPort(),
+ factory1.getRegisteredSchemaMap(),
factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+ null);
server1.start();
server2.start();
// this doesn't test the QueryServer functionality so the server port can
be the same as the mailbox port.
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index fc78863778..cf691b2f09 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -94,7 +94,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
HashJoinOperator joinOnString =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -132,7 +132,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator joinOnInt =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = joinOnInt.nextBlock();
@@ -167,7 +167,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, false);
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
HashJoinOperator joinOnInt =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = joinOnInt.nextBlock();
@@ -209,7 +209,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.LEFT,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -244,7 +244,7 @@ public class HashJoinOperatorTest {
});
List<RexExpression> joinClauses = new ArrayList<>();
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -276,7 +276,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.LEFT,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -312,7 +312,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -351,7 +351,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, false);
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -390,7 +390,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, false);
+ getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -425,7 +425,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.RIGHT,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator joinOnNum =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = joinOnNum.nextBlock();
@@ -475,7 +475,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.SEMI,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -515,7 +515,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.FULL,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -568,7 +568,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.ANTI,
- getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+ getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
@@ -607,7 +607,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -641,7 +641,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
@@ -678,7 +678,7 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.STRING
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema,
JoinRelType.INNER,
- getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+ getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
HashJoinOperator join =
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index a9aeb1ec2a..5318a9b580 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -151,7 +151,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode mailboxReceiveNode2 =
new MailboxReceiveNode(0, DATA_SCHEMA, 2,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null, false);
+ JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null);
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(mailboxReceiveNode2);
DistributedStagePlan distributedStagePlan =
@@ -246,7 +246,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 3,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null, false);
+ JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null);
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
DistributedStagePlan distributedStagePlan =
@@ -284,7 +284,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 2,
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null, false);
+ JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA,
JoinRelType.INNER, null, null);
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
DistributedStagePlan distributedStagePlan =
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 7db0386c15..dda0981b41 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -37,6 +37,8 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.BrokerResponseStats;
@@ -71,6 +73,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
private static final Random RANDOM = new Random(42);
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
+ private static final int NUM_PARTITIONS = 4;
private final Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
@@ -85,6 +88,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
setH2Connection();
// Scan through all the test cases.
+ Map<String, Pair<String, List<List<String>>>> partitionedSegmentsMap = new
HashMap<>();
for (Map.Entry<String, QueryTestCase> testCaseEntry :
getTestCases().entrySet()) {
String testCaseName = testCaseEntry.getKey();
QueryTestCase testCase = testCaseEntry.getValue();
@@ -98,51 +102,54 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
boolean allowEmptySegment =
!BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps,
"noEmptySegment"));
String tableName = testCaseName + "_" + tableEntry.getKey();
// Testing only OFFLINE table b/c Hybrid table test is a special case
to test separately.
- String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
- org.apache.pinot.spi.data.Schema pinotSchema =
constructSchema(tableName, tableEntry.getValue()._schema);
+ String offlineTableName =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
+ Schema pinotSchema = constructSchema(tableName,
tableEntry.getValue()._schema);
schemaMap.put(tableName, pinotSchema);
- factory1.registerTable(pinotSchema, tableNameWithType);
- factory2.registerTable(pinotSchema, tableNameWithType);
+ factory1.registerTable(pinotSchema, offlineTableName);
+ factory2.registerTable(pinotSchema, offlineTableName);
List<QueryTestCase.ColumnAndType> columnAndTypes =
tableEntry.getValue()._schema;
List<GenericRow> genericRows = toRow(columnAndTypes,
tableEntry.getValue()._inputs);
// generate segments and dump into server1 and server2
List<String> partitionColumns =
tableEntry.getValue()._partitionColumns;
+ String partitionColumn = null;
+ List<List<String>> partitionIdToSegmentsMap = null;
+ if (partitionColumns != null && partitionColumns.size() == 1) {
+ partitionColumn = partitionColumns.get(0);
+ partitionIdToSegmentsMap = new ArrayList<>();
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ partitionIdToSegmentsMap.add(new ArrayList<>());
+ }
+ }
- List<GenericRow> rows1 = new ArrayList<>();
- List<GenericRow> rows2 = new ArrayList<>();
+ List<List<GenericRow>> partitionIdToRowsMap = new ArrayList<>();
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ partitionIdToRowsMap.add(new ArrayList<>());
+ }
for (GenericRow row : genericRows) {
if (row == SEGMENT_BREAKER_ROW) {
- if (allowEmptySegment || rows1.size() > 0) {
- factory1.addSegment(tableNameWithType, rows1);
- rows1 = new ArrayList<>();
- }
- if (allowEmptySegment || rows2.size() > 0) {
- factory2.addSegment(tableNameWithType, rows2);
- rows2 = new ArrayList<>();
- }
+ addSegments(factory1, factory2, offlineTableName,
allowEmptySegment, partitionIdToRowsMap,
+ partitionIdToSegmentsMap);
} else {
- long partition = 0;
+ int partitionId;
if (partitionColumns == null) {
- partition = RANDOM.nextInt(2);
+ partitionId = RANDOM.nextInt(NUM_PARTITIONS);
} else {
+ int hashCode = 0;
for (String field : partitionColumns) {
- partition = (partition + row.getValue(field).hashCode()) % 42;
+ hashCode += row.getValue(field).hashCode();
}
+ partitionId = (hashCode & Integer.MAX_VALUE) % NUM_PARTITIONS;
}
- if (partition % 2 == 0) {
- rows1.add(row);
- } else {
- rows2.add(row);
- }
+ partitionIdToRowsMap.get(partitionId).add(row);
}
}
- if (allowEmptySegment || rows1.size() > 0) {
- factory1.addSegment(tableNameWithType, rows1);
- }
- if (allowEmptySegment || rows2.size() > 0) {
- factory2.addSegment(tableNameWithType, rows2);
+ addSegments(factory1, factory2, offlineTableName, allowEmptySegment,
partitionIdToRowsMap,
+ partitionIdToSegmentsMap);
+
+ if (partitionColumn != null) {
+ partitionedSegmentsMap.put(offlineTableName,
Pair.of(partitionColumn, partitionIdToSegmentsMap));
}
}
@@ -197,7 +204,8 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
_queryEnvironment =
QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort,
server1.getPort(), server2.getPort(),
- factory1.buildSchemaMap(), factory1.buildTableSegmentNameMap(),
factory2.buildTableSegmentNameMap());
+ factory1.getRegisteredSchemaMap(),
factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+ partitionedSegmentsMap);
server1.start();
server2.start();
// this doesn't test the QueryServer functionality so the server port can
be the same as the mailbox port.
@@ -208,6 +216,22 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
_servers.put(new QueryServerInstance("localhost", port2, port2), server2);
}
+ private void addSegments(MockInstanceDataManagerFactory factory1,
MockInstanceDataManagerFactory factory2,
+ String offlineTableName, boolean allowEmptySegment,
List<List<GenericRow>> partitionIdToRowsMap,
+ @Nullable List<List<String>> partitionIdToSegmentsMap) {
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ MockInstanceDataManagerFactory factory = i < (NUM_PARTITIONS / 2) ?
factory1 : factory2;
+ List<GenericRow> rows = partitionIdToRowsMap.get(i);
+ if (allowEmptySegment || !rows.isEmpty()) {
+ String segmentName = factory.addSegment(offlineTableName, rows);
+ if (partitionIdToSegmentsMap != null) {
+ partitionIdToSegmentsMap.get(i).add(segmentName);
+ }
+ rows.clear();
+ }
+ }
+ }
+
@AfterClass
public void tearDown() {
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 62311432ee..397ed19fd2 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -85,7 +85,7 @@ public class QueryDispatcherTest extends QueryTestSet {
// reducer port doesn't matter, we are testing the worker instance not
GRPC.
_queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1,
portList.get(0), portList.get(1),
QueryEnvironmentTestBase.TABLE_SCHEMAS,
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
- QueryEnvironmentTestBase.SERVER2_SEGMENTS);
+ QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
}
@AfterClass
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index affc23f826..7c432046b7 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -95,7 +95,7 @@ public class QueryServerTest extends QueryTestSet {
// reducer port doesn't matter, we are testing the worker instance not
GRPC.
_queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1,
portList.get(0), portList.get(1),
QueryEnvironmentTestBase.TABLE_SCHEMAS,
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
- QueryEnvironmentTestBase.SERVER2_SEGMENTS);
+ QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
}
@AfterClass
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index 1b27d24c53..fd3ef7a374 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -44,7 +45,7 @@ import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -52,11 +53,18 @@ import static org.mockito.Mockito.when;
public class MockInstanceDataManagerFactory {
private static final String DATA_DIR_PREFIX = "MockInstanceDataDir";
- private final Map<String, List<GenericRow>> _tableRowsMap;
+ // Key is table name with type
private final Map<String, List<ImmutableSegment>> _tableSegmentMap;
private final Map<String, List<String>> _tableSegmentNameMap;
private final Map<String, File> _serverTableDataDirMap;
+
+ // Key is raw table name
+ private final Map<String, List<GenericRow>> _tableRowsMap;
private final Map<String, Schema> _schemaMap;
+
+ // Key is registered table (with or without type)
+ private final Map<String, Schema> _registeredSchemaMap;
+
private String _serverName;
public MockInstanceDataManagerFactory(String serverName) {
@@ -66,20 +74,29 @@ public class MockInstanceDataManagerFactory {
_tableSegmentNameMap = new HashMap<>();
_tableRowsMap = new HashMap<>();
_schemaMap = new HashMap<>();
+ _registeredSchemaMap = new HashMap<>();
}
- public MockInstanceDataManagerFactory registerTable(Schema schema, String
tableName) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType == null) {
- registerTableNameWithType(schema,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
- registerTableNameWithType(schema,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
- } else {
+ public void registerTable(Schema schema, String tableName) {
+ _registeredSchemaMap.put(tableName, schema);
+ if (TableNameBuilder.isTableResource(tableName)) {
+ _schemaMap.put(TableNameBuilder.extractRawTableName(tableName), schema);
registerTableNameWithType(schema, tableName);
+ } else {
+ _schemaMap.put(tableName, schema);
+ registerTableNameWithType(schema,
TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+ registerTableNameWithType(schema,
TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
- return this;
}
- public MockInstanceDataManagerFactory addSegment(String tableNameWithType,
List<GenericRow> rows) {
+ private void registerTableNameWithType(Schema schema, String
tableNameWithType) {
+ File tableDataDir = new File(FileUtils.getTempDirectory(),
+ String.format("%s_%s_%s", DATA_DIR_PREFIX, _serverName,
tableNameWithType));
+ FileUtils.deleteQuietly(tableDataDir);
+ _serverTableDataDirMap.put(tableNameWithType, tableDataDir);
+ }
+
+ public String addSegment(String tableNameWithType, List<GenericRow> rows) {
String segmentName = String.format("%s_%s", tableNameWithType,
UUID.randomUUID());
File tableDataDir = _serverTableDataDirMap.get(tableNameWithType);
ImmutableSegment segment = buildSegment(tableNameWithType, tableDataDir,
segmentName, rows);
@@ -96,7 +113,8 @@ public class MockInstanceDataManagerFactory {
List<GenericRow> tableRows = _tableRowsMap.getOrDefault(rawTableName, new
ArrayList<>());
tableRows.addAll(rows);
_tableRowsMap.put(rawTableName, tableRows);
- return this;
+
+ return segmentName;
}
public InstanceDataManager buildInstanceDataManager() {
@@ -107,11 +125,15 @@ public class MockInstanceDataManagerFactory {
tableDataManagers.put(e.getKey(), tableDataManager);
}
for (Map.Entry<String, TableDataManager> e : tableDataManagers.entrySet())
{
- when(instanceDataManager.getTableDataManager(e.getKey())).thenAnswer(inv
-> e.getValue());
+
when(instanceDataManager.getTableDataManager(e.getKey())).thenReturn(e.getValue());
}
return instanceDataManager;
}
+ public Map<String, Schema> getRegisteredSchemaMap() {
+ return _registeredSchemaMap;
+ }
+
public Map<String, Schema> buildSchemaMap() {
return _schemaMap;
}
@@ -125,10 +147,13 @@ public class MockInstanceDataManagerFactory {
}
private TableDataManager mockTableDataManager(List<ImmutableSegment>
segmentList) {
- List<SegmentDataManager> tableSegmentDataManagers =
-
segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
+ Map<String, SegmentDataManager> segmentDataManagerMap =
+
segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName,
ImmutableSegmentDataManager::new));
TableDataManager tableDataManager = mock(TableDataManager.class);
- when(tableDataManager.acquireSegments(any(),
any())).thenReturn(tableSegmentDataManagers);
+ when(tableDataManager.acquireSegments(anyList(),
anyList())).thenAnswer(invocation -> {
+ List<String> segments = invocation.getArgument(0);
+ return
segments.stream().map(segmentDataManagerMap::get).collect(Collectors.toList());
+ });
return tableDataManager;
}
@@ -137,9 +162,9 @@ public class MockInstanceDataManagerFactory {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
// TODO: plugin table config constructor
- TableConfig tableConfig = new
TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts")
- .build();
- Schema schema = _schemaMap.get(tableNameWithType);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts").build();
+ Schema schema = _schemaMap.get(rawTableName);
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
config.setOutDir(indexDir.getPath());
config.setTableName(tableNameWithType);
@@ -154,13 +179,4 @@ public class MockInstanceDataManagerFactory {
throw new RuntimeException("Unable to construct immutable segment from
records", e);
}
}
-
- private void registerTableNameWithType(Schema schema, String
tableNameWithType) {
- File tableDataDir = new File(FileUtils.getTempDirectory(),
- String.format("%s_%s_%s", DATA_DIR_PREFIX, _serverName,
tableNameWithType));
- FileUtils.deleteQuietly(tableDataDir);
- _serverTableDataDirMap.put(tableNameWithType, tableDataDir);
- _schemaMap.put(TableNameBuilder.extractRawTableName(tableNameWithType),
schema);
- _schemaMap.put(tableNameWithType, schema);
- }
}
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index 9f14015514..4719c77202 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -654,13 +654,13 @@
"description": "nested aggregation",
"sql": "SELECT min(max(int_col)) FROM {tbl}",
"comments": ".*Aggregate expressions cannot be nested.",
- "expectedException": ".*Error composing query plan for.*"
+ "expectedException": "Error composing query plan for.*"
},
{
"psql": "4.2.7",
"description": "nested aggregation",
"sql": "SELECT (SELECT max(min(int_col)) FROM {tbl}) from {tbl};",
- "expectedException": ".*Error composing query plan for.*"
+ "expectedException": "Error composing query plan for.*"
},
{
"psql": "4.2.7",
diff --git a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
index 9c5cea93f3..973d83b70c 100644
--- a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
+++ b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
@@ -117,14 +117,14 @@
"outputs": [["b", 6], ["a", 6]]
},
{
- "comments": "table aren't actually partitioned by val thus all
segments can produce duplicate results, thus [[6]]",
+ "comments": "table aren't actually partitioned by val thus all
segments can produce duplicate results, thus [[8]]",
"sql": "SELECT SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1}",
- "outputs": [[6]]
+ "outputs": [[8]]
},
{
- "comments": "table aren't actually partitioned by val thus all
segments can produce duplicate results, thus [[b, 4], [a, 4]]",
+ "comments": "table aren't actually partitioned by val thus all
segments can produce duplicate results, thus [[b, 5], [a, 4]]",
"sql": "SELECT groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val)
FROM {tbl1} GROUP BY groupingCol",
- "outputs": [["b", 4], ["a", 4]]
+ "outputs": [["b", 5], ["a", 4]]
},
{
"sql": "SELECT l.groupingCol,
SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val),
SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON
l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
@@ -135,9 +135,9 @@
"outputs": [["b", 6], ["a", 6]]
},
{
- "comments": "table aren't actually partitioned by val thus all
segments can produce duplicate results, thus [[b, 4], [a, 4]]",
+ "comments": "table aren't actually partitioned by val thus all
segments can produce duplicate results, thus [[b, 5], [a, 4]]",
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */
groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY
groupingCol",
- "outputs": [["b", 4], ["a", 4]]
+ "outputs": [["b", 5], ["a", 4]]
},
{
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */
l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val),
SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON
l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index e9721994d9..940b318f1a 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -51,37 +51,55 @@
}
},
"queries": [
+ {
+ "description": "Wrong partition key",
+ "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+
tableOptions(partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num",
+ "expectedException": "Error composing query plan for.*"
+ },
+ {
+ "description": "Wrong partition size",
+ "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num",
+ "expectedException": "Error composing query plan for.*"
+ },
+ {
+ "description": "Group by partition column",
+ "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+ },
{
"description": "Colocated JOIN with partition column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON
{tbl1}.num = {tbl2}.num"
+ "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2}
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num"
},
{
"description": "Colocated JOIN with partition column and group by
partition column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name,
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY
{tbl1}.num, {tbl1}.name"
+ "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true')
*/ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated JOIN with partition column and group by
non-partitioned column",
- "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name,
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY
{tbl1}.name"
+ "sql": "SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num =
{tbl2}.num GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "Colocated JOIN with partition column and group by
non-partitioned column with stage parallelism",
+ "sql": "SET stageParallelism=2; SELECT {tbl1}.name, SUM({tbl2}.num)
FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN
{tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON
{tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx',
'yyy'))"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy'))"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column and group by partition column",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num,
COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM
{tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'),
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num,
COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition
column and group by non-partitioned column",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx',
'yyy')) GROUP BY {tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN
('xxx', 'yyy')) GROUP BY {tbl1}.name"
},
{
"description": "Dynamic broadcast SEMI-JOIN with empty right table
result",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM
{tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val =
'non-exist') GROUP BY {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partially
empty right table result for some servers",
- "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY
{tbl1}.name"
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num',
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val =
'z') GROUP BY {tbl1}.name"
},
{
"description": "Skip leaf stage aggregation with GROUP BY hint",
diff --git a/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
b/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
index 9aad9542ca..1041bc7d4f 100644
--- a/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
+++ b/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
@@ -44,12 +44,12 @@
{
"comment": "Plan failed. Expression 'a' is not being grouped.",
"sql":"SELECT a FROM {test_having} HAVING min(a) < max(a);",
- "expectedException": ".*Error composing query plan.*"
+ "expectedException": "Error composing query plan for.*"
},
{
"comment": "Plan failed. Expression 'a' is not being grouped.",
"sql":"SELECT 1 AS one FROM {test_having} HAVING a > 1;",
- "expectedException": ".*Error composing query plan.*"
+ "expectedException": "Error composing query plan for.*"
},
{
"sql":"SELECT 1 AS one FROM {test_having} HAVING 1 > 2;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]