This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eb4d96c2c55 IGNITE-28483 Sql. Simplify close method in
DistributedQueryManager (#7951)
eb4d96c2c55 is described below
commit eb4d96c2c55ab2e78bd8d6d63f0c14ff0571c566
Author: korlov42 <[email protected]>
AuthorDate: Wed Apr 8 15:31:35 2026 +0300
IGNITE-28483 Sql. Simplify close method in DistributedQueryManager (#7951)
---
.../sql/engine/exec/ExecutionServiceImpl.java | 24 +++++++------
.../sql/engine/exec/ExecutionServiceImplTest.java | 42 ----------------------
2 files changed, 13 insertions(+), 53 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 23d413eab70..fc39fb819e2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1143,7 +1143,17 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
} catch (Exception e) {
LOG.info("Unable to send error message", e);
- close(CancellationReason.CANCEL);
+ try {
+ messageService.send(
+ initiatorNode,
+ FACTORY.queryCloseMessage()
+ .queryId(executionId.queryId())
+
.executionToken(executionId.executionToken())
+ .build()
+ );
+ } finally {
+ close(CancellationReason.CANCEL);
+ }
}
}
@@ -1327,19 +1337,13 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
return cancelFut;
}
- CompletableFuture<Void> start = new CompletableFuture<>();
-
CompletableFuture<Void> stage;
if (coordinator) {
- stage = start.thenCompose(ignored -> closeRootNode(reason))
+ stage = closeRootNode(reason)
.thenCompose(ignored ->
awaitFragmentInitialisationAndClose());
} else {
- stage = start.thenCompose(ignored ->
messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
- .queryId(executionId.queryId())
- .executionToken(executionId.executionToken())
- .build()).exceptionally(ignore -> null))
- .thenCompose(ignored -> closeLocalFragments());
+ stage = closeLocalFragments();
}
stage.whenComplete((r, e) -> {
@@ -1354,8 +1358,6 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
cancelFut.complete(null);
}).thenRun(() -> localFragments.forEach(f ->
f.context().cancel()));
- start.completeAsync(() -> null, taskExecutor);
-
return cancelFut;
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 750d3e4e96f..95d622c0b22 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -1235,48 +1235,6 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertThat(txCtx.observableTime(),
equalTo(expectedCatalogActivationTimestamp));
}
- @Test
- public void coordinatorIgnoresRemoteCloseErrorFromNodeOnCoordinator()
throws InterruptedException {
- ExecutionService execService = executionServices.get(0);
-
- nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
-
- var expectedEx = new RuntimeException("Test error");
- var queryClosed = new CountDownLatch(nodeNames.size() - 1);
-
- String coordinatorNode = nodeNames.get(0);
- testCluster.node(coordinatorNode).interceptor((senderNode, msg,
original) -> {
- if (msg instanceof QueryStartRequest) {
- QueryStartRequest queryStart = (QueryStartRequest) msg;
-
- String nodeName = senderNode.name();
-
testCluster.node(coordinatorNode).messageService().send(nodeName, new
SqlQueryMessagesFactory().queryStartResponse()
- .queryId(queryStart.queryId())
- .fragmentId(queryStart.fragmentId())
- .error(expectedEx)
- .build()
- );
- } else {
- original.onMessage(senderNode, msg);
- }
-
- if (msg instanceof QueryCloseMessage) {
- queryClosed.countDown();
- return CompletableFuture.failedFuture(new
RuntimeException("Test exception: failed to close"));
- } else {
- return nullCompletedFuture();
- }
- });
-
- SqlOperationContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
-
- RuntimeException actualException =
assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
- assertEquals(expectedEx, actualException);
-
- queryClosed.await();
- }
-
@Test
public void coordinatorIgnoresRemoteCloseErrorOnNode() throws
InterruptedException {
ExecutionService execService = executionServices.get(0);