This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 2e5f05220d IGNITE-16960 Sql. Support script execution using native public Java API (#2884) 2e5f05220d is described below commit 2e5f05220d1db0cb1bae5a40bdeafe14dc801930 Author: Max Zhuravkov <shh...@gmail.com> AuthorDate: Wed Nov 29 12:16:02 2023 +0200 IGNITE-16960 Sql. Support script execution using native public Java API (#2884) --- .../ignite/internal/sql/AbstractSession.java | 18 ++++ .../ignite/internal/sql/api/ItSqlApiBaseTest.java | 46 +++++++++ .../internal/sql/api/ItSqlAsynchronousApiTest.java | 5 + .../sql/api/ItSqlClientAsynchronousApiTest.java | 12 +++ .../sql/api/ItSqlClientSynchronousApiTest.java | 12 +++ .../internal/sql/api/ItSqlSynchronousApiTest.java | 5 + .../ignite/internal/sql/api/SessionImpl.java | 83 ++++++++++++++-- .../ignite/internal/sql/api/SessionImplTest.java | 106 +++++++++++++++++++++ 8 files changed, 280 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java index 24f8681ca0..37b8299b51 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java @@ -150,4 +150,22 @@ public interface AbstractSession extends Session { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } } + + /** + * Executes a multi-statement SQL query. + * + * @param query SQL query template. + * @param arguments Arguments for the template (optional). + * @throws SqlException If failed. + */ + @Override + default void executeScript(String query, @Nullable Object... 
arguments) { + Objects.requireNonNull(query); + + try { + executeScriptAsync(query, arguments).join(); + } catch (CompletionException e) { + throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); + } + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java index ba287ca8de..b27b596b74 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java @@ -826,6 +826,50 @@ public abstract class ItSqlApiBaseTest extends BaseSqlIntegrationTest { assertEquals(0, txManager().pending(), "Expected no pending transactions"); } + + @Test + public void runScriptThatCompletesSuccessfully() { + IgniteSql sql = igniteSql(); + + try (Session session = sql.createSession()) { + executeScript(session, + "CREATE TABLE test (id INT PRIMARY KEY, step INTEGER); " + + "START TRANSACTION; " + + "INSERT INTO test VALUES(1, 0); " + + "UPDATE test SET step = 1; " + + "SELECT * FROM test; " + + "UPDATE test SET step = 2; " + + "COMMIT; "); + + ResultProcessor result = execute(session, "SELECT step FROM test"); + assertEquals(1, result.result().size()); + assertEquals(2, result.result().get(0).intValue(0)); + } + } + + @Test + public void runScriptThatFails() { + IgniteSql sql = igniteSql(); + + try (Session session = sql.createSession()) { + assertThrowsSqlException( + Sql.RUNTIME_ERR, + "/ by zero", + () -> executeScript(session, + "CREATE TABLE test (id INT PRIMARY KEY, step INTEGER); " + + "INSERT INTO test VALUES(1, 0); " + + "UPDATE test SET step = 1; " + + "SELECT 1/0; " + + "UPDATE test SET step = 2; " + ) + ); + + ResultProcessor result = execute(session, "SELECT step FROM test"); + assertEquals(1, result.result().size()); + assertEquals(1, 
result.result().get(0).intValue(0)); + } + } + protected ResultSet<SqlRow> executeForRead(Session ses, String query, Object... args) { return executeForRead(ses, null, query, args); } @@ -873,6 +917,8 @@ public abstract class ItSqlApiBaseTest extends BaseSqlIntegrationTest { return execute(null, null, ses, sql, args); } + protected abstract void executeScript(Session ses, String sql, Object... args); + protected abstract void rollback(Transaction outerTx); protected abstract void commit(Transaction outerTx); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 77d45cc8f6..f0d918a982 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -106,6 +106,11 @@ public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest { return asyncProcessor; } + @Override + protected void executeScript(Session ses, String sql, Object... 
args) { + await(ses.executeScriptAsync(sql, args)); + } + @Override protected void rollback(Transaction tx) { await(tx.rollbackAsync()); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java index 9e6a097ff2..73a33a1233 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java @@ -70,4 +70,16 @@ public class ItSqlClientAsynchronousApiTest extends ItSqlAsynchronousApiTest { public void testLockIsNotReleasedAfterTxRollback() { super.testLockIsNotReleasedAfterTxRollback(); } + + @Override + @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060") + public void runScriptThatCompletesSuccessfully() { + super.runScriptThatCompletesSuccessfully(); + } + + @Override + @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060") + public void runScriptThatFails() { + super.runScriptThatFails(); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java index 8ce27d6dd5..04e189a0c8 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java @@ -72,4 +72,16 @@ public class ItSqlClientSynchronousApiTest extends ItSqlSynchronousApiTest { public void testLockIsNotReleasedAfterTxRollback() { super.testLockIsNotReleasedAfterTxRollback(); } + + @Override + @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060") + public void 
runScriptThatCompletesSuccessfully() { + super.runScriptThatCompletesSuccessfully(); + } + + @Override + @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060") + public void runScriptThatFails() { + super.runScriptThatFails(); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java index acc946bb53..d068cbb6f5 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java @@ -63,6 +63,11 @@ public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest { return syncProcessor; } + @Override + protected void executeScript(Session ses, String sql, Object... args) { + ses.executeScript(sql, args); + } + @Override protected void rollback(Transaction tx) { tx.rollback(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java index f2dbaebada..a9c6bc8ada 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java @@ -145,12 +145,6 @@ public class SessionImpl implements AbstractSession { throw new UnsupportedOperationException("Not implemented yet."); } - /** {@inheritDoc} */ - @Override - public void executeScript(String query, @Nullable Object... 
arguments) { - throw new UnsupportedOperationException("Not implemented yet."); - } - /** {@inheritDoc} */ @Override public long defaultQueryTimeout(TimeUnit timeUnit) { @@ -401,7 +395,29 @@ public class SessionImpl implements AbstractSession { /** {@inheritDoc} */ @Override public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments) { - throw new UnsupportedOperationException("Not implemented yet."); + touchAndCloseIfExpired(); + + if (!busyLock.enterBusy()) { + return CompletableFuture.failedFuture(sessionIsClosedException()); + } + + CompletableFuture<Void> resFut = new CompletableFuture<>(); + try { + SqlProperties properties = SqlPropertiesHelper.emptyProperties(); + + CompletableFuture<AsyncSqlCursor<List<Object>>> f = qryProc.queryScriptAsync(properties, transactions, null, query, arguments); + + ScriptHandler handler = new ScriptHandler(resFut); + f.whenComplete(handler::processFirstResult); + } finally { + busyLock.leaveBusy(); + } + + return resFut.exceptionally((th) -> { + Throwable cause = ExceptionUtils.unwrapCause(th); + + throw new CompletionException(mapToPublicSqlException(cause)); + }); } /** {@inheritDoc} */ @@ -523,4 +539,57 @@ public class SessionImpl implements AbstractSession { List<AsyncSqlCursor<?>> openedCursors() { return List.copyOf(openedCursors.values()); } + + private class ScriptHandler { + + private final CompletableFuture<Void> resFut; + + private ScriptHandler(CompletableFuture<Void> resFut) { + this.resFut = resFut; + } + + void processFirstResult(AsyncSqlCursor<List<Object>> cursor, Throwable t) { + if (t != null) { + resFut.completeExceptionally(t); + } else { + int cursorId = registerCursor(cursor); + processCursor(cursor, cursorId); + } + } + + void processCursor(AsyncSqlCursor<List<Object>> cursor, int cursorId) { + if (!busyLock.enterBusy()) { + closeCursor(cursor, cursorId); + + resFut.completeExceptionally(sessionIsClosedException()); + return; + } + + try { + if (cursor.hasNextResult()) 
{ + cursor.nextResult().whenComplete((nextCursor, t) -> { + closeCursor(cursor, cursorId); + + if (nextCursor != null) { + int nextCursorId = registerCursor(nextCursor); + processCursor(nextCursor, nextCursorId); + } else { + resFut.completeExceptionally(t); + } + }); + } else { + closeCursor(cursor, cursorId); + + resFut.complete(null); + } + } finally { + busyLock.leaveBusy(); + } + } + + void closeCursor(AsyncSqlCursor<List<Object>> cursor, int cursorId) { + openedCursors.remove(cursorId); + cursor.closeAsync(); + } + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java index 771c5adafa..c6f255fb96 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.api; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_CLOSED_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -27,6 +28,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; @@ -341,6 +343,110 @@ class SessionImplTest extends BaseIgniteAbstractTest { assertThat(session.openedCursors(), empty()); } + @Test + public void 
scriptIteratesOverCursors() { + AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class, "cursor1"); + AsyncSqlCursor<List<Object>> cursor2 = mock(AsyncSqlCursor.class, "cursor2"); + + when(cursor1.hasNextResult()).thenReturn(true); + when(cursor1.nextResult()).thenReturn(CompletableFuture.completedFuture(cursor2)); + when(cursor1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + when(cursor2.hasNextResult()).thenReturn(false); + when(cursor2.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), any(Object[].class))) + .thenReturn(CompletableFuture.completedFuture(cursor1)); + + SessionImpl session = newSession(3); + + Void rs = await(session.executeScriptAsync("SELECT 1; SELECT 2")); + + assertNull(rs); + assertThat(session.openedCursors(), empty()); + } + + @Test + public void scriptRethrowsExceptionFromCursor() { + AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class); + + when(cursor1.hasNextResult()).thenReturn(true); + when(cursor1.nextResult()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Broken"))); + when(cursor1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), any(Object[].class))) + .thenReturn(CompletableFuture.completedFuture(cursor1)); + + SessionImpl session = newSession(3); + + assertThrowsSqlException( + INTERNAL_ERR, + "Broken", + () -> await(session.executeScriptAsync("SELECT 1; SELECT 2")) + ); + + assertThat(session.openedCursors(), empty()); + } + + @Test + public void scriptIgnoresCloseCursorException() { + AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class, "cursor1"); + AsyncSqlCursor<List<Object>> cursor2 = mock(AsyncSqlCursor.class, "cursor2"); + + when(cursor1.hasNextResult()).thenReturn(true); + when(cursor1.nextResult()).thenReturn(CompletableFuture.completedFuture(cursor2)); + 
when(cursor1.closeAsync()).thenReturn(CompletableFuture.failedFuture(new IllegalStateException("cursor1"))); + + when(cursor2.hasNextResult()).thenReturn(false); + when(cursor2.closeAsync()).thenReturn(CompletableFuture.failedFuture(new IllegalStateException("cursor2"))); + + when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), any(Object[].class))) + .thenReturn(CompletableFuture.completedFuture(cursor1)); + + SessionImpl session = newSession(3); + + Void rs = await(session.executeScriptAsync("SELECT 1; SELECT 2")); + + assertNull(rs); + assertThat(session.openedCursors(), empty()); + } + + @Test + public void scriptTerminatesWhenSessionCloses() { + AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class, "cursor1"); + AsyncSqlCursor<List<Object>> cursor2 = mock(AsyncSqlCursor.class, "cursor2"); + + CompletableFuture<AsyncSqlCursor<List<Object>>> cursor2Fut = new CompletableFuture<>(); + + when(cursor1.hasNextResult()).thenReturn(true); + when(cursor1.nextResult()).thenAnswer(ignored -> cursor2Fut); + when(cursor1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + when(cursor2.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), any(Object[].class))) + .thenReturn(CompletableFuture.completedFuture(cursor1)); + + SessionImpl session = newSession(3); + + Thread thread = new Thread(() -> { + session.close(); + cursor2Fut.complete(cursor2); + }); + + assertThrowsSqlException( + SESSION_CLOSED_ERR, + "Session is closed", + () -> { + CompletableFuture<Void> f = session.executeScriptAsync("SELECT 1; SELECT 2"); + thread.start(); + await(f); + } + ); + + assertThat(session.openedCursors(), empty()); + } + private SessionImpl newSession(long idleTimeout) { SessionBuilder builder = new SessionBuilderImpl( new IgniteSpinBusyLock(),