This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 9fcd100d82 IGNITE-20385 Sql. Fixed handling of NodeLeftException (#2622) 9fcd100d82 is described below commit 9fcd100d82347d68b4e832da1a92e3a1d9b658a3 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Tue Oct 17 14:55:37 2023 +0300 IGNITE-20385 Sql. Fixed handling of NodeLeftException (#2622) --- .../ignite/internal/util/ExceptionUtils.java | 26 ++++++++++------------ .../internal/sql/engine/NodeLeftException.java | 11 +++++++++ .../sql/engine/exec/ExecutionServiceImpl.java | 4 ++-- .../ignite/internal/sql/engine/exec/rel/Inbox.java | 2 +- .../internal/sql/engine/exec/rel/Outbox.java | 4 ++-- .../sql/engine/exec/ExecutionServiceImplTest.java | 16 ++++++++----- 6 files changed, 39 insertions(+), 24 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java index 4a2aae2ebe..4182f17f7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java @@ -364,8 +364,8 @@ public final class ExceptionUtils { /** * Creates a new exception, which type is defined by the provided {@code supplier}, with the specified {@code t} as a cause. - * In the case when the provided cause {@code t} is an instance of {@link IgniteInternalException} - * or {@link IgniteInternalCheckedException}, the original trace identifier and full error code are preserved. + * In the case when the provided cause {@code t} is an instance of {@link TraceableException}, + * the original trace identifier and full error code are preserved. * Otherwise, a newly generated trace identifier and {@code defaultCode} are used. * * @param supplier Reference to a exception constructor. @@ -380,8 +380,8 @@ public final class ExceptionUtils { /** * Creates a new exception, which type is defined by the provided {@code supplier}, with the specified {@code t} as a cause. - * In the case when the provided cause {@code t} is an instance of {@link IgniteInternalException} - * or {@link IgniteInternalCheckedException}, the original trace identifier and full error code are preserved. + * In the case when the provided cause {@code t} is an instance of {@link TraceableException}, + * the original trace identifier and full error code are preserved. * Otherwise, a newly generated trace identifier and {@code defaultCode} are used. * * @param supplier Reference to a exception constructor. @@ -403,8 +403,8 @@ public final class ExceptionUtils { /** * Creates a new exception, which type is defined by the provided {@code supplier}, with the specified {@code t} as a cause * and full error code {@code code}. - * In the case when the provided cause {@code t} is an instance of {@link IgniteInternalException} - * or {@link IgniteInternalCheckedException}, the original trace identifier preserved. + * In the case when the provided cause {@code t} is an instance of {@link TraceableException}, + * the original trace identifier preserved. * Otherwise, a newly generated trace identifier is used. * * @param supplier Reference to a exception constructor. @@ -420,8 +420,8 @@ public final class ExceptionUtils { /** * Creates a new exception, which type is defined by the provided {@code supplier}, with the specified {@code t} as a cause, * full error code {@code code} and error message {@code message}. - * In the case when the provided cause {@code t} is an instance of {@link IgniteInternalException} - * or {@link IgniteInternalCheckedException}, the original trace identifier preserved. + * In the case when the provided cause {@code t} is an instance of {@link TraceableException}, + * the original trace identifier preserved. * Otherwise, a newly generated trace identifier is used. * * @param supplier Reference to a exception constructor. @@ -456,12 +456,10 @@ public final class ExceptionUtils { ) { Throwable unwrapped = unwrapCause(t); - if (unwrapped instanceof IgniteInternalException) { - IgniteInternalException iie = (IgniteInternalException) unwrapped; - return supplier.apply(iie.traceId(), iie.code(), iie.getMessage(), t); - } else if (unwrapped instanceof IgniteInternalCheckedException) { - IgniteInternalCheckedException iice = (IgniteInternalCheckedException) unwrapped; - return supplier.apply(iice.traceId(), iice.code(), iice.getMessage(), t); + if (unwrapped instanceof TraceableException) { + TraceableException traceable = (TraceableException) unwrapped; + + return supplier.apply(traceable.traceId(), traceable.code(), unwrapped.getMessage(), t); } return supplier.apply(UUID.randomUUID(), defaultCode, t.getMessage(), t); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/NodeLeftException.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/NodeLeftException.java index a7c8056f5c..fcccbb3eef 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/NodeLeftException.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/NodeLeftException.java @@ -19,7 +19,9 @@ package org.apache.ignite.internal.sql.engine; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR; +import java.util.UUID; import org.apache.ignite.lang.IgniteException; +import org.jetbrains.annotations.Nullable; /** * The exception is thrown when SQL engine can not process an operation because a node has a left cluster. @@ -32,4 +34,13 @@ public class NodeLeftException extends IgniteException { public NodeLeftException(String nodeName) { super(NODE_LEFT_ERR, "Node left the cluster. Node: " + nodeName); } + + /** + * Constructor is used to copy the original exception with an extended error description. + */ + public NodeLeftException(UUID traceId, int code, String message, @Nullable Throwable cause) { + super(traceId, code, message, cause); + + assert code == NODE_LEFT_ERR; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 0b0984ed7c..aa4c88cb73 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -704,8 +704,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve completionFuture.complete(null); } - throw ExceptionUtils.withCauseAndCode( - IgniteInternalException::new, + throw ExceptionUtils.withCause( + t instanceof NodeLeftException ? NodeLeftException::new : IgniteInternalException::new, INTERNAL_ERR, format("Unable to send fragment [targetNode={}, fragmentId={}, cause={}]", nodeName, fragment.fragmentId(), t.getMessage()), t diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java index 1b45f5578c..ba70a02e78 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java @@ -349,7 +349,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si exchange.request(nodeName, queryId(), srcFragmentId, exchangeId, cnt, state) .whenComplete((ignored, ex) -> { if (ex != null) { - IgniteInternalException wrapperEx = ExceptionUtils.withCauseAndCode( + IgniteInternalException wrapperEx = ExceptionUtils.withCause( IgniteInternalException::new, Common.INTERNAL_ERR, "Unable to request next batch: " + ex.getMessage(), diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java index 39758e7a70..fb6560c60f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java @@ -248,7 +248,7 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S return; } - IgniteInternalException wrapperEx = ExceptionUtils.withCauseAndCode( + IgniteInternalException wrapperEx = ExceptionUtils.withCause( IgniteInternalException::new, Common.INTERNAL_ERR, "Unable to send batch: " + ex.getMessage(), @@ -270,7 +270,7 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S return; } - IgniteInternalException wrapperEx = ExceptionUtils.withCauseAndCode( + IgniteInternalException wrapperEx = ExceptionUtils.withCause( IgniteInternalException::new, Common.INTERNAL_ERR, "Unable to send error: " + ex.getMessage(), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index f220ad7c91..5ade4b6ffd 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -22,16 +22,18 @@ import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; +import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -52,6 +54,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -62,6 +65,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.metrics.MetricManager; +import org.apache.ignite.internal.sql.engine.NodeLeftException; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.QueryCancelledException; import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode; @@ -549,8 +553,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { return CompletableFuture.completedFuture(null); } else { // On other nodes, simulate that the node has already gone. - return CompletableFuture.failedFuture(new IgniteInternalException(Common.INTERNAL_ERR, - "Connection refused to " + node.nodeName + ", message " + msg)); + return CompletableFuture.failedFuture(new NodeLeftException(node.nodeName)); } })); @@ -558,7 +561,10 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { AsyncCursor<List<Object>> cursor = execService.executePlan(tx, plan, ctx); // Wait till the query fails due to nodes' unavailability. - assertThat(cursor.closeAsync(), willThrow(hasProperty("message", containsString("Unable to send fragment")), 10, TimeUnit.SECONDS)); + ExecutionException eex = assertThrows(ExecutionException.class, () -> cursor.closeAsync().get(10, TimeUnit.SECONDS)); + assertThat(eex.getCause(), instanceOf(NodeLeftException.class)); + assertThat(eex.getCause().getMessage(), containsString("cause=Node left the cluster")); + assertThat(((NodeLeftException) eex.getCause()).code(), equalTo(NODE_LEFT_ERR)); // Let the root fragment be executed. queryFailedLatch.countDown();