This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3efece8245 IGNITE-19919 Sql. ResultSet#close should close implicit 
transaction if any (#2423)
3efece8245 is described below

commit 3efece824520e4345cdd89014ebe6b867c18442d
Author: Max Zhuravkov <shh...@gmail.com>
AuthorDate: Fri Aug 11 16:43:27 2023 +0300

    IGNITE-19919 Sql. ResultSet#close should close implicit transaction if any 
(#2423)
---
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  41 ++++++++
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  53 ++++++++--
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |   4 +
 .../sql/engine/AsyncSqlCursorImplTest.java         | 117 +++++++++++++++++++++
 .../sql/engine/framework/NoOpTransaction.java      |  24 ++++-
 5 files changed, 227 insertions(+), 12 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 694a28e8af..24b5b297db 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIn
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.CURSOR_CLOSED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
@@ -766,6 +767,46 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         IntStream.range(0, ex.updateCounters().length).forEach(i -> 
assertEquals(1, ex.updateCounters()[i]));
     }
 
+    @Test
+    public void resultSetCloseShouldFinishImplicitTransaction() throws 
InterruptedException {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = igniteSql();
+
+        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+        CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, 
"SELECT * FROM TEST");
+
+        AsyncResultSet<SqlRow> ars = f.join();
+        // There should be a pending transaction since not all data was read.
+        boolean txStarted = waitForCondition(() -> txManager().pending() == 1, 
5000);
+        assertTrue(txStarted, "No pending transactions");
+
+        ars.closeAsync().join();
+        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+    }
+
+    @Test
+    public void resultSetFullReadShouldFinishImplicitTransaction() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = igniteSql();
+
+        // Fetch all data in one read.
+        Session ses = sql.sessionBuilder().defaultPageSize(100).build();
+        CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, 
"SELECT * FROM TEST");
+
+        AsyncResultSet<SqlRow> ars = f.join();
+        assertFalse(ars.hasMorePages());
+
+        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+    }
+
     private static void checkDdl(boolean expectedApplied, Session ses, String 
sql, Transaction tx) {
         CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
                 tx,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 0a8f3f4d81..dc1bf88eb7 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.api;
 
 import static 
org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.lang.ErrorGroups.Sql.CURSOR_CLOSED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_NO_RESULT_SET_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
@@ -49,6 +50,7 @@ import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.CursorClosedException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.NoRowSetExpectedException;
 import org.apache.ignite.sql.ResultSet;
@@ -305,15 +307,14 @@ public class ItSqlSynchronousApiTest extends 
ClusterPerClassIntegrationTest {
             assertThrowsPublicException(rs::next, 
NoRowSetExpectedException.class, QUERY_NO_RESULT_SET_ERR, "Query has no result 
set");
         }
 
-        // TODO unmute after https://issues.apache.org/jira/browse/IGNITE-19919
         // Cursor closed error.
-        // {
-        //     ResultSet rs = ses.execute(null, "SELECT * FROM TEST");
-        //     Thread.sleep(300); // ResultSetImpl fetches next page in 
background, wait to it to complete to avoid flakiness.
-        //     rs.close();
-        //     assertThrowsPublicException(() -> 
rs.forEachRemaining(Object::hashCode),
-        //             CursorClosedException.class, CURSOR_CLOSED_ERR, null);
-        // }
+        {
+            ResultSet rs = ses.execute(null, "SELECT * FROM TEST");
+            Thread.sleep(300); // ResultSetImpl fetches next page in 
background, wait to it to complete to avoid flakiness.
+            rs.close();
+            assertThrowsPublicException(() -> 
rs.forEachRemaining(Object::hashCode),
+                    CursorClosedException.class, CURSOR_CLOSED_ERR, null);
+        }
     }
 
     /**
@@ -416,6 +417,42 @@ public class ItSqlSynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         IntStream.range(0, batchEx.updateCounters().length).forEach(i -> 
assertEquals(1, batchEx.updateCounters()[i]));
     }
 
+    @Test
+    public void resultSetCloseShouldFinishImplicitTransaction() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = igniteSql();
+        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+        ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST");
+        assertEquals(1, txManager().pending());
+        rs.close();
+        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+    }
+
+    @Test
+    public void resultSetFullReadShouldFinishImplicitTransaction() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = igniteSql();
+
+        // Fetch all data in one read.
+        Session ses = sql.sessionBuilder().defaultPageSize(100).build();
+        ResultSet<SqlRow> rs = ses.execute(null, "SELECT * FROM TEST");
+
+        while (rs.hasNext()) {
+            rs.next();
+        }
+
+        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+    }
+
     private static void checkDdl(boolean expectedApplied, Session ses, String 
sql) {
         ResultSet res = ses.execute(
                 null,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 2be580ace8..99251fa8cd 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -91,6 +91,10 @@ public class AsyncSqlCursorImpl<T> implements 
AsyncSqlCursor<T> {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> closeAsync() {
+        // Commit implicit transaction, if any.
+        if (implicitTx != null) {
+            implicitTx.commit();
+        }
         return dataCursor.closeAsync();
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
new file mode 100644
index 0000000000..dec2b80a98
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.sql.engine.exec.AsyncWrapper;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@link AsyncSqlCursorImpl}.
+ */
+public class AsyncSqlCursorImplTest {
+
+    private static final ResultSetMetadata RESULT_SET_METADATA = new 
ResultSetMetadataImpl(Collections.emptyList());
+
+    /** Cursor should trigger commit of implicit transaction (if any) only if 
data is fully read. */
+    @ParameterizedTest
+    @MethodSource("transactions")
+    public void testTriggerCommitAfterDataIsFullyRead(NoOpTransaction 
implicitTx) {
+        List<Integer> list = List.of(1, 2, 3);
+
+        AsyncSqlCursorImpl<Integer> cursor = new 
AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+                new 
AsyncWrapper<>(CompletableFuture.completedFuture(list.iterator()), 
Runnable::run));
+
+        int requestRows = 2;
+        BatchedResult<Integer> in1 = 
cursor.requestNextAsync(requestRows).join();
+        assertEquals(in1.items(), list.subList(0, requestRows));
+
+        if (implicitTx != null) {
+            CompletableFuture<Void> f = implicitTx.commitFuture();
+            assertFalse(f.isDone(), "Implicit transaction should have not been 
committed because there is more data.");
+        }
+
+        BatchedResult<Integer> in2 = 
cursor.requestNextAsync(requestRows).join();
+        assertEquals(in2.items(), list.subList(requestRows, list.size()));
+
+        if (implicitTx != null) {
+            CompletableFuture<Void> f = implicitTx.commitFuture();
+            assertTrue(f.isDone(), "Implicit transaction should been committed 
because there is no more data");
+        }
+    }
+
+    /** Exception on read should trigger rollback of implicit transaction, if 
any. */
+    @ParameterizedTest
+    @MethodSource("transactions")
+    public void testExceptionRollbacksImplicitTx(NoOpTransaction implicitTx) {
+        IgniteException err = new IgniteException(Common.INTERNAL_ERR);
+
+        AsyncSqlCursorImpl<Integer> cursor = new 
AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+                new AsyncWrapper<>(CompletableFuture.failedFuture(err), 
Runnable::run));
+
+        CompletionException t = assertThrows(CompletionException.class, () -> 
cursor.requestNextAsync(1).join());
+
+        if (implicitTx != null) {
+            CompletableFuture<Void> f = implicitTx.rollbackFuture();
+            assertTrue(f.isDone(), "Implicit transaction should have been 
rolled back: " + f);
+        }
+
+        IgniteException igniteErr = assertInstanceOf(IgniteException.class, 
t.getCause());
+        assertEquals(err.codeAsString(), igniteErr.codeAsString());
+    }
+
+    /** Cursor close should trigger commit of implicit transaction, if any. */
+    @ParameterizedTest
+    @MethodSource("transactions")
+    public void testCloseCommitsImplicitTx(NoOpTransaction implicitTx) {
+        AsyncCursor<Integer> data = new AsyncWrapper<>(List.of(1, 2, 3, 
4).iterator());
+        AsyncSqlCursorImpl<Integer> cursor = new 
AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, data);
+        cursor.closeAsync().join();
+
+        if (implicitTx != null) {
+            CompletableFuture<Void> f = implicitTx.commitFuture();
+            assertTrue(f.isDone(), "Implicit transaction should have been 
committed: " + f);
+        }
+    }
+
+    private static Stream<Arguments> transactions() {
+        return Stream.of(
+                Arguments.of(Named.named("implicit-tx", 
NoOpTransaction.readOnly("TX"))),
+                Arguments.of(Named.named("no implicit-tx", null))
+        );
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 2ab456accf..e9e4d59eb3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -46,6 +46,10 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     private final boolean readOnly;
 
+    private final CompletableFuture<Void> commitFut = new 
CompletableFuture<>();
+
+    private final CompletableFuture<Void> rollbackFut = new 
CompletableFuture<>();
+
     /** Creates a read-write transaction. */
     public static NoOpTransaction readWrite(String name) {
         return new NoOpTransaction(name, false);
@@ -84,22 +88,24 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     @Override
     public void commit() throws TransactionException {
-
+        commitAsync().join();
     }
 
     @Override
     public CompletableFuture<Void> commitAsync() {
-        return CompletableFuture.completedFuture(null);
+        commitFut.complete(null);
+        return commitFut;
     }
 
     @Override
     public void rollback() throws TransactionException {
-
+        rollbackAsync().join();
     }
 
     @Override
     public CompletableFuture<Void> rollbackAsync() {
-        return CompletableFuture.completedFuture(null);
+        rollbackFut.complete(null);
+        return rollbackFut;
     }
 
     @Override
@@ -155,4 +161,14 @@ public final class NoOpTransaction implements 
InternalTransaction {
     public void enlistResultFuture(CompletableFuture<?> resultFuture) {
         resultFuture.complete(null);
     }
+
+    /** Returns a {@link CompletableFuture} that completes when this 
transaction commits. */
+    public CompletableFuture<Void> commitFuture() {
+        return commitFut;
+    }
+
+    /** Returns a {@link CompletableFuture} that completes when this 
transaction rollbacks. */
+    public CompletableFuture<Void> rollbackFuture() {
+        return rollbackFut;
+    }
 }

Reply via email to