This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba230c39a Handle errors in combine operator (#9689)
2ba230c39a is described below
commit 2ba230c39aedb2d88958f2001b859692d31b10a5
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Oct 31 11:05:20 2022 -0700
Handle errors in combine operator (#9689)
---
.../pinot/common/exception/QueryException.java | 8 +-
.../blocks/results/ExceptionResultsBlock.java | 8 +-
.../core/operator/combine/BaseCombineOperator.java | 21 ++-
.../operator/combine/GroupByCombineOperator.java | 4 +-
.../combine/CombineErrorOperatorsTest.java | 197 +++++++++++++++++++++
5 files changed, 221 insertions(+), 17 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 0009cee884..a4176f65fe 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -163,8 +163,8 @@ public class QueryException {
QUOTA_EXCEEDED_ERROR.setMessage("QuotaExceededError");
}
- public static ProcessingException getException(ProcessingException processingException, Exception exception) {
- return getException(processingException, getTruncatedStackTrace(exception));
+ public static ProcessingException getException(ProcessingException processingException, Throwable t) {
+ return getException(processingException, getTruncatedStackTrace(t));
}
public static ProcessingException getException(ProcessingException processingException, String errorMessage) {
@@ -174,9 +174,9 @@ public class QueryException {
return copiedProcessingException;
}
- public static String getTruncatedStackTrace(Throwable exception) {
+ public static String getTruncatedStackTrace(Throwable t) {
StringWriter stringWriter = new StringWriter();
- exception.printStackTrace(new PrintWriter(stringWriter));
+ t.printStackTrace(new PrintWriter(stringWriter));
String fullStackTrace = stringWriter.toString();
String[] lines = StringUtils.split(fullStackTrace, '\n');
// exception should at least have one line, no need to check here.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index d8df97ee7c..4f7f6916d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -30,12 +30,12 @@ import org.apache.pinot.core.query.request.context.QueryContext;
public class ExceptionResultsBlock extends BaseResultsBlock {
- public ExceptionResultsBlock(ProcessingException processingException, Exception e) {
- addToProcessingExceptions(QueryException.getException(processingException, e));
+ public ExceptionResultsBlock(ProcessingException processingException, Throwable t) {
+ addToProcessingExceptions(QueryException.getException(processingException, t));
}
- public ExceptionResultsBlock(Exception e) {
- this(QueryException.QUERY_EXECUTION_ERROR, e);
+ public ExceptionResultsBlock(Throwable t) {
+ this(QueryException.QUERY_EXECUTION_ERROR, t);
}
@Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index b99b12d334..364de03fda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -105,10 +105,17 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
processSegments();
} catch (EarlyTerminationException e) {
// Early-terminated by interruption (canceled by the main thread)
- } catch (Exception e) {
- // Caught exception, skip processing the remaining segments
- LOGGER.error("Caught exception while processing query: " + _queryContext, e);
- onException(e);
+ } catch (Throwable t) {
+ // Caught exception/error, skip processing the remaining segments
+ // NOTE: We need to handle Error here, or the execution threads will die without adding the execution
+ // exception into the query response, and the main thread might wait infinitely (until timeout) or
+ // throw unexpected exceptions (such as NPE).
+ if (t instanceof Exception) {
+ LOGGER.error("Caught exception while processing query: " + _queryContext, t);
+ } else {
+ LOGGER.error("Caught serious error while processing query: " + _queryContext, t);
+ }
+ onException(t);
} finally {
onFinish();
phaser.arriveAndDeregister();
@@ -180,10 +187,10 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
}
/**
- * Invoked when {@link #processSegments()} throws exception.
+ * Invoked when {@link #processSegments()} throws exception/error.
*/
- protected void onException(Exception e) {
- _blockingQueue.offer(new ExceptionResultsBlock(e));
+ protected void onException(Throwable t) {
+ _blockingQueue.offer(new ExceptionResultsBlock(t));
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 57bf20f50e..5108d4d964 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -216,8 +216,8 @@ public class GroupByCombineOperator extends BaseCombineOperator<GroupByResultsBl
}
@Override
- protected void onException(Exception e) {
- _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+ protected void onException(Throwable t) {
+ _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t));
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
new file mode 100644
index 0000000000..da8cb4835b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Block;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+@SuppressWarnings("rawtypes")
+public class CombineErrorOperatorsTest {
+ private static final int NUM_OPERATORS = 10;
+ private static final int NUM_THREADS = 2;
+ private static final long TIMEOUT_MS = 1000L;
+
+ private ExecutorService _executorService;
+
+ @BeforeClass
+ public void setUp() {
+ _executorService = Executors.newFixedThreadPool(NUM_THREADS);
+ }
+
+ @Test
+ public void testCombineExceptionOperator() {
+ List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+ for (int i = 0; i < NUM_OPERATORS - 1; i++) {
+ operators.add(new RegularOperator());
+ }
+ operators.add(new ExceptionOperator());
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+ queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
+ SelectionOnlyCombineOperator combineOperator =
+ new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
+ BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+ assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+ List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+ assertNotNull(processingExceptions);
+ assertEquals(processingExceptions.size(), 1);
+ ProcessingException processingException = processingExceptions.get(0);
+ assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE);
+ assertTrue(processingException.getMessage().contains("java.lang.RuntimeException: Exception"));
+ }
+
+ @Test
+ public void testCombineErrorOperator() {
+ List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+ for (int i = 0; i < NUM_OPERATORS - 1; i++) {
+ operators.add(new RegularOperator());
+ }
+ operators.add(new ErrorOperator());
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+ queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
+ SelectionOnlyCombineOperator combineOperator =
+ new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
+ BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+ assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+ List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+ assertNotNull(processingExceptions);
+ assertEquals(processingExceptions.size(), 1);
+ ProcessingException processingException = processingExceptions.get(0);
+ assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE);
+ assertTrue(processingException.getMessage().contains("java.lang.Error: Error"));
+ }
+
+ @Test
+ public void testCombineExceptionAndErrorOperator() {
+ List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+ for (int i = 0; i < NUM_OPERATORS - 2; i++) {
+ operators.add(new RegularOperator());
+ }
+ operators.add(new ExceptionOperator());
+ operators.add(new ErrorOperator());
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+ queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
+ SelectionOnlyCombineOperator combineOperator =
+ new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
+ BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+ assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+ List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+ assertNotNull(processingExceptions);
+ assertEquals(processingExceptions.size(), 1);
+ ProcessingException processingException = processingExceptions.get(0);
+ assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE);
+ String message = processingException.getMessage();
+ assertTrue(message.contains("java.lang.RuntimeException: Exception") || message.contains("java.lang.Error: Error"));
+ }
+
+ private static class ExceptionOperator extends BaseOperator {
+ private static final String EXPLAIN_NAME = "EXCEPTION";
+
+ @Override
+ protected Block getNextBlock() {
+ throw new RuntimeException("Exception");
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ return new ExecutionStatistics(0, 0, 0, 0);
+ }
+ }
+
+ private static class ErrorOperator extends BaseOperator {
+ private static final String EXPLAIN_NAME = "ERROR";
+
+ @Override
+ protected Block getNextBlock() {
+ throw new Error("Error");
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ return new ExecutionStatistics(0, 0, 0, 0);
+ }
+ }
+
+ private static class RegularOperator extends BaseOperator {
+ private static final String EXPLAIN_NAME = "REGULAR";
+
+ @Override
+ protected Block getNextBlock() {
+ return new SelectionResultsBlock(
+ new DataSchema(new String[]{"myColumn"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}),
+ new ArrayList<>());
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ return new ExecutionStatistics(0, 0, 0, 0);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]