This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 3efece8245 IGNITE-19919 Sql. ResultSet#close should close implicit transaction if any (#2423) 3efece8245 is described below commit 3efece824520e4345cdd89014ebe6b867c18442d Author: Max Zhuravkov <shh...@gmail.com> AuthorDate: Fri Aug 11 16:43:27 2023 +0300 IGNITE-19919 Sql. ResultSet#close should close implicit transaction if any (#2423) --- .../internal/sql/api/ItSqlAsynchronousApiTest.java | 41 ++++++++ .../internal/sql/api/ItSqlSynchronousApiTest.java | 53 ++++++++-- .../internal/sql/engine/AsyncSqlCursorImpl.java | 4 + .../sql/engine/AsyncSqlCursorImplTest.java | 117 +++++++++++++++++++++ .../sql/engine/framework/NoOpTransaction.java | 24 ++++- 5 files changed, 227 insertions(+), 12 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 694a28e8af..24b5b297db 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIn import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.CURSOR_CLOSED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR; @@ -766,6 +767,46 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { IntStream.range(0, ex.updateCounters().length).forEach(i -> assertEquals(1, ex.updateCounters()[i])); } + @Test + public void resultSetCloseShouldFinishImplicitTransaction() throws InterruptedException { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + for (int i = 0; i < ROW_COUNT; ++i) { + sql("INSERT INTO TEST VALUES (?, ?)", i, i); + } + + IgniteSql sql = igniteSql(); + + Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); + + AsyncResultSet<SqlRow> ars = f.join(); + // There should be a pending transaction since not all data was read. + boolean txStarted = waitForCondition(() -> txManager().pending() == 1, 5000); + assertTrue(txStarted, "No pending transactions"); + + ars.closeAsync().join(); + assertEquals(0, txManager().pending(), "Expected no pending transactions"); + } + + @Test + public void resultSetFullReadShouldFinishImplicitTransaction() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + for (int i = 0; i < ROW_COUNT; ++i) { + sql("INSERT INTO TEST VALUES (?, ?)", i, i); + } + + IgniteSql sql = igniteSql(); + + // Fetch all data in one read. + Session ses = sql.sessionBuilder().defaultPageSize(100).build(); + CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); + + AsyncResultSet<SqlRow> ars = f.join(); + assertFalse(ars.hasMorePages()); + + assertEquals(0, txManager().pending(), "Expected no pending transactions"); + } + private static void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx) { CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync( tx, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java index 0a8f3f4d81..dc1bf88eb7 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.api; import static org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; +import static org.apache.ignite.lang.ErrorGroups.Sql.CURSOR_CLOSED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_NO_RESULT_SET_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR; @@ -49,6 +50,7 @@ import org.apache.ignite.lang.IndexNotFoundException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.lang.TableNotFoundException; import org.apache.ignite.sql.BatchedArguments; +import org.apache.ignite.sql.CursorClosedException; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.NoRowSetExpectedException; import org.apache.ignite.sql.ResultSet; @@ -305,15 +307,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { assertThrowsPublicException(rs::next, NoRowSetExpectedException.class, QUERY_NO_RESULT_SET_ERR, "Query has no result set"); } - // TODO unmute after https://issues.apache.org/jira/browse/IGNITE-19919 // Cursor closed error. - // { - // ResultSet rs = ses.execute(null, "SELECT * FROM TEST"); - // Thread.sleep(300); // ResultSetImpl fetches next page in background, wait to it to complete to avoid flakiness. - // rs.close(); - // assertThrowsPublicException(() -> rs.forEachRemaining(Object::hashCode), - // CursorClosedException.class, CURSOR_CLOSED_ERR, null); - // } + { + ResultSet rs = ses.execute(null, "SELECT * FROM TEST"); + Thread.sleep(300); // ResultSetImpl fetches next page in background, wait to it to complete to avoid flakiness. + rs.close(); + assertThrowsPublicException(() -> rs.forEachRemaining(Object::hashCode), + CursorClosedException.class, CURSOR_CLOSED_ERR, null); + } } /** @@ -416,6 +417,42 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i])); } + @Test + public void resultSetCloseShouldFinishImplicitTransaction() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + for (int i = 0; i < ROW_COUNT; ++i) { + sql("INSERT INTO TEST VALUES (?, ?)", i, i); + } + + IgniteSql sql = igniteSql(); + Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + + ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST"); + assertEquals(1, txManager().pending()); + rs.close(); + assertEquals(0, txManager().pending(), "Expected no pending transactions"); + } + + @Test + public void resultSetFullReadShouldFinishImplicitTransaction() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + for (int i = 0; i < ROW_COUNT; ++i) { + sql("INSERT INTO TEST VALUES (?, ?)", i, i); + } + + IgniteSql sql = igniteSql(); + + // Fetch all data in one read. + Session ses = sql.sessionBuilder().defaultPageSize(100).build(); + ResultSet<SqlRow> rs = ses.execute(null, "SELECT * FROM TEST"); + + while (rs.hasNext()) { + rs.next(); + } + + assertEquals(0, txManager().pending(), "Expected no pending transactions"); + } + private static void checkDdl(boolean expectedApplied, Session ses, String sql) { ResultSet res = ses.execute( null, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java index 2be580ace8..99251fa8cd 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java @@ -91,6 +91,10 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { /** {@inheritDoc} */ @Override public CompletableFuture<Void> closeAsync() { + // Commit implicit transaction, if any. + if (implicitTx != null) { + implicitTx.commit(); + } return dataCursor.closeAsync(); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java new file mode 100644 index 0000000000..dec2b80a98 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Stream; +import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl; +import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; +import org.apache.ignite.internal.sql.engine.exec.AsyncWrapper; +import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction; +import org.apache.ignite.lang.ErrorGroups.Common; +import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.sql.ResultSetMetadata; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for {@link AsyncSqlCursorImpl}. + */ +public class AsyncSqlCursorImplTest { + + private static final ResultSetMetadata RESULT_SET_METADATA = new ResultSetMetadataImpl(Collections.emptyList()); + + /** Cursor should trigger commit of implicit transaction (if any) only if data is fully read. */ + @ParameterizedTest + @MethodSource("transactions") + public void testTriggerCommitAfterDataIsFullyRead(NoOpTransaction implicitTx) { + List<Integer> list = List.of(1, 2, 3); + + AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, + new AsyncWrapper<>(CompletableFuture.completedFuture(list.iterator()), Runnable::run)); + + int requestRows = 2; + BatchedResult<Integer> in1 = cursor.requestNextAsync(requestRows).join(); + assertEquals(in1.items(), list.subList(0, requestRows)); + + if (implicitTx != null) { + CompletableFuture<Void> f = implicitTx.commitFuture(); + assertFalse(f.isDone(), "Implicit transaction should have not been committed because there is more data."); + } + + BatchedResult<Integer> in2 = cursor.requestNextAsync(requestRows).join(); + assertEquals(in2.items(), list.subList(requestRows, list.size())); + + if (implicitTx != null) { + CompletableFuture<Void> f = implicitTx.commitFuture(); + assertTrue(f.isDone(), "Implicit transaction should been committed because there is no more data"); + } + } + + /** Exception on read should trigger rollback of implicit transaction, if any. */ + @ParameterizedTest + @MethodSource("transactions") + public void testExceptionRollbacksImplicitTx(NoOpTransaction implicitTx) { + IgniteException err = new IgniteException(Common.INTERNAL_ERR); + + AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, + new AsyncWrapper<>(CompletableFuture.failedFuture(err), Runnable::run)); + + CompletionException t = assertThrows(CompletionException.class, () -> cursor.requestNextAsync(1).join()); + + if (implicitTx != null) { + CompletableFuture<Void> f = implicitTx.rollbackFuture(); + assertTrue(f.isDone(), "Implicit transaction should have been rolled back: " + f); + } + + IgniteException igniteErr = assertInstanceOf(IgniteException.class, t.getCause()); + assertEquals(err.codeAsString(), igniteErr.codeAsString()); + } + + /** Cursor close should trigger commit of implicit transaction, if any. */ + @ParameterizedTest + @MethodSource("transactions") + public void testCloseCommitsImplicitTx(NoOpTransaction implicitTx) { + AsyncCursor<Integer> data = new AsyncWrapper<>(List.of(1, 2, 3, 4).iterator()); + AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, data); + cursor.closeAsync().join(); + + if (implicitTx != null) { + CompletableFuture<Void> f = implicitTx.commitFuture(); + assertTrue(f.isDone(), "Implicit transaction should have been committed: " + f); + } + } + + private static Stream<Arguments> transactions() { + return Stream.of( + Arguments.of(Named.named("implicit-tx", NoOpTransaction.readOnly("TX"))), + Arguments.of(Named.named("no implicit-tx", null)) + ); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 2ab456accf..e9e4d59eb3 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -46,6 +46,10 @@ public final class NoOpTransaction implements InternalTransaction { private final boolean readOnly; + private final CompletableFuture<Void> commitFut = new CompletableFuture<>(); + + private final CompletableFuture<Void> rollbackFut = new CompletableFuture<>(); + /** Creates a read-write transaction. */ public static NoOpTransaction readWrite(String name) { return new NoOpTransaction(name, false); @@ -84,22 +88,24 @@ public final class NoOpTransaction implements InternalTransaction { @Override public void commit() throws TransactionException { - + commitAsync().join(); } @Override public CompletableFuture<Void> commitAsync() { - return CompletableFuture.completedFuture(null); + commitFut.complete(null); + return commitFut; } @Override public void rollback() throws TransactionException { - + rollbackAsync().join(); } @Override public CompletableFuture<Void> rollbackAsync() { - return CompletableFuture.completedFuture(null); + rollbackFut.complete(null); + return rollbackFut; } @Override @@ -155,4 +161,14 @@ public final class NoOpTransaction implements InternalTransaction { public void enlistResultFuture(CompletableFuture<?> resultFuture) { resultFuture.complete(null); } + + /** Returns a {@link CompletableFuture} that completes when this transaction commits. */ + public CompletableFuture<Void> commitFuture() { + return commitFut; + } + + /** Returns a {@link CompletableFuture} that completes when this transaction rollbacks. */ + public CompletableFuture<Void> rollbackFuture() { + return rollbackFut; + } }