This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eb4d96c2c55 IGNITE-28483 Sql. Simplify close method in DistributedQueryManager (#7951)
eb4d96c2c55 is described below

commit eb4d96c2c55ab2e78bd8d6d63f0c14ff0571c566
Author: korlov42 <[email protected]>
AuthorDate: Wed Apr 8 15:31:35 2026 +0300

    IGNITE-28483 Sql. Simplify close method in DistributedQueryManager (#7951)
---
 .../sql/engine/exec/ExecutionServiceImpl.java      | 24 +++++++------
 .../sql/engine/exec/ExecutionServiceImplTest.java  | 42 ----------------------
 2 files changed, 13 insertions(+), 53 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 23d413eab70..fc39fb819e2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1143,7 +1143,17 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, LogicalTopo
             } catch (Exception e) {
                 LOG.info("Unable to send error message", e);
 
-                close(CancellationReason.CANCEL);
+                try {
+                    messageService.send(
+                            initiatorNode,
+                            FACTORY.queryCloseMessage()
+                                    .queryId(executionId.queryId())
+                                    .executionToken(executionId.executionToken())
+                                    .build()
+                    );
+                } finally {
+                    close(CancellationReason.CANCEL);
+                }
             }
         }
 
@@ -1327,19 +1337,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, LogicalTopo
                 return cancelFut;
             }
 
-            CompletableFuture<Void> start = new CompletableFuture<>();
-
             CompletableFuture<Void> stage;
 
             if (coordinator) {
-                stage = start.thenCompose(ignored -> closeRootNode(reason))
+                stage = closeRootNode(reason)
                         .thenCompose(ignored -> awaitFragmentInitialisationAndClose());
             } else {
-                stage = start.thenCompose(ignored -> messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
-                                .queryId(executionId.queryId())
-                                .executionToken(executionId.executionToken())
-                                .build()).exceptionally(ignore -> null))
-                        .thenCompose(ignored -> closeLocalFragments());
+                stage = closeLocalFragments();
             }
 
             stage.whenComplete((r, e) -> {
@@ -1354,8 +1358,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, LogicalTopo
                 cancelFut.complete(null);
             }).thenRun(() -> localFragments.forEach(f -> f.context().cancel()));
 
-            start.completeAsync(() -> null, taskExecutor);
-
             return cancelFut;
         }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 750d3e4e96f..95d622c0b22 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -1235,48 +1235,6 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
         assertThat(txCtx.observableTime(), equalTo(expectedCatalogActivationTimestamp));
     }
 
-    @Test
-    public void coordinatorIgnoresRemoteCloseErrorFromNodeOnCoordinator() throws InterruptedException {
-        ExecutionService execService = executionServices.get(0);
-
-        nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
-
-        var expectedEx = new RuntimeException("Test error");
-        var queryClosed = new CountDownLatch(nodeNames.size() - 1);
-
-        String coordinatorNode = nodeNames.get(0);
-        testCluster.node(coordinatorNode).interceptor((senderNode, msg, original) -> {
-            if (msg instanceof QueryStartRequest) {
-                QueryStartRequest queryStart = (QueryStartRequest) msg;
-
-                String nodeName = senderNode.name();
-                testCluster.node(coordinatorNode).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
-                        .queryId(queryStart.queryId())
-                        .fragmentId(queryStart.fragmentId())
-                        .error(expectedEx)
-                        .build()
-                );
-            } else {
-                original.onMessage(senderNode, msg);
-            }
-
-            if (msg instanceof QueryCloseMessage) {
-                queryClosed.countDown();
-                return CompletableFuture.failedFuture(new RuntimeException("Test exception: failed to close"));
-            } else {
-                return nullCompletedFuture();
-            }
-        });
-
-        SqlOperationContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
-
-        RuntimeException actualException = assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
-        assertEquals(expectedEx, actualException);
-
-        queryClosed.await();
-    }
-
     @Test
     public void coordinatorIgnoresRemoteCloseErrorOnNode() throws InterruptedException {
         ExecutionService execService = executionServices.get(0);

Reply via email to