This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 43859a1f7e IGNITE-17998 Sql. Close the cursor synchronously when the 
session is closed (#1330)
43859a1f7e is described below

commit 43859a1f7ef8ef761ca3da8587a3d5c7b073b841
Author: Pavel Pereslegin <xxt...@gmail.com>
AuthorDate: Wed Nov 16 13:11:31 2022 +0300

    IGNITE-17998 Sql. Close the cursor synchronously when the session is closed 
(#1330)
---
 .../ignite/internal/sql/api/ItCommonApiTest.java   |  9 +++--
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  5 ++-
 .../sql/engine/exec/ExecutionServiceImpl.java      | 39 ++++++++++++++--------
 3 files changed, 32 insertions(+), 21 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 4ede4c48c4..6b46dc5b76 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.api;
 
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.lang.ErrorGroups.Sql;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
-import org.apache.ignite.sql.CursorClosedException;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
@@ -78,16 +79,14 @@ public class ItCommonApiTest extends 
AbstractBasicIntegrationTest {
         assertEquals(Sql.SESSION_NOT_FOUND_ERR, ex.code());
 
         // already started query should fail due to session has been expired
-        ex = assertThrows(CursorClosedException.class, () -> {
+        assertThrowsWithCause(() -> {
             while (rs1.hasNext()) {
                 rs1.next();
             }
-        });
+        }, ExecutionCancelledException.class);
 
         rs1.close();
 
-        assertEquals(Sql.CURSOR_CLOSED_ERR, ex.code());
-
         // second session could proceed with execution
         while (rs2.hasNext()) {
             rs2.next();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 6b651caef0..418d8941a9 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
 import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
@@ -76,7 +77,6 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -577,7 +577,6 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
         }
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17998";)
     @Test
     public void closeSession() throws ExecutionException, InterruptedException 
{
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
@@ -597,7 +596,7 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
 
         assertThrowsWithCause(
                 () -> ars0.fetchNextPage().toCompletableFuture().get(),
-                SqlException.class
+                ExecutionCancelledException.class
         );
 
         assertThrowsWithCause(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index e1cd92462c..81c70e51f4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -197,7 +197,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         assert old == null;
 
-        ctx.cancel().add(() -> queryManager.close(false));
+        ctx.cancel().add(() -> queryManager.close(true));
 
         return queryManager.execute(plan);
     }
@@ -622,20 +622,9 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 return cancelFut.thenApply(Function.identity());
             }
 
-            CompletableFuture<Void> start = new CompletableFuture<>();
+            CompletableFuture<Void> start = closeExecNode(cancel);
 
             start
-                    .thenCompose(none -> {
-                        if (!root.completeExceptionally(new 
ExecutionCancelledException()) && !root.isCompletedExceptionally()) {
-                            if (cancel) {
-                                return root.thenAccept(root -> 
root.onError(new ExecutionCancelledException()));
-                            }
-
-                            return root.thenCompose(AsyncRootNode::closeAsync);
-                        }
-
-                        return CompletableFuture.completedFuture(null);
-                    })
                     .thenCompose(tmp -> {
                         Map<String, List<CompletableFuture<?>>> 
requestsPerNode = new HashMap<>();
                         for (Map.Entry<RemoteFragmentKey, 
CompletableFuture<Void>> entry : remoteFragmentInitCompletion.entrySet()) {
@@ -693,6 +682,30 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
             return cancelFut.thenApply(Function.identity());
         }
+
+        /**
+         * Synchronously closes the tree's execution iterator.
+         *
+         * @param cancel Forces execution to terminate with {@link 
ExecutionCancelledException}.
+         * @return Completable future that should run asynchronously.
+         */
+        private CompletableFuture<Void> closeExecNode(boolean cancel) {
+            CompletableFuture<Void> fut = new CompletableFuture<>();
+
+            if (!root.completeExceptionally(new ExecutionCancelledException()) 
&& !root.isCompletedExceptionally()) {
+                AsyncRootNode<RowT, List<Object>> node = root.getNow(null);
+
+                if (!cancel) {
+                    CompletableFuture<Void> closeFut = node.closeAsync();
+
+                    return fut.thenCompose(v -> closeFut);
+                }
+
+                node.onError(new ExecutionCancelledException());
+            }
+
+            return fut;
+        }
     }
 
     @FunctionalInterface

Reply via email to