This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b670c552f5 Fix pipeline breaker error handling (#11411)
b670c552f5 is described below
commit b670c552f5b8ab8cbf6d1eaf79c79a0d926c7ebe
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Aug 23 11:43:45 2023 -0700
Fix pipeline breaker error handling (#11411)
---
.../apache/pinot/query/mailbox/MailboxService.java | 2 -
.../apache/pinot/query/runtime/QueryRunner.java | 56 ++++++----
.../plan/pipeline/PipelineBreakerExecutor.java | 56 +++++-----
.../plan/pipeline/PipelineBreakerOperator.java | 116 +++++++++------------
.../plan/pipeline/PipelineBreakerResult.java | 9 +-
.../query/service/dispatch/QueryDispatcher.java | 12 +--
.../pinot/query/runtime/QueryRunnerTest.java | 46 ++++----
.../pinot/query/runtime/QueryRunnerTestBase.java | 10 +-
.../plan/pipeline/PipelineBreakerExecutorTest.java | 28 ++---
9 files changed, 166 insertions(+), 169 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index c95c7c3f23..7c171f1bf4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.mailbox;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
@@ -100,7 +99,6 @@ public class MailboxService {
* not open the underlying channel or acquire any additional resources.
Instead, it will initialize lazily when the
* data is sent for the first time.
*/
- @VisibleForTesting
public SendingMailbox getSendingMailbox(String hostname, int port, String
mailboxId, long deadlineMs) {
if (_hostname.equals(hostname) && _port == port) {
return new InMemorySendingMailbox(mailboxId, this, deadlineMs);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index edcd0a0fdc..b66c71de48 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -36,9 +36,12 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import
org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -144,34 +147,47 @@ public class QueryRunner {
long deadlineMs = System.currentTimeMillis() + timeoutMs;
// run pre-stage execution for all pipeline breakers
- PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
- _mailboxService, distributedStagePlan, deadlineMs, requestId,
isTraceEnabled);
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan, deadlineMs,
+ requestId, isTraceEnabled);
+
+ // Send error block to all the receivers if pipeline breaker fails
+ if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
+ TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+ LOGGER.error("Error executing pipeline breaker for request: {}, stage:
{}, sending error block: {}", requestId,
+ distributedStagePlan.getStageId(),
errorBlock.getDataBlock().getExceptions());
+ int receiverStageId = ((MailboxSendNode)
distributedStagePlan.getStageRoot()).getReceiverStageId();
+ MailboxMetadata mailboxMetadata =
distributedStagePlan.getStageMetadata().getWorkerMetadataList()
+
.get(distributedStagePlan.getServer().workerId()).getMailBoxInfosMap().get(receiverStageId);
+ List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId,
mailboxMetadata);
+ for (int i = 0; i < mailboxIds.size(); i++) {
+ try {
+
_mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(),
+ mailboxMetadata.getVirtualAddress(i).port(), mailboxIds.get(i),
deadlineMs).send(errorBlock);
+ } catch (TimeoutException e) {
+ LOGGER.warn("Timed out sending error block to mailbox: {} for
request: {}, stage: {}", mailboxIds.get(i),
+ requestId, distributedStagePlan.getStageId(), e);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception sending error block to mailbox: {}
for request: {}, stage: {}",
+ mailboxIds.get(i), requestId, distributedStagePlan.getStageId(),
e);
+ }
+ }
+ return;
+ }
// Set Join Overflow configs to StageMetadata from request
setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap);
// run OpChain
+ OpChain opChain;
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
- try {
- OpChain rootOperator = compileLeafStage(requestId,
distributedStagePlan, requestMetadataMap,
- pipelineBreakerResult, deadlineMs, isTraceEnabled);
- _scheduler.register(rootOperator);
- } catch (Exception e) {
- LOGGER.error("Error executing leaf stage for: {}:{}", requestId,
distributedStagePlan.getStageId(), e);
- _scheduler.cancel(requestId);
- throw e;
- }
+ opChain = compileLeafStage(requestId, distributedStagePlan,
requestMetadataMap, pipelineBreakerResult, deadlineMs,
+ isTraceEnabled);
} else {
- try {
- OpChain rootOperator = compileIntermediateStage(requestId,
distributedStagePlan, requestMetadataMap,
- pipelineBreakerResult, deadlineMs, isTraceEnabled);
- _scheduler.register(rootOperator);
- } catch (Exception e) {
- LOGGER.error("Error executing intermediate stage for: {}:{}",
requestId, distributedStagePlan.getStageId(), e);
- _scheduler.cancel(requestId);
- throw e;
- }
+ opChain = compileIntermediateStage(requestId, distributedStagePlan,
requestMetadataMap, pipelineBreakerResult,
+ deadlineMs, isTraceEnabled);
}
+ _scheduler.register(opChain);
}
private void setJoinOverflowConfigs(DistributedStagePlan
distributedStagePlan,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 6149b758fe..69663f5d33 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -18,13 +18,13 @@
*/
package org.apache.pinot.query.runtime.plan.pipeline;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
@@ -45,11 +45,11 @@ import org.slf4j.LoggerFactory;
* Utility class to run pipeline breaker execution and collects the results.
*/
public class PipelineBreakerExecutor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
private PipelineBreakerExecutor() {
- // do not instantiate.
}
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
+
/**
* Execute a pipeline breaker and collect the results (synchronously).
Currently, pipeline breaker executor can only
* execute mailbox receive pipeline breaker.
@@ -64,10 +64,10 @@ public class PipelineBreakerExecutor {
* - If exception occurs, exception block will be wrapped in {@link
TransferableBlock} and assigned to each PB node.
* - Normal stats will be attached to each PB node and downstream
execution should return with stats attached.
*/
- public static PipelineBreakerResult executePipelineBreakers(
- OpChainSchedulerService scheduler,
- MailboxService mailboxService, DistributedStagePlan
distributedStagePlan, long deadlineMs,
- long requestId, boolean isTraceEnabled) {
+ @Nullable
+ public static PipelineBreakerResult
executePipelineBreakers(OpChainSchedulerService scheduler,
+ MailboxService mailboxService, DistributedStagePlan
distributedStagePlan, long deadlineMs, long requestId,
+ boolean isTraceEnabled) {
PipelineBreakerContext pipelineBreakerContext = new
PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(),
pipelineBreakerContext);
if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
@@ -76,31 +76,25 @@ public class PipelineBreakerExecutor {
// TODO: This PlanRequestContext needs to indicate it is a pre-stage
opChain and only listens to pre-stage
// OpChain receive-mail callbacks.
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query
information
- OpChainExecutionContext opChainContext = new
OpChainExecutionContext(mailboxService, requestId,
- stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(),
deadlineMs,
- distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
+ OpChainExecutionContext opChainContext =
+ new OpChainExecutionContext(mailboxService, requestId,
stageRoot.getPlanFragmentId(),
+ distributedStagePlan.getServer(), deadlineMs,
distributedStagePlan.getStageMetadata(), null,
+ isTraceEnabled);
PhysicalPlanContext physicalPlanContext = new
PhysicalPlanContext(opChainContext, null);
return PipelineBreakerExecutor.execute(scheduler,
pipelineBreakerContext, physicalPlanContext);
} catch (Exception e) {
- LOGGER.error("Unable to create pipeline breaker results for Req: " +
requestId + ", Stage: "
- + distributedStagePlan.getStageId(), e);
- // Create all error blocks for all pipeline breaker nodes.
- TransferableBlock errorBlock =
TransferableBlockUtils.getErrorTransferableBlock(e);
- Map<Integer, List<TransferableBlock>> resultMap = new HashMap<>();
- for (int key : pipelineBreakerContext.getNodeIdMap().values()) {
- if (pipelineBreakerContext.getPipelineBreakerMap().containsKey(key))
{
- resultMap.put(key, Collections.singletonList(errorBlock));
- }
- }
- return new
PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap, null);
+ LOGGER.error("Caught exception executing pipeline breaker for request:
{}, stage: {}", requestId,
+ distributedStagePlan.getStageId(), e);
+ return new
PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(),
Collections.emptyMap(),
+ TransferableBlockUtils.getErrorTransferableBlock(e), null);
}
} else {
return null;
}
}
- private static PipelineBreakerResult execute(OpChainSchedulerService
scheduler,
- PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext)
+ private static PipelineBreakerResult execute(OpChainSchedulerService
scheduler, PipelineBreakerContext context,
+ PhysicalPlanContext physicalPlanContext)
throws Exception {
Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new
HashMap<>();
for (Map.Entry<Integer, PlanNode> e :
context.getPipelineBreakerMap().entrySet()) {
@@ -119,18 +113,20 @@ public class PipelineBreakerExecutor {
PipelineBreakerContext context, Map<Integer,
Operator<TransferableBlock>> pipelineWorkerMap,
PhysicalPlanContext physicalPlanContext)
throws Exception {
- PipelineBreakerOperator pipelineBreakerOperator = new
PipelineBreakerOperator(
- physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap);
+ PipelineBreakerOperator pipelineBreakerOperator =
+ new
PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(),
pipelineWorkerMap);
CountDownLatch latch = new CountDownLatch(1);
- OpChain pipelineBreakerOpChain = new
OpChain(physicalPlanContext.getOpChainExecutionContext(),
- pipelineBreakerOperator, physicalPlanContext.getReceivingMailboxIds(),
(id) -> latch.countDown());
+ OpChain pipelineBreakerOpChain =
+ new OpChain(physicalPlanContext.getOpChainExecutionContext(),
pipelineBreakerOperator,
+ physicalPlanContext.getReceivingMailboxIds(), (id) ->
latch.countDown());
scheduler.register(pipelineBreakerOpChain);
long timeoutMs = physicalPlanContext.getDeadlineMs() -
System.currentTimeMillis();
if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
return new PipelineBreakerResult(context.getNodeIdMap(),
pipelineBreakerOperator.getResultMap(),
- pipelineBreakerOpChain.getStats());
+ pipelineBreakerOperator.getErrorBlock(),
pipelineBreakerOpChain.getStats());
} else {
- throw new IOException("Exception occur when awaiting breaker results!");
+ throw new TimeoutException(
+ String.format("Timed out waiting for pipeline breaker results after:
%dms", timeoutMs));
}
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 1aab124135..9fe2588827 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -18,41 +18,34 @@
*/
package org.apache.pinot.query.runtime.plan.pipeline;
-import com.google.common.collect.ImmutableSet;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import javax.annotation.Nullable;
-import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
class PipelineBreakerOperator extends MultiStageOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipelineBreakerOperator.class);
private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
- private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>>
_workerEntries;
- private final Map<Integer, List<TransferableBlock>> _resultMap;
- private final ImmutableSet<Integer> _expectedKeySet;
- private TransferableBlock _finalBlock;
- public PipelineBreakerOperator(OpChainExecutionContext context,
- Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) {
+ private final Map<Integer, Operator<TransferableBlock>> _workerMap;
+
+ private Map<Integer, List<TransferableBlock>> _resultMap;
+ private TransferableBlock _errorBlock;
+
+ public PipelineBreakerOperator(OpChainExecutionContext context, Map<Integer,
Operator<TransferableBlock>> workerMap) {
super(context);
+ _workerMap = workerMap;
_resultMap = new HashMap<>();
- _expectedKeySet = ImmutableSet.copyOf(pipelineWorkerMap.keySet());
- _workerEntries = new ArrayDeque<>();
- _workerEntries.addAll(pipelineWorkerMap.entrySet());
- for (int workerKey : _expectedKeySet) {
+ for (int workerKey : workerMap.keySet()) {
_resultMap.put(workerKey, new ArrayList<>());
}
}
@@ -62,6 +55,10 @@ class PipelineBreakerOperator extends MultiStageOperator {
}
@Nullable
+ public TransferableBlock getErrorBlock() {
+ return _errorBlock;
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
@@ -69,63 +66,44 @@ class PipelineBreakerOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- // Poll from every mailbox operator:
- // - Return the first content block
- // - If no content block found but there are mailboxes not finished, try
again
- // - If all content blocks are already returned, return end-of-stream block
- while (!_workerEntries.isEmpty()) {
- if (_finalBlock != null) {
- return _finalBlock;
- }
- if (System.currentTimeMillis() > _context.getDeadlineMs()) {
- _finalBlock =
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
- constructErrorResponse(_finalBlock);
- return _finalBlock;
- }
-
- Map.Entry<Integer, Operator<TransferableBlock>> worker =
_workerEntries.getLast();
- TransferableBlock block = worker.getValue().nextBlock();
-
- if (block == null) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("==[PB]== Null block on " + _context.getId() + " worker
" + worker.getKey());
+ if (_errorBlock != null) {
+ return _errorBlock;
+ }
+ // NOTE: Put an empty list for each worker in case there is no data block
returned from that worker
+ if (_workerMap.size() == 1) {
+ Map.Entry<Integer, Operator<TransferableBlock>> entry =
_workerMap.entrySet().iterator().next();
+ List<TransferableBlock> dataBlocks = new ArrayList<>();
+ _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks);
+ Operator<TransferableBlock> operator = entry.getValue();
+ TransferableBlock block = operator.nextBlock();
+ while (!block.isSuccessfulEndOfStreamBlock()) {
+ if (block.isErrorBlock()) {
+ _errorBlock = block;
+ return block;
}
- continue;
+ dataBlocks.add(block);
+ block = operator.nextBlock();
}
-
- // Release the mailbox worker when the block is end-of-stream
- if (block.isSuccessfulEndOfStreamBlock()) {
- _workerEntries.removeLast();
- continue;
+ } else {
+ _resultMap = new HashMap<>();
+ for (int workerKey : _workerMap.keySet()) {
+ _resultMap.put(workerKey, new ArrayList<>());
}
-
- if (block.isErrorBlock()) {
- _finalBlock = block;
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("==[PB]== Returned block from : " + _context.getId() + "
block: " + block);
- }
- _resultMap.get(worker.getKey()).add(block);
- return block;
- }
- if (System.currentTimeMillis() > _context.getDeadlineMs()) {
- _finalBlock =
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
- return _finalBlock;
- } else if (_finalBlock == null) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("==[PB]== Finished : " + _context.getId());
+ // Keep polling from every operator in round-robin fashion
+ Queue<Map.Entry<Integer, Operator<TransferableBlock>>> entries = new
ArrayDeque<>(_workerMap.entrySet());
+ while (!entries.isEmpty()) {
+ Map.Entry<Integer, Operator<TransferableBlock>> entry = entries.poll();
+ TransferableBlock block = entry.getValue().nextBlock();
+ if (block.isErrorBlock()) {
+ _errorBlock = block;
+ return block;
+ }
+ if (block.isDataBlock()) {
+ _resultMap.get(entry.getKey()).add(block);
+ entries.offer(entry);
+ }
}
- _finalBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
- return _finalBlock;
- }
-
- /**
- * Setting all result map to error if any of the pipeline breaker returns an
ERROR.
- */
- private void constructErrorResponse(TransferableBlock errorBlock) {
- for (int key : _expectedKeySet) {
- _resultMap.put(key, Collections.singletonList(errorBlock));
}
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
index fe0f2cba61..2e2b003e34 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -32,12 +32,14 @@ import org.apache.pinot.query.runtime.operator.OpChainStats;
public class PipelineBreakerResult {
private final Map<PlanNode, Integer> _nodeIdMap;
private final Map<Integer, List<TransferableBlock>> _resultMap;
+ private final TransferableBlock _errorBlock;
private final OpChainStats _opChainStats;
public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer,
List<TransferableBlock>> resultMap,
- OpChainStats opChainStats) {
+ @Nullable TransferableBlock errorBlock, @Nullable OpChainStats
opChainStats) {
_nodeIdMap = nodeIdMap;
_resultMap = resultMap;
+ _errorBlock = errorBlock;
_opChainStats = opChainStats;
}
@@ -49,6 +51,11 @@ public class PipelineBreakerResult {
return _resultMap;
}
+ @Nullable
+ public TransferableBlock getErrorBlock() {
+ return _errorBlock;
+ }
+
@Nullable
public OpChainStats getOpChainStats() {
return _opChainStats;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 0d94a4d099..50ed9f7cf5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.service.dispatch;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.HashMap;
@@ -211,8 +212,11 @@ public class QueryDispatcher {
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(scheduler,
mailboxService, reducerStagePlan,
System.currentTimeMillis() + timeoutMs, requestId, traceEnabled);
- if (pipelineBreakerResult == null) {
- throw new RuntimeException("Broker reducer error during query
execution!");
+ Preconditions.checkState(pipelineBreakerResult != null, "Pipeline breaker
result should not be null");
+ if (pipelineBreakerResult.getErrorBlock() != null) {
+ throw new RuntimeException(
+ "Received error query execution result block: " +
pipelineBreakerResult.getErrorBlock().getDataBlock()
+ .getExceptions());
}
collectStats(dispatchableSubPlan, pipelineBreakerResult.getOpChainStats(),
statsAggregatorMap);
List<TransferableBlock> resultDataBlocks =
pipelineBreakerResult.getResultMap().get(0);
@@ -245,10 +249,6 @@ public class QueryDispatcher {
List<Object[]> resultRows = new ArrayList<>();
DataSchema resultSchema = toResultSchema(sourceSchema, fields);
for (TransferableBlock transferableBlock : queryResult) {
- if (transferableBlock.isErrorBlock()) {
- throw new RuntimeException(
- "Received error query execution result block: " +
transferableBlock.getDataBlock().getExceptions());
- }
DataBlock dataBlock = transferableBlock.getDataBlock();
int numColumns = resultSchema.getColumnNames().length;
int numRows = dataBlock.getNumberOfRows();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 7ee9ed8732..3c73434c15 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -19,14 +19,15 @@
package org.apache.pinot.query.runtime;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -44,6 +45,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -188,12 +191,18 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
*/
@Test(dataProvider = "testDataWithSqlExecutionExceptions")
public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
- long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
- DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
- Map<String, String> requestMetadataMap =
- ImmutableMap.of(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
String.valueOf(requestId),
- CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
- String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+ long requestId = REQUEST_ID_GEN.getAndIncrement();
+ SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sql);
+ QueryEnvironment.QueryPlannerResult queryPlannerResult =
+ _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
+ DispatchableSubPlan dispatchableSubPlan =
queryPlannerResult.getQueryPlan();
+ Map<String, String> requestMetadataMap = new HashMap<>();
+
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
String.valueOf(requestId));
+ Long timeoutMs =
QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
+
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+ String.valueOf(timeoutMs != null ? timeoutMs :
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"true");
+ requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
int reducerStageId = -1;
for (int stageId = 0; stageId <
dispatchableSubPlan.getQueryStageList().size(); stageId++) {
if
(dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment()
@@ -210,17 +219,16 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)),
_mailboxService,
_reducerScheduler, null, false);
- } catch (RuntimeException rte) {
- Assert.assertTrue(rte.getMessage().contains("Received error query
execution result block"));
- // TODO: The actual message is (usually) something like:
- // Received error query execution result block:
{200=QueryExecutionError:
- // java.lang.IllegalArgumentException: Illegal Json Path: $['path']
does not match document
- // at
org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.throwPathNotFoundException(...)
- // at
org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.processValue(...)
- // at
org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.evaluateBlock(...)
- // at
org.apache.pinot.core.common.DataFetcher$ColumnValueReader.readIntValues(DataFetcher.java:489)}
- Assert.assertTrue(rte.getMessage().contains(exceptionMsg), "Exception
should contain: " + exceptionMsg
- + "! but found: " + rte.getMessage());
+ Assert.fail("Should have thrown exception!");
+ } catch (RuntimeException e) {
+ // NOTE: The actual message is (usually) something like:
+ // Received error query execution result block:
{200=QueryExecutionError:
+ // Query execution error on: Server_localhost_12345
+ // java.lang.IllegalArgumentException: Illegal Json Path: $['path']
does not match document
+ String exceptionMessage = e.getMessage();
+ Assert.assertTrue(exceptionMessage.startsWith("Received error query
execution result block: "));
+ Assert.assertTrue(exceptionMessage.contains(exceptionMsg),
+ "Exception should contain: " + exceptionMsg + ", but found: " +
exceptionMessage);
}
}
@@ -288,7 +296,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
// Timeout exception should occur with this option:
new Object[]{
"SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN
c ON a.col1 = c.col1",
- "timeout"
+ "Timeout"
},
// Function with incorrect argument signature should throw runtime
exception when casting string to numeric
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index d8589c42a0..7dcf8bb759 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -37,14 +37,15 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryServerEnclosure;
@@ -87,7 +88,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
protected static final String SEGMENT_BREAKER_KEY =
"__SEGMENT_BREAKER_KEY__";
protected static final String SEGMENT_BREAKER_STR = "------";
protected static final GenericRow SEGMENT_BREAKER_ROW = new GenericRow();
- protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
+ protected static final AtomicLong REQUEST_ID_GEN = new AtomicLong();
protected QueryEnvironment _queryEnvironment;
protected String _reducerHostname;
protected int _reducerGrpcPort;
@@ -108,15 +109,16 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
* ser/de dispatches.
*/
protected List<Object[]> queryRunner(String sql, Map<Integer,
ExecutionStatsAggregator> executionStatsAggregatorMap) {
- long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+ long requestId = REQUEST_ID_GEN.getAndIncrement();
SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sql);
QueryEnvironment.QueryPlannerResult queryPlannerResult =
_queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
DispatchableSubPlan dispatchableSubPlan =
queryPlannerResult.getQueryPlan();
Map<String, String> requestMetadataMap = new HashMap<>();
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
String.valueOf(requestId));
+ Long timeoutMs =
QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
- String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+ String.valueOf(timeoutMs != null ? timeoutMs :
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"true");
requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 7f33dc45a4..b5ba82e48d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -136,6 +136,7 @@ public class PipelineBreakerExecutorTest {
// then
// should have single PB result, receive 2 data blocks, EOS block
shouldn't be included
Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertNull(pipelineBreakerResult.getErrorBlock());
Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(),
2);
@@ -176,6 +177,7 @@ public class PipelineBreakerExecutorTest {
// then
// should have two PB result, receive 2 data blocks, one each, EOS block
shouldn't be included
Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertNull(pipelineBreakerResult.getErrorBlock());
Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
Iterator<List<TransferableBlock>> it =
pipelineBreakerResult.getResultMap().values().iterator();
Assert.assertEquals(it.next().size(), 1);
@@ -201,8 +203,9 @@ public class PipelineBreakerExecutorTest {
System.currentTimeMillis() + 10_000L, 0, false);
// then
- // should contain only failure error blocks
+ // should return empty block list
Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertNull(pipelineBreakerResult.getErrorBlock());
Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
List<TransferableBlock> resultBlocks =
pipelineBreakerResult.getResultMap().values().iterator().next();
Assert.assertEquals(resultBlocks.size(), 0);
@@ -233,11 +236,9 @@ public class PipelineBreakerExecutorTest {
// then
// should contain only failure error blocks
Assert.assertNotNull(pipelineBreakerResult);
- Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
- List<TransferableBlock> resultBlocks =
pipelineBreakerResult.getResultMap().values().iterator().next();
- Assert.assertEquals(resultBlocks.size(), 1);
- Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
- Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+ TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+ Assert.assertNotNull(errorBlock);
+ Assert.assertTrue(errorBlock.isErrorBlock());
}
@Test
@@ -311,17 +312,8 @@ public class PipelineBreakerExecutorTest {
// then
// should fail even if one of the 2 PB doesn't contain error block from
sender.
Assert.assertNotNull(pipelineBreakerResult);
- Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
-
- boolean errorFound = false;
- for (List<TransferableBlock> resultBlocks :
pipelineBreakerResult.getResultMap().values()) {
- if (!resultBlocks.isEmpty()) {
- TransferableBlock lastBlock = resultBlocks.get(resultBlocks.size() -
1);
- if (lastBlock.isErrorBlock()) {
- errorFound = true;
- }
- }
- }
- Assert.assertTrue(errorFound, "An error block should be the last block on
at least one of the result map entries");
+ TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+ Assert.assertNotNull(errorBlock);
+ Assert.assertTrue(errorBlock.isErrorBlock());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]