This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 27b61fe6 Extract more common logic in combine operators (#6696)
27b61fe6 is described below
commit 27b61fe6a338b1363efb64a7fed87d95cc793f8a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Mar 22 10:34:57 2021 -0700
Extract more common logic in combine operators (#6696)
Extracts the phaser logic into `BaseCombineOperator.getNextBlock()` to
reduce duplicate code.
Extends `StreamingSelectionOnlyCombineOperator` from `BaseCombineOperator`.
---
.../core/operator/combine/BaseCombineOperator.java | 197 +++++++-------
.../operator/combine/GroupByCombineOperator.java | 102 +++----
.../combine/GroupByOrderByCombineOperator.java | 87 ++----
...xValueBasedSelectionOrderByCombineOperator.java | 292 +++++++++++----------
.../core/operator/combine/MinMaxValueContext.java | 36 ---
.../combine/SelectionOrderByCombineOperator.java | 72 +----
.../StreamingSelectionOnlyCombineOperator.java | 176 +++++--------
7 files changed, 389 insertions(+), 573 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 417f449..f57efb3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -19,10 +19,10 @@
package org.apache.pinot.core.operator.combine;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -50,42 +50,41 @@ public abstract class BaseCombineOperator extends
BaseOperator<IntermediateResul
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseCombineOperator.class);
protected final List<Operator> _operators;
+ protected final int _numOperators;
protected final QueryContext _queryContext;
protected final ExecutorService _executorService;
protected final long _endTimeMs;
- protected final int _numOperators;
- // Use a Phaser to ensure all the Futures are done (not scheduled, finished
or interrupted) before the main thread
- // returns. We need to ensure this because the main thread holds the
reference to the segments. If a segment is
- // deleted/refreshed, the segment will be released after the main thread
returns, which would lead to undefined
- // behavior (even JVM crash) when processing queries against it.
- protected final Phaser _phaser = new Phaser(1);
- // Use a _blockingQueue to store the per-segment result
- protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
- private final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0);
- protected int _numThreads;
- protected Future[] _futures;
-
- public BaseCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService,
- long endTimeMs) {
+ protected final int _numThreads;
+ protected final Future[] _futures;
+ // Use a _blockingQueue to store the intermediate results blocks
+ protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue = new
LinkedBlockingQueue<>();
+ protected final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0);
+
+ protected BaseCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService,
+ long endTimeMs, int numThreads) {
_operators = operators;
+ _numOperators = _operators.size();
_queryContext = queryContext;
_executorService = executorService;
_endTimeMs = endTimeMs;
- _numOperators = _operators.size();
- _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
- _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+ _numThreads = numThreads;
_futures = new Future[_numThreads];
}
- public BaseCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService,
- long endTimeMs, int numThreads) {
- this(operators, queryContext, executorService, endTimeMs);
- _numThreads = numThreads;
- _futures = new Future[_numThreads];
+ protected BaseCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService,
+ long endTimeMs) {
+ this(operators, queryContext, executorService, endTimeMs,
+ CombineOperatorUtils.getNumThreadsForQuery(operators.size()));
}
@Override
protected IntermediateResultsBlock getNextBlock() {
+ // Use a Phaser to ensure all the Futures are done (not scheduled,
finished or interrupted) before the main thread
+ // returns. We need to ensure this because the main thread holds the
reference to the segments. If a segment is
+ // deleted/refreshed, the segment will be released after the main thread
returns, which would lead to undefined
+ // behavior (even JVM crash) when processing queries against it.
+ Phaser phaser = new Phaser(1);
+
for (int i = 0; i < _numThreads; i++) {
int threadIndex = i;
_futures[i] = _executorService.submit(new TraceRunnable() {
@@ -94,13 +93,41 @@ public abstract class BaseCombineOperator extends
BaseOperator<IntermediateResul
ThreadTimer executionThreadTimer = new ThreadTimer();
executionThreadTimer.start();
- processSegments(threadIndex);
+ // Register the thread to the phaser
+ // NOTE: If the phaser is terminated (returning negative value) when
trying to register the thread, that
+ // means the query execution has finished, and the main thread
has deregistered itself and returned
+ // the result. Directly return as no execution result will be
taken.
+ if (phaser.register() < 0) {
+ return;
+ }
+ try {
+ processSegments(threadIndex);
+ } finally {
+ phaser.arriveAndDeregister();
+ }
-
totalWorkerThreadCpuTimeNs.addAndGet(executionThreadTimer.stopAndGetThreadTimeNs());
+
totalWorkerThreadCpuTimeNs.getAndAdd(executionThreadTimer.stopAndGetThreadTimeNs());
}
});
}
- IntermediateResultsBlock mergedBlock = mergeResultsFromSegments();
+
+ IntermediateResultsBlock mergedBlock;
+ try {
+ mergedBlock = mergeResults();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while merging results blocks (query:
{})", _queryContext, e);
+ mergedBlock = new
IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
+ } finally {
+ // Cancel all ongoing jobs
+ for (Future future : _futures) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ // Deregister the main thread and wait for all threads done
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
+ }
+
/*
* TODO: setThreadTime logic can be put into
CombineOperatorUtils.setExecutionStatistics(),
* after we extends StreamingSelectionOnlyCombineOperator from
BaseCombineOperator.
@@ -111,92 +138,64 @@ public abstract class BaseCombineOperator extends
BaseOperator<IntermediateResul
}
/**
- * processSegments will execute query on one or more segments in a single
thread.
+ * Executes query on one or more segments in a worker thread.
*/
protected void processSegments(int threadIndex) {
- try {
- // Register the thread to the phaser
- // NOTE: If the phaser is terminated (returning negative value) when
trying to register the thread, that
- // means the query execution has finished, and the main thread has
deregistered itself and returned
- // the result. Directly return as no execution result will be
taken.
- if (_phaser.register() < 0) {
- return;
- }
-
- for (int operatorIndex = threadIndex; operatorIndex < _numOperators;
operatorIndex += _numThreads) {
- try {
- IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock)
_operators.get(operatorIndex).nextBlock();
- if (isQuerySatisfied(resultsBlock)) {
- // Query is satisfied, skip processing the remaining segments
- _blockingQueue.offer(resultsBlock);
- return;
- } else {
- _blockingQueue.offer(resultsBlock);
- }
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER
- .error("Caught exception while executing operator of index: {}
(query: {})", operatorIndex, _queryContext,
- e);
- _blockingQueue.offer(new IntermediateResultsBlock(e));
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators;
operatorIndex += _numThreads) {
+ try {
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock)
_operators.get(operatorIndex).nextBlock();
+ if (isQuerySatisfied(resultsBlock)) {
+ // Query is satisfied, skip processing the remaining segments
+ _blockingQueue.offer(resultsBlock);
return;
+ } else {
+ _blockingQueue.offer(resultsBlock);
}
+ } catch (EarlyTerminationException e) {
+ // Early-terminated by interruption (canceled by the main thread)
+ return;
+ } catch (Exception e) {
+ // Caught exception, skip processing the remaining operators
+ LOGGER.error("Caught exception while executing operator of index: {}
(query: {})", operatorIndex, _queryContext,
+ e);
+ _blockingQueue.offer(new IntermediateResultsBlock(e));
+ return;
}
- } finally {
- _phaser.arriveAndDeregister();
}
}
/**
- * mergeResultsFromSegments will merge multiple intermediate result blocks
into a result block.
+ * Merges the results from the worker threads into a results block.
*/
- protected IntermediateResultsBlock mergeResultsFromSegments() {
+ protected IntermediateResultsBlock mergeResults()
+ throws Exception {
IntermediateResultsBlock mergedBlock = null;
- try {
- int numBlocksMerged = 0;
- while (numBlocksMerged < _numOperators) {
- IntermediateResultsBlock blockToMerge =
- _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- if (blockToMerge == null) {
- // Query times out, skip merging the remaining results blocks
- LOGGER.error("Timed out while polling results block,
numBlocksMerged: {} (query: {})", numBlocksMerged,
- _queryContext);
- mergedBlock = new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
- new TimeoutException("Timed out while polling results block")));
- break;
- }
- if (blockToMerge.getProcessingExceptions() != null) {
- // Caught exception while processing segment, skip merging the
remaining results blocks and directly return
- // the exception
- mergedBlock = blockToMerge;
- break;
- }
- if (mergedBlock == null) {
- mergedBlock = blockToMerge;
- } else {
- mergeResultsBlocks(mergedBlock, blockToMerge);
- }
- numBlocksMerged++;
- if (isQuerySatisfied(mergedBlock)) {
- // Query is satisfied, skip merging the remaining results blocks
- break;
- }
+ int numBlocksMerged = 0;
+ while (numBlocksMerged < _numOperators) {
+ IntermediateResultsBlock blockToMerge =
+ _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ if (blockToMerge == null) {
+ // Query times out, skip merging the remaining results blocks
+ LOGGER.error("Timed out while polling results block, numBlocksMerged:
{} (query: {})", numBlocksMerged,
+ _queryContext);
+ return new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
}
- } catch (Exception e) {
- LOGGER.error("Caught exception while merging results blocks (query:
{})", _queryContext, e);
- mergedBlock = new
IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
- } finally {
- // Cancel all ongoing jobs
- for (Future future : _futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
+ if (blockToMerge.getProcessingExceptions() != null) {
+ // Caught exception while processing segment, skip merging the
remaining results blocks and directly return the
+ // exception
+ return blockToMerge;
+ }
+ if (mergedBlock == null) {
+ mergedBlock = blockToMerge;
+ } else {
+ mergeResultsBlocks(mergedBlock, blockToMerge);
+ }
+ numBlocksMerged++;
+ if (isQuerySatisfied(mergedBlock)) {
+ // Query is satisfied, skip merging the remaining results blocks
+ return mergedBlock;
}
- // Deregister the main thread and wait for all threads done
- _phaser.awaitAdvance(_phaser.arriveAndDeregister());
}
return mergedBlock;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index dadb23a..35395ef 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,7 +48,7 @@ import org.slf4j.LoggerFactory;
* TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the
parallelism of the query instead of using
* all threads
*/
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
public class GroupByCombineOperator extends BaseCombineOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(GroupByCombineOperator.class);
private static final String OPERATOR_NAME = "GroupByCombineOperator";
@@ -70,11 +69,6 @@ public class GroupByCombineOperator extends
BaseCombineOperator {
private final int _numAggregationFunctions;
// We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
// _futures (try to interrupt the execution if it already started).
- // Besides the CountDownLatch, we also use a Phaser to ensure all the
Futures are done (not scheduled, finished or
- // interrupted) before the main thread returns. We need to ensure no
execution left before the main thread returning
- // because the main thread holds the reference to the segments, and if the
segments are deleted/refreshed, the
- // segments can be released after the main thread returns, which would lead
to undefined behavior (even JVM crash)
- // when executing queries against them.
private final CountDownLatch _operatorLatch;
public GroupByCombineOperator(List<Operator> operators, QueryContext
queryContext, ExecutorService executorService,
@@ -88,27 +82,20 @@ public class GroupByCombineOperator extends
BaseCombineOperator {
_aggregationFunctions = _queryContext.getAggregationFunctions();
assert _aggregationFunctions != null;
_numAggregationFunctions = _aggregationFunctions.length;
- int numOperators = _operators.size();
- _operatorLatch = new CountDownLatch(numOperators);
+ _operatorLatch = new CountDownLatch(_numOperators);
+ }
+
+ @Override
+ public String getOperatorName() {
+ return OPERATOR_NAME;
}
/**
- * {@inheritDoc}
- *
- * <p> Execute query on one or more segments in a single thread, and store
multiple intermediate result blocks into a
- * map
+ * Executes query on one segment in a worker thread and merges the results
into the results map.
*/
@Override
protected void processSegments(int threadIndex) {
try {
- // Register the thread to the _phaser.
- // If the _phaser is terminated (returning negative value) when trying
to register the thread, that means the
- // query execution has timed out, and the main thread has deregistered
itself and returned the result.
- // Directly return as no execution result will be taken.
- if (_phaser.register() < 0) {
- return;
- }
-
IntermediateResultsBlock intermediateResultsBlock =
(IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
@@ -153,7 +140,6 @@ public class GroupByCombineOperator extends
BaseCombineOperator {
_mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
} finally {
_operatorLatch.countDown();
- _phaser.arriveAndDeregister();
}
}
@@ -177,57 +163,39 @@ public class GroupByCombineOperator extends
BaseCombineOperator {
* </ul>
*/
@Override
- protected IntermediateResultsBlock mergeResultsFromSegments() {
- try {
- long timeoutMs = _endTimeMs - System.currentTimeMillis();
- boolean opCompleted = _operatorLatch.await(timeoutMs,
TimeUnit.MILLISECONDS);
- if (!opCompleted) {
- // If this happens, the broker side should already timed out, just log
the error and return
- String errorMessage = String
- .format("Timed out while combining group-by results after %dms,
queryContext = %s", timeoutMs,
- _queryContext);
- LOGGER.error(errorMessage);
- return new IntermediateResultsBlock(new
TimeoutException(errorMessage));
- }
-
- // Trim the results map.
- AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
- new AggregationGroupByTrimmingService(_queryContext);
- List<Map<String, Object>> trimmedResults =
-
aggregationGroupByTrimmingService.trimIntermediateResultsMap(_resultsMap);
- IntermediateResultsBlock mergedBlock = new
IntermediateResultsBlock(_aggregationFunctions, trimmedResults, true);
+ protected IntermediateResultsBlock mergeResults()
+ throws Exception {
+ long timeoutMs = _endTimeMs - System.currentTimeMillis();
+ boolean opCompleted = _operatorLatch.await(timeoutMs,
TimeUnit.MILLISECONDS);
+ if (!opCompleted) {
+ // If this happens, the broker side should already timed out, just log
the error and return
+ String errorMessage = String
+ .format("Timed out while combining group-by results after %dms,
queryContext = %s", timeoutMs, _queryContext);
+ LOGGER.error(errorMessage);
+ return new IntermediateResultsBlock(new TimeoutException(errorMessage));
+ }
- // Set the processing exceptions.
- if (!_mergedProcessingExceptions.isEmpty()) {
- mergedBlock.setProcessingExceptions(new
ArrayList<>(_mergedProcessingExceptions));
- }
- // TODO: this value should be set in the inner-segment operators.
Setting it here might cause false positive as we
- // are comparing number of groups across segments with the groups
limit for each segment.
- if (_resultsMap.size() >= _innerSegmentNumGroupsLimit) {
- mergedBlock.setNumGroupsLimitReached(true);
- }
+ // Trim the results map.
+ AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
+ new AggregationGroupByTrimmingService(_queryContext);
+ List<Map<String, Object>> trimmedResults =
+
aggregationGroupByTrimmingService.trimIntermediateResultsMap(_resultsMap);
+ IntermediateResultsBlock mergedBlock = new
IntermediateResultsBlock(_aggregationFunctions, trimmedResults, true);
- return mergedBlock;
- } catch (Exception e) {
- return new IntermediateResultsBlock(e);
- } finally {
- // Cancel all ongoing jobs
- for (Future future : _futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- // Deregister the main thread and wait for all threads done
- _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+ // Set the processing exceptions.
+ if (!_mergedProcessingExceptions.isEmpty()) {
+ mergedBlock.setProcessingExceptions(new
ArrayList<>(_mergedProcessingExceptions));
+ }
+ // TODO: this value should be set in the inner-segment operators. Setting
it here might cause false positive as we
+ // are comparing number of groups across segments with the groups
limit for each segment.
+ if (_resultsMap.size() >= _innerSegmentNumGroupsLimit) {
+ mergedBlock.setNumGroupsLimitReached(true);
}
- }
- @Override
- protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
IntermediateResultsBlock blockToMerge) {
+ return mergedBlock;
}
@Override
- public String getOperatorName() {
- return OPERATOR_NAME;
+ protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
IntermediateResultsBlock blockToMerge) {
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 38f1a54..4cd518d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
@@ -68,11 +67,6 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
private final ConcurrentLinkedQueue<ProcessingException>
_mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
// We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
// _futures (try to interrupt the execution if it already started).
- // Besides the CountDownLatch, we also use a Phaser to ensure all the
Futures are done (not scheduled, finished or
- // interrupted) before the main thread returns. We need to ensure no
execution left before the main thread returning
- // because the main thread holds the reference to the segments, and if the
segments are deleted/refreshed, the
- // segments can be released after the main thread returns, which would lead
to undefined behavior (even JVM crash)
- // when executing queries against them.
private final CountDownLatch _operatorLatch;
private DataSchema _dataSchema;
private ConcurrentIndexedTable _indexedTable;
@@ -95,23 +89,17 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
_operatorLatch = new CountDownLatch(numOperators);
}
+ @Override
+ public String getOperatorName() {
+ return OPERATOR_NAME;
+ }
+
/**
- * {@inheritDoc}
- *
- * <p> Execute query on one or more segments in a single thread, and store
multiple intermediate result blocks
- * into {@link org.apache.pinot.core.data.table.IndexedTable}
+ * Executes query on one segment in a worker thread and merges the results
into the indexed table.
*/
@Override
protected void processSegments(int threadIndex) {
try {
- // Register the thread to the _phaser.
- // If the _phaser is terminated (returning negative value) when trying
to register the thread, that means the
- // query execution has timed out, and the main thread has deregistered
itself and returned the result.
- // Directly return as no execution result will be taken.
- if (_phaser.register() < 0) {
- return;
- }
-
IntermediateResultsBlock intermediateResultsBlock =
(IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
@@ -164,7 +152,6 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
_mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
} finally {
_operatorLatch.countDown();
- _phaser.arriveAndDeregister();
}
}
@@ -182,52 +169,34 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
* </ul>
*/
@Override
- protected IntermediateResultsBlock mergeResultsFromSegments() {
- try {
- long timeoutMs = _endTimeMs - System.currentTimeMillis();
- boolean opCompleted = _operatorLatch.await(timeoutMs,
TimeUnit.MILLISECONDS);
- if (!opCompleted) {
- // If this happens, the broker side should already timed out, just log
the error and return
- String errorMessage = String
- .format("Timed out while combining group-by order-by results after
%dms, queryContext = %s", timeoutMs,
- _queryContext);
- LOGGER.error(errorMessage);
- return new IntermediateResultsBlock(new
TimeoutException(errorMessage));
- }
+ protected IntermediateResultsBlock mergeResults()
+ throws Exception {
+ long timeoutMs = _endTimeMs - System.currentTimeMillis();
+ boolean opCompleted = _operatorLatch.await(timeoutMs,
TimeUnit.MILLISECONDS);
+ if (!opCompleted) {
+ // If this happens, the broker side should already timed out, just log
the error and return
+ String errorMessage = String
+ .format("Timed out while combining group-by order-by results after
%dms, queryContext = %s", timeoutMs,
+ _queryContext);
+ LOGGER.error(errorMessage);
+ return new IntermediateResultsBlock(new TimeoutException(errorMessage));
+ }
- _indexedTable.finish(false);
- IntermediateResultsBlock mergedBlock = new
IntermediateResultsBlock(_indexedTable);
+ _indexedTable.finish(false);
+ IntermediateResultsBlock mergedBlock = new
IntermediateResultsBlock(_indexedTable);
- // Set the processing exceptions.
- if (!_mergedProcessingExceptions.isEmpty()) {
- mergedBlock.setProcessingExceptions(new
ArrayList<>(_mergedProcessingExceptions));
- }
-
- mergedBlock.setNumResizes(_indexedTable.getNumResizes());
- mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
- // TODO - set numGroupsLimitReached
- return mergedBlock;
- } catch (Exception e) {
- return new IntermediateResultsBlock(e);
- } finally {
- // Cancel all ongoing jobs
- for (Future future : _futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- // Deregister the main thread and wait for all threads done
- _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+ // Set the processing exceptions.
+ if (!_mergedProcessingExceptions.isEmpty()) {
+ mergedBlock.setProcessingExceptions(new
ArrayList<>(_mergedProcessingExceptions));
}
- }
-
- @Override
- protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
IntermediateResultsBlock blockToMerge) {
+ mergedBlock.setNumResizes(_indexedTable.getNumResizes());
+ mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
+ // TODO - set numGroupsLimitReached
+ return mergedBlock;
}
@Override
- public String getOperatorName() {
- return OPERATOR_NAME;
+ protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
IntermediateResultsBlock blockToMerge) {
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 29359f6..e7df81c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -18,20 +18,22 @@
*/
package org.apache.pinot.core.operator.combine;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.DataSourceMetadata;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
import org.apache.pinot.core.query.exception.EarlyTerminationException;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
@@ -67,11 +69,48 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
private final int _numRowsToKeep;
private final List<MinMaxValueContext> _minMaxValueContexts;
- public MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator>
operators, QueryContext queryContext,
- ExecutorService executorService, long endTimeMs,
List<MinMaxValueContext> minMaxValueContexts) {
+ MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators,
QueryContext queryContext,
+ ExecutorService executorService, long endTimeMs) {
super(operators, queryContext, executorService, endTimeMs);
- _minMaxValueContexts = minMaxValueContexts;
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
+
+ List<OrderByExpressionContext> orderByExpressions =
_queryContext.getOrderByExpressions();
+ assert orderByExpressions != null;
+ int numOrderByExpressions = orderByExpressions.size();
+ assert numOrderByExpressions > 0;
+ OrderByExpressionContext firstOrderByExpression =
orderByExpressions.get(0);
+ assert firstOrderByExpression.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER;
+ String firstOrderByColumn =
firstOrderByExpression.getExpression().getIdentifier();
+
+ _minMaxValueContexts = new ArrayList<>(_numOperators);
+ for (Operator operator : _operators) {
+ _minMaxValueContexts.add(new
MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
+ }
+ if (firstOrderByExpression.isAsc()) {
+ // For ascending order, sort on column min value in ascending order
+ _minMaxValueContexts.sort((o1, o2) -> {
+ // Put segments without column min value in the front because we
always need to process them
+ if (o1._minValue == null) {
+ return o2._minValue == null ? 0 : -1;
+ }
+ if (o2._minValue == null) {
+ return 1;
+ }
+ return o1._minValue.compareTo(o2._minValue);
+ });
+ } else {
+ // For descending order, sort on column max value in descending order
+ _minMaxValueContexts.sort((o1, o2) -> {
+ // Put segments without column max value in the front because we
always need to process them
+ if (o1._maxValue == null) {
+ return o2._maxValue == null ? 0 : -1;
+ }
+ if (o2._maxValue == null) {
+ return 1;
+ }
+ return o2._maxValue.compareTo(o1._maxValue);
+ });
+ }
}
@Override
@@ -95,106 +134,93 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
assert firstOrderByExpression.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER;
boolean asc = firstOrderByExpression.isAsc();
- try {
- // Register the thread to the _phaser
- // NOTE: If the _phaser is terminated (returning negative value) when
trying to register the thread, that
- // means the query execution has finished, and the main thread has
deregistered itself and returned
- // the result. Directly return as no execution result will be
taken.
- if (_phaser.register() < 0) {
- return;
- }
+ // Keep a boundary value for the thread
+ // NOTE: The thread boundary value can be different from the global
boundary value because thread boundary
+ // value is updated after processing the segment, while global
boundary value is updated after the
+ // segment result is merged.
+ Comparable threadBoundaryValue = null;
- // Keep a boundary value for the thread
- // NOTE: The thread boundary value can be different from the global
boundary value because thread boundary
- // value is updated after processing the segment, while global
boundary value is updated after the
- // segment result is merged.
- Comparable threadBoundaryValue = null;
-
- for (int operatorIndex = threadIndex; operatorIndex < _numOperators;
operatorIndex += _numThreads) {
- // Calculate the boundary value from global boundary and thread
boundary
- Comparable boundaryValue = _globalBoundaryValue.get();
- if (boundaryValue == null) {
- boundaryValue = threadBoundaryValue;
- } else {
- if (threadBoundaryValue != null) {
- if (asc) {
- if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
- boundaryValue = threadBoundaryValue;
- }
- } else {
- if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
- boundaryValue = threadBoundaryValue;
- }
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators;
operatorIndex += _numThreads) {
+ // Calculate the boundary value from global boundary and thread boundary
+ Comparable boundaryValue = _globalBoundaryValue.get();
+ if (boundaryValue == null) {
+ boundaryValue = threadBoundaryValue;
+ } else {
+ if (threadBoundaryValue != null) {
+ if (asc) {
+ if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
+ boundaryValue = threadBoundaryValue;
+ }
+ } else {
+ if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
+ boundaryValue = threadBoundaryValue;
}
}
}
+ }
- // Check if the segment can be skipped
- MinMaxValueContext minMaxValueContext =
_minMaxValueContexts.get(operatorIndex);
- if (boundaryValue != null) {
- if (asc) {
- // For ascending order, no need to process more segments if the
column min value is larger than the
- // boundary value, or is equal to the boundary value and the there
is only one order-by expression
- if (minMaxValueContext._minValue != null) {
- int result =
minMaxValueContext._minValue.compareTo(boundaryValue);
- if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex
- 1) / _numThreads);
- _blockingQueue.offer(LAST_RESULTS_BLOCK);
- return;
- }
+ // Check if the segment can be skipped
+ MinMaxValueContext minMaxValueContext =
_minMaxValueContexts.get(operatorIndex);
+ if (boundaryValue != null) {
+ if (asc) {
+ // For ascending order, no need to process more segments if the
column min value is larger than the
+ // boundary value, or is equal to the boundary value and there
is only one order-by expression
+ if (minMaxValueContext._minValue != null) {
+ int result = minMaxValueContext._minValue.compareTo(boundaryValue);
+ if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex -
1) / _numThreads);
+ _blockingQueue.offer(LAST_RESULTS_BLOCK);
+ return;
}
- } else {
- // For descending order, no need to process more segments if the
column max value is smaller than the
- // boundary value, or is equal to the boundary value and the there
is only one order-by expression
- if (minMaxValueContext._maxValue != null) {
- int result =
minMaxValueContext._maxValue.compareTo(boundaryValue);
- if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex
- 1) / _numThreads);
- _blockingQueue.offer(LAST_RESULTS_BLOCK);
- return;
- }
+ }
+ } else {
+ // For descending order, no need to process more segments if the
column max value is smaller than the
+ // boundary value, or is equal to the boundary value and there
is only one order-by expression
+ if (minMaxValueContext._maxValue != null) {
+ int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
+ if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex -
1) / _numThreads);
+ _blockingQueue.offer(LAST_RESULTS_BLOCK);
+ return;
}
}
}
+ }
- // Process the segment
- try {
- IntermediateResultsBlock resultsBlock =
minMaxValueContext._operator.nextBlock();
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getSelectionResult();
- if (selectionResult != null && selectionResult.size() ==
_numRowsToKeep) {
- // Segment result has enough rows, update the boundary value
- assert selectionResult.peek() != null;
- Comparable segmentBoundaryValue = (Comparable)
selectionResult.peek()[0];
- if (boundaryValue == null) {
- boundaryValue = segmentBoundaryValue;
+ // Process the segment
+ try {
+ IntermediateResultsBlock resultsBlock =
minMaxValueContext._operator.nextBlock();
+ PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getSelectionResult();
+ if (selectionResult != null && selectionResult.size() ==
_numRowsToKeep) {
+ // Segment result has enough rows, update the boundary value
+ assert selectionResult.peek() != null;
+ Comparable segmentBoundaryValue = (Comparable)
selectionResult.peek()[0];
+ if (boundaryValue == null) {
+ boundaryValue = segmentBoundaryValue;
+ } else {
+ if (asc) {
+ if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
+ boundaryValue = segmentBoundaryValue;
+ }
} else {
- if (asc) {
- if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
- boundaryValue = segmentBoundaryValue;
- }
- } else {
- if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
- boundaryValue = segmentBoundaryValue;
- }
+ if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
+ boundaryValue = segmentBoundaryValue;
}
}
}
- threadBoundaryValue = boundaryValue;
- _blockingQueue.offer(resultsBlock);
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER
- .error("Caught exception while executing operator of index: {}
(query: {})", operatorIndex, _queryContext,
- e);
- _blockingQueue.offer(new IntermediateResultsBlock(e));
- return;
}
+ threadBoundaryValue = boundaryValue;
+ _blockingQueue.offer(resultsBlock);
+ } catch (EarlyTerminationException e) {
+ // Early-terminated by interruption (canceled by the main thread)
+ return;
+ } catch (Exception e) {
+ // Caught exception, skip processing the remaining operators
+ LOGGER.error("Caught exception while executing operator of index: {}
(query: {})", operatorIndex, _queryContext,
+ e);
+ _blockingQueue.offer(new IntermediateResultsBlock(e));
+ return;
}
- } finally {
- _phaser.arriveAndDeregister();
}
}
@@ -212,55 +238,42 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
* </ul>
*/
@Override
- protected IntermediateResultsBlock mergeResultsFromSegments() {
+ protected IntermediateResultsBlock mergeResults()
+ throws Exception {
IntermediateResultsBlock mergedBlock = null;
- try {
- int numBlocksMerged = 0;
- while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
- IntermediateResultsBlock blockToMerge =
- _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- if (blockToMerge == null) {
- // Query times out, skip merging the remaining results blocks
- LOGGER.error("Timed out while polling results block,
numBlocksMerged: {} (query: {})", numBlocksMerged,
- _queryContext);
- mergedBlock = new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
- new TimeoutException("Timed out while polling results block")));
- break;
- }
- if (blockToMerge.getProcessingExceptions() != null) {
- // Caught exception while processing segment, skip merging the
remaining results blocks and directly return
- // the exception
- mergedBlock = blockToMerge;
- break;
- }
- if (mergedBlock == null) {
- mergedBlock = blockToMerge;
- } else {
- if (blockToMerge != LAST_RESULTS_BLOCK) {
- mergeResultsBlocks(mergedBlock, blockToMerge);
- }
- }
- numBlocksMerged++;
-
- // Update the boundary value if enough rows are collected
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
mergedBlock.getSelectionResult();
- if (selectionResult != null && selectionResult.size() ==
_numRowsToKeep) {
- assert selectionResult.peek() != null;
- _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
- }
+ int numBlocksMerged = 0;
+ while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
+ IntermediateResultsBlock blockToMerge =
+ _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ if (blockToMerge == null) {
+ // Query times out, skip merging the remaining results blocks
+ LOGGER.error("Timed out while polling results block, numBlocksMerged:
{} (query: {})", numBlocksMerged,
+ _queryContext);
+ mergedBlock = new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
+ break;
}
- } catch (Exception e) {
- LOGGER.error("Caught exception while merging results blocks (query:
{})", _queryContext, e);
- mergedBlock = new
IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
- } finally {
- // Cancel all ongoing jobs
- for (Future future : _futures) {
- if (!future.isDone()) {
- future.cancel(true);
+ if (blockToMerge.getProcessingExceptions() != null) {
+ // Caught exception while processing segment, skip merging the
remaining results blocks and directly return
+ // the exception
+ mergedBlock = blockToMerge;
+ break;
+ }
+ if (mergedBlock == null) {
+ mergedBlock = blockToMerge;
+ } else {
+ if (blockToMerge != LAST_RESULTS_BLOCK) {
+ mergeResultsBlocks(mergedBlock, blockToMerge);
}
}
- // Deregister the main thread and wait for all threads done
- _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+ numBlocksMerged++;
+
+ // Update the boundary value if enough rows are collected
+ PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
mergedBlock.getSelectionResult();
+ if (selectionResult != null && selectionResult.size() == _numRowsToKeep)
{
+ assert selectionResult.peek() != null;
+ _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+ }
}
return mergedBlock;
}
@@ -286,4 +299,17 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
assert mergedRows != null && rowsToMerge != null;
SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge,
_numRowsToKeep);
}
+
+ private static class MinMaxValueContext {
+ final SelectionOrderByOperator _operator;
+ final Comparable _minValue;
+ final Comparable _maxValue;
+
+ MinMaxValueContext(SelectionOrderByOperator operator, String column) {
+ _operator = operator;
+ DataSourceMetadata dataSourceMetadata =
operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+ _minValue = dataSourceMetadata.getMinValue();
+ _maxValue = dataSourceMetadata.getMaxValue();
+ }
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
deleted file mode 100644
index 3f76058..0000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.combine;
-
-import org.apache.pinot.core.common.DataSourceMetadata;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-
-
-public class MinMaxValueContext {
- final SelectionOrderByOperator _operator;
- final Comparable _minValue;
- final Comparable _maxValue;
-
- MinMaxValueContext(SelectionOrderByOperator operator, String column) {
- _operator = operator;
- DataSourceMetadata dataSourceMetadata =
operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
- _minValue = dataSourceMetadata.getMinValue();
- _maxValue = dataSourceMetadata.getMaxValue();
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index dfd34e9..ef8a7d0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.operator.combine;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
@@ -27,7 +26,6 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -43,24 +41,16 @@ import org.slf4j.LoggerFactory;
* skip processing some segments based on the column min/max value. Otherwise
fall back to the default combine
* (process all segments).
*/
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings("rawtypes")
public class SelectionOrderByCombineOperator extends BaseCombineOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(SelectionOrderByCombineOperator.class);
private static final String OPERATOR_NAME =
"SelectionOrderByCombineOperator";
- private final List<Operator> _operators;
- private final QueryContext _queryContext;
- private final ExecutorService _executorService;
- private final long _endTimeMs;
private final int _numRowsToKeep;
public SelectionOrderByCombineOperator(List<Operator> operators,
QueryContext queryContext,
ExecutorService executorService, long endTimeMs) {
super(operators, queryContext, executorService, endTimeMs);
- _operators = operators;
- _queryContext = queryContext;
- _executorService = executorService;
- _endTimeMs = endTimeMs;
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
}
@@ -83,62 +73,14 @@ public class SelectionOrderByCombineOperator extends
BaseCombineOperator {
List<OrderByExpressionContext> orderByExpressions =
_queryContext.getOrderByExpressions();
assert orderByExpressions != null;
if (orderByExpressions.get(0).getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER) {
- return tryMinMaxValueBasedCombine(orderByExpressions);
- } else {
- // Fall back to the default combine (process all segments) when segments
have different data types for the first
- // order-by column
- return super.getNextBlock();
- }
- }
-
- private IntermediateResultsBlock
tryMinMaxValueBasedCombine(List<OrderByExpressionContext> orderByExpressions) {
- int numOrderByExpressions = orderByExpressions.size();
- assert numOrderByExpressions > 0;
- OrderByExpressionContext firstOrderByExpression =
orderByExpressions.get(0);
- assert firstOrderByExpression.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER;
- String firstOrderByColumn =
firstOrderByExpression.getExpression().getIdentifier();
- boolean asc = firstOrderByExpression.isAsc();
-
- int numOperators = _operators.size();
- List<MinMaxValueContext> minMaxValueContexts = new
ArrayList<>(numOperators);
- for (Operator operator : _operators) {
- minMaxValueContexts.add(new
MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
- }
- try {
- if (asc) {
- // For ascending order, sort on column min value in ascending order
- minMaxValueContexts.sort((o1, o2) -> {
- // Put segments without column min value in the front because we
always need to process them
- if (o1._minValue == null) {
- return o2._minValue == null ? 0 : -1;
- }
- if (o2._minValue == null) {
- return 1;
- }
- return o1._minValue.compareTo(o2._minValue);
- });
- } else {
- // For descending order, sort on column max value in descending order
- minMaxValueContexts.sort((o1, o2) -> {
- // Put segments without column max value in the front because we
always need to process them
- if (o1._maxValue == null) {
- return o2._maxValue == null ? 0 : -1;
- }
- if (o2._maxValue == null) {
- return 1;
- }
- return o2._maxValue.compareTo(o1._maxValue);
- });
+ try {
+ return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators,
_queryContext, _executorService,
+ _endTimeMs).getNextBlock();
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while using min/max value based combine,
using the default combine", e);
}
- } catch (Exception e) {
- // Fall back to the default combine (process all segments) if there are
any exceptions.
- LOGGER.warn("Segments have different data types for the first order-by
column: {}, using the default combine",
- firstOrderByColumn);
- return super.getNextBlock();
}
-
- return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators,
_queryContext, _executorService, _endTimeMs,
- minMaxValueContexts).getNextBlock();
+ return super.getNextBlock();
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index e84e084..869044d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -22,35 +22,29 @@ import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
+import org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.query.exception.EarlyTerminationException;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.core.util.trace.TraceRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Combine operator for selection only streaming queries.
- * TODO: extend StreamingSelectionOnlyCombineOperator from BaseCombineOperator.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class StreamingSelectionOnlyCombineOperator extends
BaseOperator<IntermediateResultsBlock> {
+public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator
{
private static final Logger LOGGER =
LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class);
private static final String OPERATOR_NAME =
"StreamingSelectionOnlyCombineOperator";
@@ -59,19 +53,13 @@ public class StreamingSelectionOnlyCombineOperator extends
BaseOperator<Intermed
new IntermediateResultsBlock(new DataSchema(new String[0], new
DataSchema.ColumnDataType[0]),
Collections.emptyList());
- private final List<Operator> _operators;
- private final QueryContext _queryContext;
- private final ExecutorService _executorService;
- private final long _endTimeMs;
private final StreamObserver<Server.ServerResponse> _streamObserver;
private final int _limit;
+ private final AtomicLong _numRowsCollected = new AtomicLong();
public StreamingSelectionOnlyCombineOperator(List<Operator> operators,
QueryContext queryContext,
ExecutorService executorService, long endTimeMs,
StreamObserver<Server.ServerResponse> streamObserver) {
- _operators = operators;
- _queryContext = queryContext;
- _executorService = executorService;
- _endTimeMs = endTimeMs;
+ super(operators, queryContext, executorService, endTimeMs);
_streamObserver = streamObserver;
_limit = queryContext.getLimit();
}
@@ -82,109 +70,69 @@ public class StreamingSelectionOnlyCombineOperator extends
BaseOperator<Intermed
}
@Override
- protected IntermediateResultsBlock getNextBlock() {
- int numOperators = _operators.size();
- int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
- // Use a BlockingQueue to store all the results blocks
- BlockingQueue<IntermediateResultsBlock> blockingQueue = new
LinkedBlockingQueue<>();
- // Use a Phaser to ensure all the Futures are done (not scheduled,
finished or interrupted) before the main thread
- // returns. We need to ensure this because the main thread holds the
reference to the segments. If a segment is
- // deleted/refreshed, the segment will be released after the main thread
returns, which would lead to undefined
- // behavior (even JVM crash) when processing queries against it.
- Phaser phaser = new Phaser(1);
-
- Future[] futures = new Future[numThreads];
- for (int i = 0; i < numThreads; i++) {
- int threadIndex = i;
- futures[i] = _executorService.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- try {
- // Register the thread to the phaser
- // NOTE: If the phaser is terminated (returning negative value)
when trying to register the thread, that
- // means the query execution has finished, and the main
thread has deregistered itself and returned
- // the result. Directly return as no execution result will
be taken.
- if (phaser.register() < 0) {
- return;
- }
-
- int numRowsCollected = 0;
- for (int operatorIndex = threadIndex; operatorIndex <
numOperators; operatorIndex += numThreads) {
- Operator<IntermediateResultsBlock> operator =
_operators.get(operatorIndex);
- try {
- IntermediateResultsBlock resultsBlock;
- while ((resultsBlock = operator.nextBlock()) != null) {
- Collection<Object[]> rows =
resultsBlock.getSelectionResult();
- assert rows != null;
- numRowsCollected += rows.size();
- blockingQueue.offer(resultsBlock);
- if (numRowsCollected >= _limit) {
- return;
- }
- }
- blockingQueue.offer(LAST_RESULTS_BLOCK);
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main
thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER.error("Caught exception while executing operator of
index: {} (query: {})", operatorIndex,
- _queryContext, e);
- blockingQueue.offer(new IntermediateResultsBlock(e));
- return;
- }
- }
- } finally {
- phaser.arriveAndDeregister();
+ protected void processSegments(int threadIndex) {
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators;
operatorIndex += _numThreads) {
+ Operator<IntermediateResultsBlock> operator =
_operators.get(operatorIndex);
+ try {
+ IntermediateResultsBlock resultsBlock;
+ while ((resultsBlock = operator.nextBlock()) != null) {
+ Collection<Object[]> rows = resultsBlock.getSelectionResult();
+ assert rows != null;
+ long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
+ _blockingQueue.offer(resultsBlock);
+ if (numRowsCollected >= _limit) {
+ return;
}
}
- });
+ _blockingQueue.offer(LAST_RESULTS_BLOCK);
+ } catch (EarlyTerminationException e) {
+ // Early-terminated by interruption (canceled by the main thread)
+ return;
+ } catch (Exception e) {
+ // Caught exception, skip processing the remaining operators
+ LOGGER.error("Caught exception while executing operator of index: {}
(query: {})", operatorIndex, _queryContext,
+ e);
+ _blockingQueue.offer(new IntermediateResultsBlock(e));
+ return;
+ }
}
+ }
- try {
- int numRowsCollected = 0;
- int numOperatorsFinished = 0;
- while (numRowsCollected < _limit && numOperatorsFinished < numOperators)
{
- IntermediateResultsBlock resultsBlock =
- blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- if (resultsBlock == null) {
- // Query times out, skip streaming the remaining results blocks
- LOGGER.error("Timed out while polling results block (query: {})",
_queryContext);
- return new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
- new TimeoutException("Timed out while polling results block")));
- }
- if (resultsBlock.getProcessingExceptions() != null) {
- // Caught exception while processing segment, skip streaming the
remaining results blocks and directly return
- // the exception
- return resultsBlock;
- }
- if (resultsBlock == LAST_RESULTS_BLOCK) {
- numOperatorsFinished++;
- continue;
- }
- DataSchema dataSchema = resultsBlock.getDataSchema();
- Collection<Object[]> rows = resultsBlock.getSelectionResult();
- assert dataSchema != null && rows != null;
- numRowsCollected += rows.size();
- DataTable dataTable =
SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
-
_streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable));
+ @Override
+ protected IntermediateResultsBlock mergeResults()
+ throws Exception {
+ long numRowsCollected = 0;
+ int numOperatorsFinished = 0;
+ while (numRowsCollected < _limit && numOperatorsFinished < _numOperators) {
+ IntermediateResultsBlock resultsBlock =
+ _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ if (resultsBlock == null) {
+ // Query times out, skip streaming the remaining results blocks
+ LOGGER.error("Timed out while polling results block (query: {})",
_queryContext);
+ return new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
}
- IntermediateResultsBlock metadataBlock = new IntermediateResultsBlock();
- CombineOperatorUtils.setExecutionStatistics(metadataBlock, _operators);
- return metadataBlock;
- } catch (Exception e) {
- LOGGER.error("Caught exception while streaming results blocks (query:
{})", _queryContext, e);
- return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e);
- } finally {
- // Cancel all ongoing jobs
- for (Future future : futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
+ if (resultsBlock.getProcessingExceptions() != null) {
+ // Caught exception while processing segment, skip streaming the
remaining results blocks and directly return
+ // the exception
+ return resultsBlock;
+ }
+ if (resultsBlock == LAST_RESULTS_BLOCK) {
+ numOperatorsFinished++;
+ continue;
}
- // Deregister the main thread and wait for all threads done
- phaser.awaitAdvance(phaser.arriveAndDeregister());
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ Collection<Object[]> rows = resultsBlock.getSelectionResult();
+ assert dataSchema != null && rows != null;
+ numRowsCollected += rows.size();
+ DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows,
dataSchema);
+
_streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable));
}
+ // Return an empty results block for the metadata
+ return new IntermediateResultsBlock();
+ }
+
+ @Override
+ protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
IntermediateResultsBlock blockToMerge) {
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]