This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 5416652ff5 [multistage] [debuggability] OpChain and operator stats (#10094)
5416652ff5 is described below
commit 5416652ff59b150d455c1f4764d2e0bc561d8cf3
Author: Yao Liu <[email protected]>
AuthorDate: Thu Jan 19 08:54:43 2023 -0800
[multistage] [debuggability] OpChain and operator stats (#10094)
---
.../apache/pinot/query/runtime/QueryRunner.java | 62 +++++-----
.../runtime/executor/OpChainSchedulerService.java | 10 +-
.../query/runtime/operator/AggregateOperator.java | 86 +++++++------
.../query/runtime/operator/FilterOperator.java | 22 +++-
.../query/runtime/operator/HashJoinOperator.java | 30 ++++-
.../LeafStageTransferableBlockOperator.java | 50 +++++---
.../runtime/operator/LiteralValueOperator.java | 27 +++-
.../runtime/operator/MailboxReceiveOperator.java | 99 ++++++++-------
.../runtime/operator/MailboxSendOperator.java | 21 +++-
.../pinot/query/runtime/operator/OpChain.java | 1 +
.../pinot/query/runtime/operator/OpChainStats.java | 20 +--
.../query/runtime/operator/OperatorStats.java | 78 ++++++++++++
.../pinot/query/runtime/operator/SortOperator.java | 26 +++-
.../query/runtime/operator/TransformOperator.java | 23 +++-
.../query/runtime/plan/PhysicalPlanVisitor.java | 15 ++-
.../pinot/query/service/QueryDispatcher.java | 12 +-
.../runtime/operator/AggregateOperatorTest.java | 41 +++----
.../query/runtime/operator/FilterOperatorTest.java | 28 ++---
.../runtime/operator/HashJoinOperatorTest.java | 35 +++---
.../LeafStageTransferableBlockOperatorTest.java | 136 +++++++++++----------
.../runtime/operator/LiteralValueOperatorTest.java | 25 +++-
.../runtime/operator/MailboxSendOperatorTest.java | 10 +-
.../query/runtime/operator/SortOperatorTest.java | 30 ++---
.../runtime/operator/TransformOperatorTest.java | 20 +--
24 files changed, 585 insertions(+), 322 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 41571fcd76..ad0d138bb4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -88,24 +88,20 @@ public class QueryRunner {
    * Initializes the query executor.
    * <p>Should be called only once and before calling any other method.
    */
-  public void init(PinotConfiguration config, InstanceDataManager instanceDataManager,
-      HelixManager helixManager, ServerMetrics serverMetrics) {
+  public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager,
+      ServerMetrics serverMetrics) {
     String instanceName = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
     _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
     _helixManager = helixManager;
     try {
-      long releaseMs = config.getProperty(
-          QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
+      long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
           QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
-      _scheduler = new OpChainSchedulerService(
-          new RoundRobinScheduler(releaseMs),
-          Executors.newFixedThreadPool(
-              ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
-              new NamedThreadFactory("query_worker_on_" + _port + "_port")),
-          releaseMs);
+      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
+          Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+              new NamedThreadFactory("query_worker_on_" + _port + "_port")), releaseMs);
       _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config, _scheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
@@ -130,12 +126,14 @@ public class QueryRunner {
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     if (isLeafStage(distributedStagePlan)) {
       // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table
       // and package it here for return. But we should really use a MailboxSendOperator directly put into the
       // server executor.
-      List<ServerPlanRequestContext> serverQueryRequests = constructServerQueryRequests(distributedStagePlan,
-          requestMetadataMap, _helixPropertyStore, _mailboxService);
+      long leafStageStartMillis = System.currentTimeMillis();
+      List<ServerPlanRequestContext> serverQueryRequests =
+          constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService);
 
       // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
       List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
@@ -144,25 +142,27 @@ public class QueryRunner {
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
         serverQueryResults.add(processServerQuery(request, _scheduler.getWorkerPool()));
       }
-
+      LOGGER.debug(
+          "RequestId:" + requestId + " StageId:" + distributedStagePlan.getStageId() + " Leaf stage v1 processing time:"
+              + (System.currentTimeMillis() - leafStageStartMillis) + " ms");
       MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
-      MailboxSendOperator mailboxSendOperator =
-          new MailboxSendOperator(_mailboxService,
-              new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema()),
-              receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
-              sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequests.get(0).getRequestId(),
-              sendNode.getStageId());
+      MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService,
+          new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema(), requestId,
+              sendNode.getStageId()), receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
+          sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequests.get(0).getRequestId(),
+          sendNode.getStageId());
       int blockCounter = 0;
       while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
         LOGGER.debug("Acquired transferable block: {}", blockCounter++);
       }
+      mailboxSendOperator.toExplainString();
     } else {
-      long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
       long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
       StageNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(_mailboxService, requestId,
-          stageRoot.getStageId(), timeoutMs, _hostname, _port, distributedStagePlan.getMetadataMap()));
+      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+          new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, _hostname, _port,
+              distributedStagePlan.getMetadataMap()));
       _scheduler.register(rootOperator);
     }
   }
@@ -174,8 +174,8 @@ public class QueryRunner {
     Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
        "Server request for V2 engine should only have 1 scan table per request.");
     String rawTableName = stageMetadata.getScannedTables().get(0);
-    Map<String, List<String>> tableToSegmentListMap = stageMetadata.getServerInstanceToSegmentsMap()
-        .get(distributedStagePlan.getServerInstance());
+    Map<String, List<String>> tableToSegmentListMap =
+        stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance());
     List<ServerPlanRequestContext> requests = new ArrayList<>();
     for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
       String tableType = tableEntry.getKey();
@@ -187,15 +187,17 @@ public class QueryRunner {
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue()));
+        requests.add(
+            ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, tableConfig,
+                schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue()));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue()));
+        requests.add(
+            ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, tableConfig,
+                schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue()));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
@@ -209,8 +211,8 @@ public class QueryRunner {
       return _serverExecutor.execute(serverQueryRequest, executorService);
     } catch (Exception e) {
       InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
-          e.getMessage() + QueryException.getTruncatedStackTrace(e));
+      errorResponse.getExceptions()
+          .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e));
       return errorResponse;
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 21a944d4f7..6c463eeeed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -114,15 +114,18 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
             register(operatorChain, false);
           } else {
             if (result.isErrorBlock()) {
+              operatorChain.getRoot().toExplainString();
               LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
                   result.getDataBlock().getExceptions());
             } else {
+              operatorChain.getRoot().toExplainString();
               LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
             }
             operatorChain.close();
           }
         } catch (Exception e) {
           operatorChain.close();
+          operatorChain.getRoot().toExplainString();
           LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
         }
       }
@@ -154,11 +157,6 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
     LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
         + "There are a total of {} chains awaiting execution.", operatorChain,
         operatorChain.getReceivingMailbox(), _scheduler.size());
-
-    // we want to track the time that it takes from registering
-    // an operator chain to when it completes, so make sure to
-    // start the timer here
-    operatorChain.getStats().startExecutionTimer();
   }
 
   public final void register(OpChain operatorChain, boolean isNew) {
@@ -167,8 +165,8 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
       LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", operatorChain, isNew, _scheduler.size());
       _scheduler.register(operatorChain, isNew);
-      operatorChain.getStats().queued();
     } finally {
+      operatorChain.getStats().queued();
       _monitor.leave();
     }
   }
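Note the pattern these hunks introduce: toExplainString() now doubles as a stats flush. The scheduler invokes it on the chain's root whenever a chain completes, errors out, or throws, and (as the operator diffs below show) each operator first recurses into its upstream operator(s) and then logs its own OperatorStats. A standalone toy sketch of that bottom-up walk; the class and names are illustrative, not part of this commit:

import java.util.ArrayList;
import java.util.List;

class StatsNode {
  private final String _name;
  private final List<StatsNode> _children = new ArrayList<>();

  StatsNode(String name) {
    _name = name;
  }

  StatsNode addChild(StatsNode child) {
    _children.add(child);
    return this;
  }

  // Mirrors the commit's toExplainString(): visit upstream operators first,
  // then report this operator's stats (stand-in for LOGGER.debug(_operatorStats)).
  String explain() {
    for (StatsNode child : _children) {
      child.explain();
    }
    System.out.println("stats for " + _name);
    return _name;
  }
}

public class ExplainWalkDemo {
  public static void main(String[] args) {
    StatsNode root = new StatsNode("MAILBOX_SEND")
        .addChild(new StatsNode("AGGREGATE").addChild(new StatsNode("MAILBOX_RECEIVE")));
    root.explain(); // prints MAILBOX_RECEIVE, then AGGREGATE, then MAILBOX_SEND
  }
}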
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 3182dba91f..199d8c8b96 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -37,6 +37,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -55,8 +57,10 @@ import org.apache.pinot.spi.data.FieldSpec;
  */
 public class AggregateOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Logger LOGGER = LoggerFactory.getLogger(AggregateOperator.class);
 
   private final MultiStageOperator _inputOperator;
+
   // TODO: Deal with the case where _aggCalls is empty but we have groupSet setup, which means this is a Distinct call.
   private final List<RexExpression.FunctionCall> _aggCalls;
   private final List<RexExpression> _groupSet;
@@ -69,27 +73,29 @@ public class AggregateOperator extends MultiStageOperator {
   private boolean _readyToConstruct;
   private boolean _hasReturnedAggregateBlock;
 
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
   // aggCalls has to be a list of FunctionCall and cannot be null
   // groupSet has to be a list of InputRef and cannot be null
   // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
-  public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema) {
-    this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS);
+  public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
+      List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId) {
+    this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS, requestId,
+        stageId);
   }
 
   @VisibleForTesting
-  AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema, Map<String,
-      Function<DataSchema.ColumnDataType, Merger>> mergers) {
+  AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
+      List<RexExpression> groupSet, DataSchema inputSchema,
+      Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId) {
     _inputOperator = inputOperator;
     _groupSet = groupSet;
     _upstreamErrorBlock = null;
 
     // we expect all agg calls to be aggregate function calls
-    _aggCalls = aggCalls.stream()
-        .map(RexExpression.FunctionCall.class::cast)
-        .collect(Collectors.toList());
+    _aggCalls = aggCalls.stream().map(RexExpression.FunctionCall.class::cast).collect(Collectors.toList());
 
     _accumulators = new Accumulator[_aggCalls.size()];
     for (int i = 0; i < _aggCalls.size(); i++) {
@@ -105,6 +111,7 @@ public class AggregateOperator extends MultiStageOperator {
     _resultSchema = dataSchema;
     _readyToConstruct = false;
     _hasReturnedAggregateBlock = false;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -115,11 +122,15 @@ public class AggregateOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    // TODO: move to close call;
+    _inputOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
       if (!_readyToConstruct && !consumeInputBlocks()) {
         return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -132,10 +143,13 @@ public class AggregateOperator extends MultiStageOperator {
       if (!_hasReturnedAggregateBlock) {
         return produceAggregatedBlock();
       } else {
+        // TODO: Move to close call.
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -154,6 +168,7 @@ public class AggregateOperator extends MultiStageOperator {
     if (rows.size() == 0) {
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     } else {
+      _operatorStats.recordOutput(1, rows.size());
       return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
     }
   }
@@ -162,7 +177,9 @@ public class AggregateOperator extends MultiStageOperator {
    * @return whether or not the operator is ready to move on (EOS or ERROR)
    */
   private boolean consumeInputBlocks() {
+    _operatorStats.endTimer();
     TransferableBlock block = _inputOperator.nextBlock();
+    _operatorStats.startTimer();
     while (!block.isNoOpBlock()) {
       // setting upstream error block
       if (block.isErrorBlock()) {
@@ -181,7 +198,10 @@ public class AggregateOperator extends MultiStageOperator {
           _accumulators[i].accumulate(key, row);
         }
       }
+      _operatorStats.recordInput(1, container.size());
+      _operatorStats.endTimer();
       block = _inputOperator.nextBlock();
+      _operatorStats.startTimer();
     }
     return false;
   }
@@ -269,32 +289,25 @@ public class AggregateOperator extends MultiStageOperator {
   }
 
   private static class Accumulator {
-
-    private static final Map<String, Function<DataSchema.ColumnDataType, Merger>> MERGERS = ImmutableMap
-        .<String, Function<DataSchema.ColumnDataType, Merger>>builder()
-        .put("SUM", cdt -> AggregateOperator::mergeSum)
-        .put("$SUM", cdt -> AggregateOperator::mergeSum)
-        .put("$SUM0", cdt -> AggregateOperator::mergeSum)
-        .put("MIN", cdt -> AggregateOperator::mergeMin)
-        .put("$MIN", cdt -> AggregateOperator::mergeMin)
-        .put("$MIN0", cdt -> AggregateOperator::mergeMin)
-        .put("MAX", cdt -> AggregateOperator::mergeMax)
-        .put("$MAX", cdt -> AggregateOperator::mergeMax)
-        .put("$MAX0", cdt -> AggregateOperator::mergeMax)
-        .put("COUNT", cdt -> AggregateOperator::mergeCount)
-        .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd)
-        .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr)
-        .put("FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
-            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
-        .put("$FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
-            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
-        .put("$FOURTHMOMENT0", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
-            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
-        .build();
+    private static final Map<String, Function<DataSchema.ColumnDataType, Merger>> MERGERS =
+        ImmutableMap.<String, Function<DataSchema.ColumnDataType, Merger>>builder()
+            .put("SUM", cdt -> AggregateOperator::mergeSum).put("$SUM", cdt -> AggregateOperator::mergeSum)
+            .put("$SUM0", cdt -> AggregateOperator::mergeSum).put("MIN", cdt -> AggregateOperator::mergeMin)
+            .put("$MIN", cdt -> AggregateOperator::mergeMin).put("$MIN0", cdt -> AggregateOperator::mergeMin)
+            .put("MAX", cdt -> AggregateOperator::mergeMax).put("$MAX", cdt -> AggregateOperator::mergeMax)
+            .put("$MAX0", cdt -> AggregateOperator::mergeMax).put("COUNT", cdt -> AggregateOperator::mergeCount)
+            .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+            .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+            .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd)
+            .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+            .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+            .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr).put("FOURTHMOMENT",
+                cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject()
+                    : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT",
+                cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject()
+                    : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT0",
+                cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject()
+                    : new MergeFourthMomentNumeric()).build();
 
     final int _inputRef;
     final Object _literal;
@@ -336,8 +349,7 @@ public class AggregateOperator extends MultiStageOperator {
   private RexExpression toAggregationFunctionOperand(RexExpression.FunctionCall rexExpression) {
     List<RexExpression> functionOperands = rexExpression.getFunctionOperands();
     Preconditions.checkState(functionOperands.size() < 2, "aggregate functions cannot have more than one operand");
-    return functionOperands.size() > 0
-        ? functionOperands.get(0)
+    return functionOperands.size() > 0 ? functionOperands.get(0)
         : new RexExpression.Literal(FieldSpec.DataType.INT, 1);
   }
 }
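One detail worth noting in the AggregateOperator changes above, repeated in the other operators below: the operator stops its stopwatch just before calling the upstream nextBlock() and restarts it just after, with a final endTimer() in the finally block. That is what keeps each OperatorStats wall time scoped to this operator's own work instead of double-counting the upstream operator's time. A self-contained sketch of the same discipline; every name here is hypothetical, not from the commit:

import java.util.function.Supplier;

public class TimedOperatorDemo {
  private long _elapsedNanos;
  private long _startNanos = -1;

  private void startTimer() {
    if (_startNanos < 0) {
      _startNanos = System.nanoTime();
    }
  }

  private void endTimer() {
    if (_startNanos >= 0) {
      _elapsedNanos += System.nanoTime() - _startNanos;
      _startNanos = -1;
    }
  }

  // Times only this operator's own work; the upstream call is excluded,
  // mirroring the startTimer()/endTimer() placement in the diff above.
  public String nextBlock(Supplier<String> upstream) {
    startTimer();
    try {
      endTimer();                    // pause before handing control upstream
      String block = upstream.get(); // stand-in for _inputOperator.nextBlock()
      startTimer();                  // resume for this operator's own work
      return block.toUpperCase();    // stand-in for the operator's real work
    } finally {
      endTimer();
    }
  }

  public static void main(String[] args) {
    TimedOperatorDemo op = new TimedOperatorDemo();
    System.out.println(op.nextBlock(() -> "row"));
    System.out.println("own time (ns): " + op._elapsedNanos);
  }
}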
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 3ae9eac98f..6f57ece6df 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -29,6 +29,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /*
@@ -47,15 +49,21 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 public class FilterOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "FILTER";
   private final MultiStageOperator _upstreamOperator;
+  private static final Logger LOGGER = LoggerFactory.getLogger(FilterOperator.class);
   private final TransformOperand _filterOperand;
   private final DataSchema _dataSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter) {
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
+  public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter,
+      long requestId, int stageId) {
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
     _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
     _upstreamErrorBlock = null;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -66,15 +74,23 @@ public class FilterOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _upstreamOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
-      return transform(_upstreamOperator.nextBlock());
+      _operatorStats.endTimer();
+      TransferableBlock block = _upstreamOperator.nextBlock();
+      _operatorStats.startTimer();
+      return transform(block);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -97,6 +113,8 @@ public class FilterOperator extends MultiStageOperator {
         resultRows.add(row);
       }
     }
+    _operatorStats.recordInput(1, container.size());
+    _operatorStats.recordOutput(1, resultRows.size());
     return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index be47c9f389..b4e88965cb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -39,6 +39,9 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm.
@@ -55,6 +58,8 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
 public class HashJoinOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
+  private static final Logger LOGGER = LoggerFactory.getLogger(HashJoinOperator.class);
+
   private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES =
       ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL);
 
@@ -82,8 +87,10 @@ public class HashJoinOperator extends MultiStageOperator {
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
+  private OperatorStats _operatorStats;
+
   public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator,
-      DataSchema leftSchema, JoinNode node) {
+      DataSchema leftSchema, JoinNode node, long requestId, int stageId) {
     Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
         "Join type: " + node.getJoinRelType() + " is not supported!");
     _joinType = node.getJoinRelType();
@@ -111,6 +118,7 @@ public class HashJoinOperator extends MultiStageOperator {
       _matchedRightRows = null;
     }
     _upstreamErrorBlock = null;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   // TODO: Separate left and right table operator.
@@ -122,11 +130,15 @@ public class HashJoinOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _leftTableOperator.toExplainString();
+    _rightTableOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
       if (_isTerminated) {
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -140,15 +152,22 @@ public class HashJoinOperator extends MultiStageOperator {
       } else if (!_isHashTableBuilt) {
         return TransferableBlockUtils.getNoOpTransferableBlock();
       }
+      _operatorStats.endTimer();
+      TransferableBlock leftBlock = _leftTableOperator.nextBlock();
+      _operatorStats.startTimer();
       // JOIN each left block with the right block.
-      return buildJoinedDataBlock(_leftTableOperator.nextBlock());
+      return buildJoinedDataBlock(leftBlock);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
   private void buildBroadcastHashTable() {
+    _operatorStats.endTimer();
     TransferableBlock rightBlock = _rightTableOperator.nextBlock();
+    _operatorStats.startTimer();
     while (!rightBlock.isNoOpBlock()) {
       if (rightBlock.isErrorBlock()) {
         _upstreamErrorBlock = rightBlock;
@@ -165,8 +184,10 @@ public class HashJoinOperator extends MultiStageOperator {
             _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
         hashCollection.add(row);
       }
-
+      _operatorStats.recordInput(1, container.size());
+      _operatorStats.endTimer();
       rightBlock = _rightTableOperator.nextBlock();
+      _operatorStats.startTimer();
     }
   }
 
@@ -196,6 +217,7 @@ public class HashJoinOperator extends MultiStageOperator {
         }
       }
       _isTerminated = true;
+      _operatorStats.recordOutput(1, returnRows.size());
       return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW);
     }
     List<Object[]> rows = new ArrayList<>();
@@ -230,6 +252,8 @@ public class HashJoinOperator extends MultiStageOperator {
         rows.add(joinRow(leftRow, null));
       }
     }
+    _operatorStats.recordInput(1, container.size());
+    _operatorStats.recordOutput(1, rows.size());
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index e794a84194..baf7373a07 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -40,6 +40,8 @@ import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -57,17 +59,23 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
  */
 public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class);
 
   private final InstanceResponseBlock _errorBlock;
   private final List<InstanceResponseBlock> _baseResultBlock;
   private final DataSchema _desiredDataSchema;
   private int _currentIndex;
 
-  public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) {
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
+  public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema,
+      long requestId, int stageId) {
     _baseResultBlock = baseResultBlock;
     _desiredDataSchema = dataSchema;
     _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
     _currentIndex = 0;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -78,29 +86,39 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (_currentIndex < 0) {
-      throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
-    }
-    if (_errorBlock != null) {
-      _currentIndex = -1;
-      return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
-    } else {
-      if (_currentIndex < _baseResultBlock.size()) {
-        InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
-        if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
-          return composeTransferableBlock(responseBlock, _desiredDataSchema);
+    try {
+      _operatorStats.startTimer();
+      if (_currentIndex < 0) {
+        throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
+      }
+      if (_errorBlock != null) {
+        _currentIndex = -1;
+        return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
+      } else {
+        if (_currentIndex < _baseResultBlock.size()) {
+          InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
+          if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
+            _operatorStats.recordInput(1, responseBlock.getResultsBlock().getNumRows());
+            _operatorStats.recordOutput(1, responseBlock.getResultsBlock().getNumRows());
+            return composeTransferableBlock(responseBlock, _desiredDataSchema);
+          } else {
+            // results block may be null or empty here, so record zero rows
+            _operatorStats.recordInput(1, 0);
+            _operatorStats.recordOutput(1, 0);
+            return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
+          }
         } else {
-          return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
+          _currentIndex = -1;
+          return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
         }
-      } else {
-        _currentIndex = -1;
-        return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
       }
+    } finally {
+      _operatorStats.endTimer();
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index cace9fa974..8fc160f205 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -27,17 +27,24 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class LiteralValueOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER";
+  private static final Logger LOGGER = LoggerFactory.getLogger(LiteralValueOperator.class);
 
   private final DataSchema _dataSchema;
   private final TransferableBlock _rexLiteralBlock;
   private boolean _isLiteralBlockReturned;
 
-  public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows) {
+  private OperatorStats _operatorStats;
+
+  public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows,
+      long requestId, int stageId) {
     _dataSchema = dataSchema;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
     _rexLiteralBlock = constructBlock(rexLiteralRows);
     _isLiteralBlockReturned = false;
   }
@@ -50,16 +57,22 @@ public class LiteralValueOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (!_isLiteralBlockReturned) {
-      _isLiteralBlockReturned = true;
-      return _rexLiteralBlock;
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    try {
+      _operatorStats.startTimer();
+      if (!_isLiteralBlockReturned) {
+        _isLiteralBlockReturned = true;
+        return _rexLiteralBlock;
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -72,6 +85,8 @@ public class LiteralValueOperator extends MultiStageOperator {
       }
       blockContent.add(row);
     }
+    _operatorStats.recordInput(1, blockContent.size());
+    _operatorStats.recordOutput(1, blockContent.size());
     return new TransferableBlock(blockContent, _dataSchema, DataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ee97a99e79..aa569ba56f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -65,6 +65,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private final long _deadlineTimestampNano;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
+  private OperatorStats _operatorStats;
 
   private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, long jobId, long stageId,
       String receiveHostName, int receivePort) {
@@ -109,6 +110,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
     }
     _upstreamErrorBlock = null;
     _serverIdx = 0;
+    _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
   }
 
   public List<MailboxIdentifier> getSendingMailbox() {
@@ -123,61 +125,68 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (_upstreamErrorBlock != null) {
-      return _upstreamErrorBlock;
-    } else if (System.nanoTime() >= _deadlineTimestampNano) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingMailbox);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    }
+    try {
+      _operatorStats.startTimer();
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      } else if (System.nanoTime() >= _deadlineTimestampNano) {
+        return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+      }
 
-    int startingIdx = _serverIdx;
-    int openMailboxCount = 0;
-    int eosMailboxCount = 0;
-
-    // For all non-singleton distribution, we poll from every instance to check mailbox content.
-    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
-    for (int i = 0; i < _sendingMailbox.size(); i++) {
-      // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
-      _serverIdx = (startingIdx + i) % _sendingMailbox.size();
-      MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
-      try {
-        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
-        if (!mailbox.isClosed()) {
-          openMailboxCount++;
-          TransferableBlock block = mailbox.receive();
-
-          // Get null block when pulling times out from mailbox.
-          if (block != null) {
-            if (block.isErrorBlock()) {
-              _upstreamErrorBlock =
-                  TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
-              return _upstreamErrorBlock;
-            }
-            if (!block.isEndOfStreamBlock()) {
-              return block;
-            } else {
-              eosMailboxCount++;
+      int startingIdx = _serverIdx;
+      int openMailboxCount = 0;
+      int eosMailboxCount = 0;
+
+      // For all non-singleton distribution, we poll from every instance to check mailbox content.
+      // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+      for (int i = 0; i < _sendingMailbox.size(); i++) {
+        // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+        _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+        MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
+        try {
+          ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
+          if (!mailbox.isClosed()) {
+            openMailboxCount++;
+            TransferableBlock block = mailbox.receive();
+            // Get null block when pulling times out from mailbox.
+            if (block != null) {
+              if (block.isErrorBlock()) {
+                _upstreamErrorBlock =
+                    TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
+                return _upstreamErrorBlock;
+              }
+              if (!block.isEndOfStreamBlock()) {
+                _operatorStats.recordInput(1, block.getNumRows());
+                _operatorStats.recordOutput(1, block.getNumRows());
+                return block;
+              } else {
+                eosMailboxCount++;
+              }
             }
           }
+        } catch (Exception e) {
+          return TransferableBlockUtils.getErrorTransferableBlock(
+              new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
         }
-      } catch (Exception e) {
-        return TransferableBlockUtils.getErrorTransferableBlock(
-            new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
       }
-    }
 
-    // there are two conditions in which we should return EOS: (1) there were
-    // no mailboxes to open (this shouldn't happen because the second condition
-    // should be hit first, but is defensive) (2) every mailbox that was opened
-    // returned an EOS block. in every other scenario, there are mailboxes that
-    // are not yet exhausted and we should wait for more data to be available
-    return openMailboxCount > 0 && openMailboxCount > eosMailboxCount
-        ? TransferableBlockUtils.getNoOpTransferableBlock()
-        : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      // there are two conditions in which we should return EOS: (1) there were
+      // no mailboxes to open (this shouldn't happen because the second condition
+      // should be hit first, but is defensive) (2) every mailbox that was opened
+      // returned an EOS block. in every other scenario, there are mailboxes that
+      // are not yet exhausted and we should wait for more data to be available
+      TransferableBlock block =
+          openMailboxCount > 0 && openMailboxCount > eosMailboxCount
+              ? TransferableBlockUtils.getNoOpTransferableBlock()
+              : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      return block;
+    } finally {
+      _operatorStats.endTimer();
+    }
   }
 }
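The (startingIdx + i) % _sendingMailbox.size() arithmetic above is the round-robin iterator the inline comment refers to: because _serverIdx persists across getNextBlock() calls, each call resumes polling from wherever the previous one stopped, so one busy mailbox cannot starve the others. A tiny standalone illustration (hypothetical, not from the commit):

public class RoundRobinDemo {
  public static void main(String[] args) {
    int size = 3;       // number of sending mailboxes
    int serverIdx = 0;  // persists across calls, like _serverIdx
    for (int call = 0; call < 2; call++) {
      int startingIdx = serverIdx;
      for (int i = 0; i < size; i++) {
        serverIdx = (startingIdx + i) % size;
        System.out.println("call " + call + " polls mailbox " + serverIdx);
      }
    }
    // call 0 polls 0, 1, 2; serverIdx is left at 2, so call 1 polls 2, 0, 1
  }
}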
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index a6299ea60a..79d64bfefe 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -54,6 +54,7 @@ public class MailboxSendOperator extends MultiStageOperator {
 
   private final MultiStageOperator _dataTableBlockBaseOperator;
   private final BlockExchange _exchange;
+  private OperatorStats _operatorStats;
 
   @VisibleForTesting
   interface BlockExchangeFactory {
@@ -71,14 +72,14 @@ public class MailboxSendOperator extends MultiStageOperator {
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId) {
     this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
-        server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange);
+        server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange, jobId, stageId);
   }
 
   @VisibleForTesting
   MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
-      MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
+      MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int stageId) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
 
     List<MailboxIdentifier> receivingMailboxes;
@@ -106,6 +107,7 @@ public class MailboxSendOperator extends MultiStageOperator {
 
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
         String.format("Exchange type '%s' is not supported yet", exchangeType));
+    _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -116,22 +118,30 @@ public class MailboxSendOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _dataTableBlockBaseOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     TransferableBlock transferableBlock;
     try {
+      _operatorStats.endTimer();
       transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+      _operatorStats.startTimer();
       while (!transferableBlock.isNoOpBlock()) {
         _exchange.send(transferableBlock);
-
+        _operatorStats.recordInput(1, transferableBlock.getNumRows());
+        // The # of output block is not accurate because we may do a split in exchange send.
+        _operatorStats.recordOutput(1, transferableBlock.getNumRows());
         if (transferableBlock.isEndOfStreamBlock()) {
           return transferableBlock;
         }
-
+        _operatorStats.endTimer();
         transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+        _operatorStats.startTimer();
       }
     } catch (final Exception e) {
       // ideally, MailboxSendOperator doesn't ever throw an exception because
@@ -143,8 +153,9 @@ public class MailboxSendOperator extends MultiStageOperator {
       } catch (Exception e2) {
         LOGGER.error("Exception while sending block to mailbox.", e2);
       }
+    } finally {
+      _operatorStats.endTimer();
     }
-
     return transferableBlock;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index 424d7d003f..ae6bae362f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -52,6 +52,7 @@ public class OpChain implements AutoCloseable {
     return _receivingMailbox;
   }
 
+  // TODO: Move OperatorStats here.
   public OpChainStats getStats() {
     return _stats;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index 58327c40da..07705f8bac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -34,13 +34,14 @@ public class OpChainStats {
 
   // use memoized supplier so that the timing doesn't start until the
   // first time we get the timer
-  private final Supplier<ThreadResourceUsageProvider> _exTimer
-      = Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
+  private final Supplier<ThreadResourceUsageProvider> _exTimer =
+      Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
 
   // this is used to make sure that toString() doesn't have side
   // effects (accidentally starting the timer)
   private volatile boolean _exTimerStarted = false;
 
+  private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
   private final Stopwatch _queuedStopwatch = Stopwatch.createUnstarted();
   private final AtomicLong _queuedCount = new AtomicLong();
 
@@ -62,20 +63,23 @@ public class OpChainStats {
     if (!_queuedStopwatch.isRunning()) {
       _queuedStopwatch.start();
     }
+    if (_executeStopwatch.isRunning()) {
+      _executeStopwatch.stop();
+    }
   }
 
   public void startExecutionTimer() {
     _exTimerStarted = true;
     _exTimer.get();
+    if (!_executeStopwatch.isRunning()) {
+      _executeStopwatch.start();
+    }
   }
 
   @Override
   public String toString() {
-    return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms",
-        _id,
-        _queuedCount.get(),
-        _exTimerStarted ? TimeUnit.NANOSECONDS.toMillis(_exTimer.get().getThreadTimeNs()) : 0,
-        _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS)
-    );
+    return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(),
+        _exTimerStarted ? _executeStopwatch.elapsed(TimeUnit.MILLISECONDS) : 0,
+        _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS));
   }
 }
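These hunks also change what the "Executing Time" field of OpChainStats.toString() means: it was thread CPU time from ThreadResourceUsageProvider, and is now wall time accumulated only while the chain is actually executing, because queued() pauses the new _executeStopwatch and startExecutionTimer() resumes it. A minimal standalone sketch of that pause/resume accumulation using the same Guava Stopwatch (sleep durations are illustrative):

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

public class PauseResumeDemo {
  public static void main(String[] args) throws InterruptedException {
    Stopwatch executeStopwatch = Stopwatch.createUnstarted();

    executeStopwatch.start();  // startExecutionTimer(): chain starts running
    Thread.sleep(20);
    executeStopwatch.stop();   // queued(): chain parked, clock paused

    Thread.sleep(50);          // time spent waiting in the queue is not counted

    executeStopwatch.start();  // scheduled again
    Thread.sleep(20);
    executeStopwatch.stop();

    // prints roughly 40, not 90: elapsed() sums only the running intervals
    System.out.println("Executing Time: " + executeStopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
  }
}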
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
new file mode 100644
index 0000000000..c40b96b3c8
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+
+
+public class OperatorStats {
+  private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
+
+  // TODO: add an operatorId for better tracking purposes.
+  private final int _stageId;
+  private final long _requestId;
+
+  private final String _operatorType;
+
+  private int _numInputBlock = 0;
+  private int _numInputRows = 0;
+
+  private int _numOutputBlock = 0;
+
+  private int _numOutputRows = 0;
+
+  public OperatorStats(long requestId, int stageId, String operatorType) {
+    _stageId = stageId;
+    _requestId = requestId;
+    _operatorType = operatorType;
+  }
+
+  public void startTimer() {
+    if (!_executeStopwatch.isRunning()) {
+      _executeStopwatch.start();
+    }
+  }
+
+  public void endTimer() {
+    if (_executeStopwatch.isRunning()) {
+      _executeStopwatch.stop();
+    }
+  }
+
+  public void recordInput(int numBlock, int numRows) {
+    _numInputBlock += numBlock;
+    _numInputRows += numRows;
+  }
+
+  public void recordOutput(int numBlock, int numRows) {
+    _numOutputBlock += numBlock;
+    _numOutputRows += numRows;
+  }
+
+  // TODO: Return the string as a JSON string.
+  @Override
+  public String toString() {
+    return String.format(
+        "OperatorStats[type: %s, requestId: %s, stageId: %s] ExecutionWallTime: %sms, InputRows: %s, InputBlock: "
+            + "%s, OutputRows: %s, OutputBlock: %s", _operatorType, _requestId, _stageId,
+        _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numInputRows, _numInputBlock, _numOutputRows,
+        _numOutputBlock);
+  }
+}
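For reference, a minimal standalone driver for the new OperatorStats class above, assuming it is compiled alongside org.apache.pinot.query.runtime.operator.OperatorStats; the request id, stage id, and row counts are made up:

package org.apache.pinot.query.runtime.operator;

public class OperatorStatsDemo {
  public static void main(String[] args) {
    OperatorStats stats = new OperatorStats(123L, 1, "FILTER");

    stats.startTimer();
    stats.recordInput(1, 100);  // consumed one block of 100 rows
    stats.recordOutput(1, 42);  // emitted one block of 42 rows
    stats.endTimer();

    // prints e.g. OperatorStats[type: FILTER, requestId: 123, stageId: 1] ExecutionWallTime: 0ms, ...
    System.out.println(stats);
  }
}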
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 4ba3dbde94..13f4306e01 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -32,11 +32,15 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class SortOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "SORT";
   private final MultiStageOperator _upstreamOperator;
+  private static final Logger LOGGER = LoggerFactory.getLogger(SortOperator.class);
+
   private final int _fetch;
   private final int _offset;
   private final DataSchema _dataSchema;
@@ -46,17 +50,19 @@ public class SortOperator extends MultiStageOperator {
   private boolean _readyToConstruct;
   private boolean _isSortedBlockConstructed;
   private TransferableBlock _upstreamErrorBlock;
+  private OperatorStats _operatorStats;
 
   public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
-      List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) {
+      List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
+      long requestId, int stageId) {
     this(upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema,
-        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
+        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId);
   }
 
   @VisibleForTesting
   SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
-      int maxHolderCapacity) {
+      int maxHolderCapacity, long requestId, int stageId) {
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
     _offset = offset;
@@ -68,6 +74,7 @@ public class SortOperator extends MultiStageOperator {
         : maxHolderCapacity;
     _rows = new PriorityQueue<>(_numRowsToKeep,
         new SortComparator(collationKeys, collationDirections, dataSchema, false));
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -78,21 +85,27 @@ public class SortOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _upstreamOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
       consumeInputBlocks();
       return produceSortedBlock();
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
   private TransferableBlock produceSortedBlock() {
     if (_upstreamErrorBlock != null) {
+      LOGGER.error("OperatorStats:" + _operatorStats);
       return _upstreamErrorBlock;
     } else if (!_readyToConstruct) {
       return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -104,6 +117,7 @@ public class SortOperator extends MultiStageOperator {
       Object[] row = _rows.poll();
       rows.addFirst(row);
     }
+    _operatorStats.recordOutput(1, rows.size());
     _isSortedBlockConstructed = true;
     if (rows.size() == 0) {
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -117,7 +131,9 @@ public class SortOperator extends MultiStageOperator {
 
   private void consumeInputBlocks() {
     if (!_isSortedBlockConstructed) {
+      _operatorStats.endTimer();
       TransferableBlock block = _upstreamOperator.nextBlock();
+      _operatorStats.startTimer();
       while (!block.isNoOpBlock()) {
         // setting upstream error block
         if (block.isErrorBlock()) {
@@ -132,8 +148,10 @@ public class SortOperator extends MultiStageOperator {
         for (Object[] row : container) {
           SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
         }
-
+        _operatorStats.endTimer();
         block = _upstreamOperator.nextBlock();
+        _operatorStats.startTimer();
+        _operatorStats.recordInput(1, container.size());
       }
     }
   }
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 83db10d4a2..a75ad6b017 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -30,6 +30,8 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -45,14 +47,16 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
public class TransformOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "TRANSFORM";
private final MultiStageOperator _upstreamOperator;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TransformOperator.class);
private final List<TransformOperand> _transformOperandsList;
private final int _resultColumnSize;
// TODO: Check type matching between resultSchema and the actual result.
private final DataSchema _resultSchema;
private TransferableBlock _upstreamErrorBlock;
+ private OperatorStats _operatorStats;
public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema,
- List<RexExpression> transforms, DataSchema upstreamDataSchema) {
+ List<RexExpression> transforms, DataSchema upstreamDataSchema, long requestId, int stageId) {
Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty.");
Preconditions.checkState(resultSchema.size() == transforms.size(),
"result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size());
@@ -63,6 +67,7 @@ public class TransformOperator extends MultiStageOperator {
_transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, upstreamDataSchema));
}
_resultSchema = resultSchema;
+ _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
}
@Override
@@ -73,15 +78,23 @@ public class TransformOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
+ _upstreamOperator.toExplainString();
+ LOGGER.debug(_operatorStats.toString());
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
+ _operatorStats.startTimer();
try {
- return transform(_upstreamOperator.nextBlock());
+ _operatorStats.endTimer();
+ TransferableBlock block = _upstreamOperator.nextBlock();
+ _operatorStats.startTimer();
+ return transform(block);
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
+ } finally {
+ _operatorStats.endTimer();
}
}
@@ -103,11 +116,13 @@ public class TransformOperator extends MultiStageOperator {
for (Object[] row : container) {
Object[] resultRow = new Object[_resultColumnSize];
for (int i = 0; i < _resultColumnSize; i++) {
- resultRow[i] = FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row),
- _resultSchema.getColumnDataType(i));
+ resultRow[i] =
+ FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row), _resultSchema.getColumnDataType(i));
}
resultRows.add(resultRow);
}
+ _operatorStats.recordInput(1, container.size());
+ _operatorStats.recordOutput(1, resultRows.size());
return new TransferableBlock(resultRows, _resultSchema, DataBlock.Type.ROW);
}
}
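Note the timer discipline in getNextBlock() above: each operator pauses its own stopwatch before calling the upstream operator's nextBlock() and resumes afterwards, so upstream work is not double-counted in its stats. A self-contained demo of that pause/resume pattern, with a plain Stopwatch and Supplier standing in for OperatorStats and the upstream operator (illustrative stand-ins, not Pinot classes):

import java.util.function.Supplier;

public class PauseResumeTimingDemo {
  static final class Stopwatch {
    private long _startMs = -1;
    private long _elapsedMs = 0;
    void start() { if (_startMs < 0) { _startMs = System.currentTimeMillis(); } }
    void end() { if (_startMs >= 0) { _elapsedMs += System.currentTimeMillis() - _startMs; _startMs = -1; } }
    long elapsedMs() { return _elapsedMs; }
  }

  // Mirrors the getNextBlock() pattern: time our own work, pause around the upstream call.
  static String nextBlock(Stopwatch stats, Supplier<String> upstream) {
    stats.start();
    try {
      stats.end();                      // pause: upstream time is not ours
      String block = upstream.get();
      stats.start();                    // resume: back in this operator
      return block.toUpperCase();       // stand-in for transform(block)
    } finally {
      stats.end();
    }
  }

  public static void main(String[] args) {
    Stopwatch stats = new Stopwatch();
    System.out.println(nextBlock(stats, () -> "row"));
    System.out.println("own time ms: " + stats.elapsedMs());
  }
}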
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index aaa140463f..d675efea62 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -84,13 +84,14 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(),
- node.getGroupSet(), node.getInputs().get(0).getDataSchema());
+ node.getGroupSet(), node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId);
}
@Override
public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
- return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition());
+ return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition(), context.getRequestId(),
+ context.getStageId());
}
@Override
@@ -101,21 +102,22 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
MultiStageOperator leftOperator = left.visit(this, context);
MultiStageOperator rightOperator = right.visit(this, context);
- return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node);
+ return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node, context.getRequestId(),
+ context.getStageId());
}
@Override
public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(),
- node.getInputs().get(0).getDataSchema());
+ node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId());
}
@Override
public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(),
- node.getFetch(), node.getOffset(), node.getDataSchema());
+ node.getFetch(), node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId());
}
@Override
@@ -125,6 +127,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) {
- return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows());
+ return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows(), context.getRequestId(),
+ context.getStageId());
}
}
}
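The visitor changes above all follow one pattern: every operator constructor gains a (requestId, stageId) pair pulled from PlanRequestContext, so each operator's stats can later be attributed to a specific query and stage. A self-contained sketch of that context-threading pattern; all types here are hypothetical stand-ins, not Pinot classes:

final class PlanContext {
  final long requestId;
  final int stageId;
  PlanContext(long requestId, int stageId) {
    this.requestId = requestId;
    this.stageId = stageId;
  }
}

interface Op {
  String describe();
}

final class StatsOp implements Op {
  private final Op upstream;
  private final String name;
  private final long requestId;
  private final int stageId;

  // Each constructed operator receives the same identifiers from the context,
  // mirroring how the visitor passes context.getRequestId()/getStageId().
  StatsOp(Op upstream, String name, long requestId, int stageId) {
    this.upstream = upstream;
    this.name = name;
    this.requestId = requestId;
    this.stageId = stageId;
  }

  @Override
  public String describe() {
    String base = upstream == null ? "" : upstream.describe() + " -> ";
    return base + name + "(req=" + requestId + ", stage=" + stageId + ")";
  }
}

public class ContextThreadingDemo {
  public static void main(String[] args) {
    PlanContext ctx = new PlanContext(1L, 2);
    Op filter = new StatsOp(null, "FILTER", ctx.requestId, ctx.stageId);
    Op sort = new StatsOp(filter, "SORT", ctx.requestId, ctx.stageId);
    System.out.println(sort.describe());
    // prints: FILTER(req=1, stage=2) -> SORT(req=1, stage=2)
  }
}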
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 25b7aa5e8b..4cbd4aa624 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -46,12 +46,16 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
 * {@code QueryDispatcher} dispatches a query to different workers.
 */
public class QueryDispatcher {
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class);
+
private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
public QueryDispatcher() {
@@ -69,8 +73,14 @@ public class QueryDispatcher {
reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(),
mailboxService.getMailboxPort(), timeoutMs);
List<DataBlock> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutMs);
- return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
+ mailboxReceiveOperator.toExplainString();
+ long toResultTableStartTime = System.currentTimeMillis();
+ ResultTable resultTable = toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema());
+ LOGGER.debug(
+ "RequestId:" + requestId + " StageId: 0 Broker toResultTable processing time:" + (System.currentTimeMillis()
+ - toResultTableStartTime) + " ms");
+ return resultTable;
}
public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
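The broker-side change wraps only the final reduce step in a wall-clock measurement and logs it at debug level. A minimal, self-contained sketch of that idiom, assuming slf4j as in the diff; the class name and the work being timed are stand-ins, not the real QueryDispatcher method:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReduceTimingDemo {
  private static final Logger LOGGER = LoggerFactory.getLogger(ReduceTimingDemo.class);

  // Stand-in for the broker's toResultTable reduce step.
  static String toResultTable(long requestId) {
    long startMs = System.currentTimeMillis();
    String resultTable = "rows";  // placeholder for the real reduce work
    LOGGER.debug("RequestId:" + requestId + " StageId: 0 Broker toResultTable processing time:"
        + (System.currentTimeMillis() - startMs) + " ms");
    return resultTable;
  }

  public static void main(String[] args) {
    System.out.println(toResultTable(1L));
  }
}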
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index aa7394a6a8..6f2212e6b6 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -71,7 +71,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
// When:
TransferableBlock block1 = operator.nextBlock(); // build
@@ -87,12 +87,11 @@ public class AggregateOperatorTest {
List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(1)));
List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
- Mockito.when(_input.nextBlock())
- .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
// When:
TransferableBlock block = operator.nextBlock();
@@ -109,13 +108,12 @@ public class AggregateOperatorTest {
List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
- Mockito.when(_input.nextBlock())
- .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
+ Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
// When:
TransferableBlock block1 = operator.nextBlock(); // build when reading
NoOp block
@@ -134,12 +132,11 @@ public class AggregateOperatorTest {
List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
- Mockito.when(_input.nextBlock())
- .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}))
+ Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -160,12 +157,11 @@ public class AggregateOperatorTest {
List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
- Mockito.when(_input.nextBlock())
- .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3}))
+ Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -196,9 +192,8 @@ public class AggregateOperatorTest {
Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d);
Mockito.when(merger.initialize(Mockito.any())).thenReturn(1d);
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of(
- "SUM", cdt -> merger
- ));
+ AggregateOperator operator =
+ new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock(); // (output result)
@@ -220,7 +215,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
AggregateOperator sum0GroupBy1 =
new AggregateOperator(upstreamOperator,
OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
- Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)),
inSchema);
+ Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)),
inSchema, 1, 2);
TransferableBlock result = sum0GroupBy1.getNextBlock();
while (result.isNoOpBlock()) {
result = sum0GroupBy1.getNextBlock();
@@ -232,20 +227,18 @@ public class AggregateOperatorTest {
Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
}
- @Test(
- expectedExceptions = IllegalStateException.class,
- expectedExceptionsMessageRegExp = ".*Unexpected value: AVERAGE.*")
+ @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Unexpected value: "
+ + "AVERAGE.*")
public void shouldThrowOnUnknownAggFunction() {
// Given:
List<RexExpression> calls = ImmutableList.of(
- new RexExpression.FunctionCall(SqlKind.AVG, FieldSpec.DataType.INT, "AVERAGE", ImmutableList.of())
- );
+ new RexExpression.FunctionCall(SqlKind.AVG, FieldSpec.DataType.INT, "AVERAGE", ImmutableList.of()));
List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new
ColumnDataType[]{DOUBLE});
DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new
ColumnDataType[]{DOUBLE});
// When:
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
}
@Test
@@ -262,7 +255,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group, inSchema, 1, 2);
// When:
TransferableBlock block = operator.nextBlock();
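The test updates above mostly add the trailing (1, 2) request/stage arguments; the stubbing style stays the same: one Mockito.when(...) with chained thenReturn(...) calls hands back one block per nextBlock() invocation. A self-contained illustration with a plain Supplier standing in for an operator (hypothetical demo, not part of the commit):

import java.util.function.Supplier;
import org.mockito.Mockito;

public class ChainedStubDemo {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    Supplier<String> input = Mockito.mock(Supplier.class);
    // Each chained thenReturn supplies the next call's value, mirroring
    // data block -> no-op block -> end-of-stream block in the tests.
    Mockito.when(input.get())
        .thenReturn("data-block")
        .thenReturn("noop-block")
        .thenReturn("end-of-stream");
    System.out.println(input.get());  // data-block
    System.out.println(input.get());  // noop-block
    System.out.println(input.get());  // end-of-stream (repeated for all later calls)
  }
}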
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index c53ea2037c..cf5cb80151 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -61,7 +61,7 @@ public class FilterOperatorTest {
DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new
DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.BOOLEAN
});
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral, 1, 2);
TransferableBlock errorBlock = op.getNextBlock();
Assert.assertTrue(errorBlock.isErrorBlock());
DataBlock error = errorBlock.getDataBlock();
@@ -76,7 +76,7 @@ public class FilterOperatorTest {
DataSchema.ColumnDataType.INT
});
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertTrue(dataBlock.isEndOfStreamBlock());
}
@@ -89,7 +89,7 @@ public class FilterOperatorTest {
DataSchema.ColumnDataType.INT
});
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertTrue(dataBlock.isNoOpBlock());
}
@@ -104,7 +104,7 @@ public class FilterOperatorTest {
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new
Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -122,7 +122,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -137,7 +137,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral, 1, 2);
TransferableBlock errorBlock = op.getNextBlock();
Assert.assertTrue(errorBlock.isErrorBlock());
DataBlock data = errorBlock.getDataBlock();
@@ -152,7 +152,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref0);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref0, 1, 2);
TransferableBlock errorBlock = op.getNextBlock();
Assert.assertTrue(errorBlock.isErrorBlock());
DataBlock data = errorBlock.getDataBlock();
@@ -167,7 +167,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true},
new Object[]{2, false}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref1);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref1, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -187,7 +187,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall andCall = new
RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
ImmutableList.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
andCall);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
andCall, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -207,7 +207,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall orCall = new
RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
ImmutableList.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
orCall);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
orCall, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -229,7 +229,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall notCall = new
RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
ImmutableList.of(new RexExpression.InputRef(0)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
notCall);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
notCall, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -248,7 +248,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall greaterThan =
new RexExpression.FunctionCall(SqlKind.GREATER_THAN,
FieldSpec.DataType.BOOLEAN, "greaterThan",
ImmutableList.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
greaterThan);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
greaterThan, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -268,7 +268,7 @@ public class FilterOperatorTest {
new RexExpression.FunctionCall(SqlKind.OTHER,
FieldSpec.DataType.BOOLEAN, "startsWith",
ImmutableList.of(new RexExpression.InputRef(0),
new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
startsWith);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
startsWith, 1, 2);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -289,6 +289,6 @@ public class FilterOperatorTest {
new RexExpression.FunctionCall(SqlKind.OTHER,
FieldSpec.DataType.BOOLEAN, "startsWithError",
ImmutableList.of(new RexExpression.InputRef(0),
new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
startsWith);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
startsWith, 1, 2);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index b9ea6b2078..4075237249 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -90,7 +90,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = joinOnString.nextBlock();
while (result.isNoOpBlock()) {
@@ -127,7 +127,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = joinOnInt.nextBlock();
while (result.isNoOpBlock()) {
result = joinOnInt.nextBlock();
@@ -161,7 +161,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(new ArrayList<>(), new ArrayList<>()),
joinClauses);
- HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = joinOnInt.nextBlock();
while (result.isNoOpBlock()) {
result = joinOnInt.nextBlock();
@@ -202,7 +202,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.LEFT,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -236,7 +236,7 @@ public class HashJoinOperatorTest {
List<RexExpression> joinClauses = new ArrayList<>();
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -267,7 +267,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.LEFT,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -301,7 +301,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -339,7 +339,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(new ArrayList<>(), new ArrayList<>()),
joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -377,7 +377,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(new ArrayList<>(), new ArrayList<>()),
joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -411,7 +411,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.RIGHT,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = joinOnNum.nextBlock();
while (result.isNoOpBlock()) {
result = joinOnNum.nextBlock();
@@ -438,8 +438,7 @@ public class HashJoinOperatorTest {
Assert.assertTrue(result.isSuccessfulEndOfStreamBlock());
}
- @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not "
- + "supported.*")
+ @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not supported.*")
public void shouldThrowOnSemiJoin() {
DataSchema leftSchema = new DataSchema(new String[]{"int_col",
"string_col"}, new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
@@ -461,7 +460,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.SEMI,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
}
@Test
@@ -485,7 +484,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.FULL,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -537,7 +536,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.ANTI,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
}
@Test
@@ -562,7 +561,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -595,7 +594,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -631,7 +630,7 @@ public class HashJoinOperatorTest {
});
JoinNode node =
new JoinNode(1, resultSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator,
_rightOperator, leftSchema, node, 1, 2);
TransferableBlock result = join.nextBlock(); // first no-op consumes first
right data block.
Assert.assertTrue(result.isNoOpBlock());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 45a04eb2ff..185a6d5d53 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -52,7 +52,8 @@ public class LeafStageTransferableBlockOperatorTest {
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
List<InstanceResponseBlock> resultsBlockList =
Collections.singletonList(new InstanceResponseBlock(
new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo",
1}, new Object[]{"", 2})), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -66,18 +67,19 @@ public class LeafStageTransferableBlockOperatorTest {
@Test
public void shouldHandleDesiredDataSchemaConversionCorrectly() {
// Given:
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT boolCol, tsCol, boolCol AS newNamedBoolCol FROM tbl");
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT boolCol, tsCol,
boolCol AS newNamedBoolCol FROM tbl");
DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "tsCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN,
DataSchema.ColumnDataType.TIMESTAMP});
- DataSchema desiredSchema = new DataSchema(new String[]{"boolCol", "tsCol",
"newNamedBoolCol"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN,
DataSchema.ColumnDataType.TIMESTAMP,
- DataSchema.ColumnDataType.BOOLEAN});
+ DataSchema desiredSchema =
+ new DataSchema(new String[]{"boolCol", "tsCol", "newNamedBoolCol"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN,
DataSchema.ColumnDataType.TIMESTAMP, DataSchema.ColumnDataType.BOOLEAN
+ });
List<InstanceResponseBlock> resultsBlockList =
Collections.singletonList(new InstanceResponseBlock(
- new SelectionResultsBlock(resultSchema, Arrays.asList(new Object[]{1,
1660000000000L},
- new Object[]{0, 1600000000000L})), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList,
- desiredSchema);
+ new SelectionResultsBlock(resultSchema,
+ Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0,
1600000000000L})), queryContext));
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList,
desiredSchema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -96,9 +98,10 @@ public class LeafStageTransferableBlockOperatorTest {
DataSchema schema = new DataSchema(new String[]{"boolCol", "tsCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN,
DataSchema.ColumnDataType.TIMESTAMP});
List<InstanceResponseBlock> resultsBlockList =
Collections.singletonList(new InstanceResponseBlock(
- new SelectionResultsBlock(schema, Arrays.asList(new Object[]{1,
1660000000000L},
- new Object[]{0, 1600000000000L})), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ new SelectionResultsBlock(schema,
+ Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0,
1600000000000L})), queryContext));
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -115,13 +118,15 @@ public class LeafStageTransferableBlockOperatorTest {
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
- List<InstanceResponseBlock> resultsBlockList = Arrays.asList(
- new InstanceResponseBlock(new SelectionResultsBlock(schema,
- Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})),
queryContext),
- new InstanceResponseBlock(new SelectionResultsBlock(schema,
- Arrays.asList(new Object[]{"bar", 3}, new Object[]{"foo", 4})),
queryContext),
+ List<InstanceResponseBlock> resultsBlockList = Arrays.asList(new
InstanceResponseBlock(
+ new SelectionResultsBlock(schema, Arrays.asList(new
Object[]{"foo", 1}, new Object[]{"", 2})),
+ queryContext),
+ new InstanceResponseBlock(
+ new SelectionResultsBlock(schema, Arrays.asList(new
Object[]{"bar", 3}, new Object[]{"foo", 4})),
+ queryContext),
new InstanceResponseBlock(new SelectionResultsBlock(schema,
Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock1 = operator.nextBlock();
@@ -145,12 +150,13 @@ public class LeafStageTransferableBlockOperatorTest {
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
InstanceResponseBlock errorBlock = new InstanceResponseBlock();
errorBlock.addException(QueryException.QUERY_EXECUTION_ERROR.getErrorCode(),
"foobar");
- List<InstanceResponseBlock> resultsBlockList = Arrays.asList(
- new InstanceResponseBlock(new SelectionResultsBlock(schema,
- Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})),
queryContext),
+ List<InstanceResponseBlock> resultsBlockList = Arrays.asList(new
InstanceResponseBlock(
+ new SelectionResultsBlock(schema, Arrays.asList(new
Object[]{"foo", 1}, new Object[]{"", 2})),
+ queryContext),
errorBlock,
new InstanceResponseBlock(new SelectionResultsBlock(schema,
Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -162,16 +168,16 @@ public class LeafStageTransferableBlockOperatorTest {
@Test
public void shouldReorderWhenQueryContextAskForNotInOrderGroupByAsDistinct()
{
// Given:
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT intCol, strCol FROM tbl GROUP BY strCol, intCol");
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT intCol, strCol FROM
tbl GROUP BY strCol, intCol");
// result schema doesn't match with DISTINCT columns using GROUP BY.
DataSchema schema = new DataSchema(new String[]{"intCol", "strCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING});
- List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
- new InstanceResponseBlock(new
DistinctResultsBlock(mock(DistinctAggregationFunction.class),
- new DistinctTable(schema, Arrays.asList(
- new Record(new Object[]{1, "foo"}), new Record(new Object[]{2,
"bar"})))), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ List<InstanceResponseBlock> resultsBlockList =
Collections.singletonList(new InstanceResponseBlock(
+ new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new
DistinctTable(schema,
+ Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new
Object[]{2, "bar"})))), queryContext));
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -184,16 +190,15 @@ public class LeafStageTransferableBlockOperatorTest {
@Test
public void shouldParsedBlocksSuccessfullyWithDistinctQuery() {
// Given:
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT DISTINCT strCol, intCol FROM tbl");
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT DISTINCT strCol, intCol FROM
tbl");
// result schema doesn't match with DISTINCT columns using GROUP BY.
DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
- List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
- new InstanceResponseBlock(new
DistinctResultsBlock(mock(DistinctAggregationFunction.class),
- new DistinctTable(schema, Arrays.asList(
- new Record(new Object[]{"foo", 1}), new Record(new
Object[]{"bar", 2})))), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ List<InstanceResponseBlock> resultsBlockList =
Collections.singletonList(new InstanceResponseBlock(
+ new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new
DistinctTable(schema,
+ Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new
Object[]{"bar", 2})))), queryContext));
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -209,12 +214,15 @@ public class LeafStageTransferableBlockOperatorTest {
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
"SELECT intCol, count(*), sum(doubleCol), strCol FROM tbl GROUP BY
strCol, intCol");
// result schema doesn't match with columns ordering using GROUP BY, this
should not occur.
- DataSchema schema = new DataSchema(new String[]{"intCol", "count(*)",
"sum(doubleCol)", "strCol"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.INT,
- DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING});
+ DataSchema schema =
+ new DataSchema(new String[]{"intCol", "count(*)", "sum(doubleCol)",
"strCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.LONG,
+ DataSchema.ColumnDataType.STRING
+ });
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(schema,
Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -230,11 +238,14 @@ public class LeafStageTransferableBlockOperatorTest {
+ "sum(doubleCol) FROM tbl GROUP BY strCol, intCol HAVING
sum(doubleCol) < 10 AND count(*) > 0");
// result schema contains duplicate reference from agg and having. it will
repeat itself.
DataSchema schema = new DataSchema(new String[]{"strCol", "intCol",
"count(*)", "sum(doubleCol)", "sum(doubleCol)"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT,
- DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
DataSchema.ColumnDataType.LONG});
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG
+ });
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(schema,
Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -246,15 +257,14 @@ public class LeafStageTransferableBlockOperatorTest {
@Test
public void shouldNotErrorOutWhenDealingWithAggregationResults() {
// Given:
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT count(*), sum(doubleCol) FROM tbl");
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT count(*), sum(doubleCol)
FROM tbl");
// result schema doesn't match with DISTINCT columns using GROUP BY.
DataSchema schema = new DataSchema(new String[]{"count_star",
"sum(doubleCol)"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.LONG});
- List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
- new InstanceResponseBlock(new
AggregationResultsBlock(queryContext.getAggregationFunctions(),
- Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+ List<InstanceResponseBlock> resultsBlockList =
Collections.singletonList(new InstanceResponseBlock(
+ new AggregationResultsBlock(queryContext.getAggregationFunctions(),
Collections.emptyList()), queryContext));
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -275,8 +285,8 @@ public class LeafStageTransferableBlockOperatorTest {
// When:
List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
new InstanceResponseBlock(new SelectionResultsBlock(resultSchema,
Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(responseBlockList,
- desiredSchema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(responseBlockList,
desiredSchema, 1, 2);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
@@ -287,19 +297,19 @@ public class LeafStageTransferableBlockOperatorTest {
@Test
public void
shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsDistinct() {
// Given:
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT strCol, intCol FROM tbl GROUP BY strCol, intCol");
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM
tbl GROUP BY strCol, intCol");
DataSchema resultSchema = new DataSchema(new String[]{"strCol", "intCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING});
DataSchema desiredSchema = new DataSchema(new String[]{"strCol", "intCol"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
// When:
- List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
- new InstanceResponseBlock(new
DistinctResultsBlock(mock(DistinctAggregationFunction.class),
+ List<InstanceResponseBlock> responseBlockList =
Collections.singletonList(new InstanceResponseBlock(
+ new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
new DistinctTable(resultSchema, Collections.emptyList())),
queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(responseBlockList,
- desiredSchema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(responseBlockList,
desiredSchema, 1, 2);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
@@ -310,18 +320,18 @@ public class LeafStageTransferableBlockOperatorTest {
@Test
public void
shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsGroupBy() {
// Given:
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
- "SELECT strCol, SUM(intCol) FROM tbl GROUP BY strCol");
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT strCol, SUM(intCol)
FROM tbl GROUP BY strCol");
DataSchema resultSchema = new DataSchema(new String[]{"strCol",
"SUM(intCol)"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING});
DataSchema desiredSchema = new DataSchema(new String[]{"strCol",
"SUM(intCol)"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT});
// When:
List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(resultSchema,
Collections.emptyList()), queryContext));
- LeafStageTransferableBlockOperator operator = new
LeafStageTransferableBlockOperator(responseBlockList,
- desiredSchema);
+ LeafStageTransferableBlockOperator operator =
+ new LeafStageTransferableBlockOperator(responseBlockList,
desiredSchema, 1, 2);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 663caf2272..856965cfb4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -24,13 +24,34 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class LiteralValueOperatorTest {
+ private AutoCloseable _mocks;
+
+ @Mock
+ private PlanRequestContext _context;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
@Test
public void shouldReturnLiteralBlock() {
// Given:
@@ -44,7 +65,7 @@ public class LiteralValueOperatorTest {
new RexExpression.Literal(DataType.STRING, ""),
new RexExpression.Literal(DataType.INT, 2))
);
- LiteralValueOperator operator = new LiteralValueOperator(schema, literals);
+ LiteralValueOperator operator = new LiteralValueOperator(schema, literals,
1, 2);
// When:
TransferableBlock transferableBlock = operator.nextBlock();
@@ -60,7 +81,7 @@ public class LiteralValueOperatorTest {
// Given:
DataSchema schema = new DataSchema(new String[]{}, new ColumnDataType[]{});
List<List<RexExpression>> literals = ImmutableList.of(ImmutableList.of());
- LiteralValueOperator operator = new LiteralValueOperator(schema, literals);
+ LiteralValueOperator operator = new LiteralValueOperator(schema, literals,
1, 2);
// When:
TransferableBlock transferableBlock = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 3dc951dca3..9f6577dc66 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -74,7 +74,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory);
+ server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
@@ -91,7 +91,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory);
+ server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
TransferableBlock errorBlock =
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
Mockito.when(_input.nextBlock())
.thenReturn(errorBlock);
@@ -109,7 +109,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory);
+ server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
Mockito.when(_input.nextBlock())
.thenThrow(new RuntimeException("foo!"));
ArgumentCaptor<TransferableBlock> captor =
ArgumentCaptor.forClass(TransferableBlock.class);
@@ -128,7 +128,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory);
+ server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
TransferableBlock eosBlock =
TransferableBlockUtils.getEndOfStreamTransferableBlock();
Mockito.when(_input.nextBlock())
.thenReturn(eosBlock);
@@ -146,7 +146,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory);
+ server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new
DataSchema.ColumnDataType[]{}));
Mockito.when(_input.nextBlock())
.thenReturn(dataBlock)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index 2cc6b68a67..b11a00245a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -64,7 +64,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -82,7 +82,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -99,7 +99,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -116,7 +116,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
@@ -139,7 +139,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
@@ -162,7 +162,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
@@ -185,7 +185,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
@@ -208,7 +208,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
@@ -231,7 +231,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
@@ -253,7 +253,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 2, 0, schema, 1);
+    SortOperator op = new SortOperator(_input, collation, directions, 2, 0, schema, 1, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
@@ -275,7 +275,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
@@ -296,7 +296,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}))
@@ -320,7 +320,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -344,7 +344,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -368,7 +368,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}))
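
Note on the SortOperatorTest changes above: every SortOperator construction gains two trailing int arguments (1 and 2 in these tests), matching the stats plumbing this commit adds to the operators; in the one test that already carried an extra trailing argument, that argument is kept and the same two are appended after it. Below is a minimal, self-contained sketch of the pattern such arguments enable, assuming they identify the request and stage for per-operator stats; the class and member names are illustrative, not the committed OperatorStats API.

import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the stats holder this commit introduces; the real
// OperatorStats class in pinot-query-runtime may differ in shape and naming.
final class SketchOperatorStats {
  private final long _requestId; // assumed meaning of the first extra argument
  private final int _stageId;    // assumed meaning of the second extra argument
  private long _startTimeNs;
  private long _executionTimeMs;
  private int _numRows;

  SketchOperatorStats(long requestId, int stageId) {
    _requestId = requestId;
    _stageId = stageId;
  }

  // Called before an operator starts producing a block.
  void startTimer() {
    _startTimeNs = System.nanoTime();
  }

  // Called after the block is produced; accumulates wall-clock time.
  void endTimer() {
    _executionTimeMs += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - _startTimeNs);
  }

  // Accumulates the number of rows emitted so far.
  void recordRows(int numRows) {
    _numRows += numRows;
  }

  @Override
  public String toString() {
    return "requestId=" + _requestId + ", stageId=" + _stageId + ", executionTimeMs=" + _executionTimeMs
        + ", numRows=" + _numRows;
  }
}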
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index b89313c25c..52ffff08fb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -71,7 +71,7 @@ public class TransformOperatorTest {
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(!result.isErrorBlock());
 
@@ -95,7 +95,8 @@ public class TransformOperatorTest {
     RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
     RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str");
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
     TransferableBlock result = op.nextBlock();
     // Literal operands should just output original literals.
     Assert.assertTrue(!result.isErrorBlock());
@@ -125,7 +126,7 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(!result.isErrorBlock());
     List<Object[]> resultRows = result.getContainer();
@@ -153,7 +154,7 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
 
@@ -173,7 +174,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
     DataBlock data = result.getDataBlock();
@@ -196,7 +198,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
     TransferableBlock result = op.nextBlock();
     // First block has two rows
     Assert.assertFalse(result.isErrorBlock());
@@ -227,7 +230,8 @@ public class TransformOperatorTest {
     DataSchema upStreamSchema = new DataSchema(new String[]{"string1", "string2"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
     });
-    TransformOperator transform = new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema);
+    TransformOperator transform =
+        new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*doesn't match "
@@ -240,6 +244,6 @@ public class TransformOperatorTest {
     });
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     TransformOperator transform =
-        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2);
   }
 }
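
The TransformOperatorTest changes follow the same shape: the two extra constructor arguments are appended at every call site, with lines re-wrapped where the longer signature exceeded the line-length limit. As a companion to the stats sketch above, here is an illustrative (not committed) sketch of how an operator could wire those two arguments into the stats holder and time each nextBlock() call:

// Sketch of an operator consuming the two constructor arguments via the
// SketchOperatorStats class above; illustrative only, not Pinot's API.
abstract class SketchInstrumentedOperator {
  private final SketchOperatorStats _stats;

  SketchInstrumentedOperator(long requestId, int stageId) {
    _stats = new SketchOperatorStats(requestId, stageId);
  }

  // Subclasses produce the next batch of rows (the real operators return
  // TransferableBlocks; Object[][] keeps this sketch self-contained).
  protected abstract Object[][] getNextBlock();

  public Object[][] nextBlock() {
    _stats.startTimer();
    try {
      Object[][] rows = getNextBlock();
      _stats.recordRows(rows.length);
      return rows;
    } finally {
      _stats.endTimer();
    }
  }

  public SketchOperatorStats getStats() {
    return _stats;
  }
}

Under this reading, the literal arguments (1, 2) in both test classes are simply a fixed request id and stage id, which is why every call site could be updated mechanically without touching the test assertions.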
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]