This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 43859a1f7e IGNITE-17998 Sql. Close the cursor synchronously when the session is closed (#1330) 43859a1f7e is described below commit 43859a1f7ef8ef761ca3da8587a3d5c7b073b841 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Wed Nov 16 13:11:31 2022 +0300 IGNITE-17998 Sql. Close the cursor synchronously when the session is closed (#1330) --- .../ignite/internal/sql/api/ItCommonApiTest.java | 9 +++-- .../internal/sql/api/ItSqlAsynchronousApiTest.java | 5 ++- .../sql/engine/exec/ExecutionServiceImpl.java | 39 ++++++++++++++-------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java index 4ede4c48c4..6b46dc5b76 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.sql.api; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.lang.ErrorGroups.Sql; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; -import org.apache.ignite.sql.CursorClosedException; +import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; @@ -78,16 +79,14 @@ public class ItCommonApiTest extends AbstractBasicIntegrationTest { assertEquals(Sql.SESSION_NOT_FOUND_ERR, ex.code()); // already started query should fail due to session has been expired - ex = assertThrows(CursorClosedException.class, () -> { + assertThrowsWithCause(() -> { while (rs1.hasNext()) { rs1.next(); } - }); + }, ExecutionCancelledException.class); rs1.close(); - assertEquals(Sql.CURSOR_CLOSED_ERR, ex.code()); - // second session could proceed with execution while (rs2.hasNext()) { rs2.next(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 6b651caef0..418d8941a9 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.client.sql.ClientSql; import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl; import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest; +import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxState; @@ -76,7 +77,6 @@ import org.apache.ignite.table.Table; import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.tx.Transaction; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -577,7 +577,6 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest { } } - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17998") @Test public void closeSession() throws ExecutionException, InterruptedException { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); @@ -597,7 +596,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest { assertThrowsWithCause( () -> ars0.fetchNextPage().toCompletableFuture().get(), - SqlException.class + ExecutionCancelledException.class ); assertThrowsWithCause( diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index e1cd92462c..81c70e51f4 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -197,7 +197,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve assert old == null; - ctx.cancel().add(() -> queryManager.close(false)); + ctx.cancel().add(() -> queryManager.close(true)); return queryManager.execute(plan); } @@ -622,20 +622,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return cancelFut.thenApply(Function.identity()); } - CompletableFuture<Void> start = new CompletableFuture<>(); + CompletableFuture<Void> start = closeExecNode(cancel); start - .thenCompose(none -> { - if (!root.completeExceptionally(new ExecutionCancelledException()) && !root.isCompletedExceptionally()) { - if (cancel) { - return root.thenAccept(root -> root.onError(new ExecutionCancelledException())); - } - - return root.thenCompose(AsyncRootNode::closeAsync); - } - - return CompletableFuture.completedFuture(null); - }) .thenCompose(tmp -> { Map<String, List<CompletableFuture<?>>> requestsPerNode = new HashMap<>(); for (Map.Entry<RemoteFragmentKey, CompletableFuture<Void>> entry : remoteFragmentInitCompletion.entrySet()) { @@ -693,6 +682,30 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return cancelFut.thenApply(Function.identity()); } + + /** + * Synchronously closes the tree's execution iterator. + * + * @param cancel Forces execution to terminate with {@link ExecutionCancelledException}. + * @return Completable future that should run asynchronously. + */ + private CompletableFuture<Void> closeExecNode(boolean cancel) { + CompletableFuture<Void> fut = new CompletableFuture<>(); + + if (!root.completeExceptionally(new ExecutionCancelledException()) && !root.isCompletedExceptionally()) { + AsyncRootNode<RowT, List<Object>> node = root.getNow(null); + + if (!cancel) { + CompletableFuture<Void> closeFut = node.closeAsync(); + + return fut.thenCompose(v -> closeFut); + } + + node.onError(new ExecutionCancelledException()); + } + + return fut; + } } @FunctionalInterface