This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e5f05220d IGNITE-16960 Sql. Support script execution using native public Java API (#2884)
2e5f05220d is described below

commit 2e5f05220d1db0cb1bae5a40bdeafe14dc801930
Author: Max Zhuravkov <shh...@gmail.com>
AuthorDate: Wed Nov 29 12:16:02 2023 +0200

    IGNITE-16960 Sql. Support script execution using native public Java API (#2884)
---
 .../ignite/internal/sql/AbstractSession.java       |  18 ++++
 .../ignite/internal/sql/api/ItSqlApiBaseTest.java  |  46 +++++++++
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |   5 +
 .../sql/api/ItSqlClientAsynchronousApiTest.java    |  12 +++
 .../sql/api/ItSqlClientSynchronousApiTest.java     |  12 +++
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |   5 +
 .../ignite/internal/sql/api/SessionImpl.java       |  83 ++++++++++++++--
 .../ignite/internal/sql/api/SessionImplTest.java   | 106 +++++++++++++++++++++
 8 files changed, 280 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java
index 24f8681ca0..37b8299b51 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/AbstractSession.java
@@ -150,4 +150,22 @@ public interface AbstractSession extends Session {
             throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
+
+    /**
+     * Executes a multi-statement SQL query.
+     *
+     * @param query SQL query template.
+     * @param arguments Arguments for the template (optional).
+     * @throws SqlException If failed.
+     */
+    @Override
+    default void executeScript(String query, @Nullable Object... arguments) {
+        Objects.requireNonNull(query);
+
+        try {
+            executeScriptAsync(query, arguments).join();
+        } catch (CompletionException e) {
+            throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+        }
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index ba287ca8de..b27b596b74 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -826,6 +826,50 @@ public abstract class ItSqlApiBaseTest extends BaseSqlIntegrationTest {
         assertEquals(0, txManager().pending(), "Expected no pending transactions");
     }
 
+
+    @Test
+    public void runScriptThatCompletesSuccessfully() {
+        IgniteSql sql = igniteSql();
+
+        try (Session session = sql.createSession()) {
+            executeScript(session,
+                    "CREATE TABLE test (id INT PRIMARY KEY, step INTEGER); "
+                            + "START TRANSACTION; "
+                            + "INSERT INTO test VALUES(1, 0); "
+                            + "UPDATE test SET step = 1; "
+                            + "SELECT * FROM test; "
+                            + "UPDATE test SET step = 2; "
+                            + "COMMIT; ");
+
+            ResultProcessor result = execute(session, "SELECT step FROM test");
+            assertEquals(1, result.result().size());
+            assertEquals(2, result.result().get(0).intValue(0));
+        }
+    }
+
+    @Test
+    public void runScriptThatFails() {
+        IgniteSql sql = igniteSql();
+
+        try (Session session = sql.createSession()) {
+            assertThrowsSqlException(
+                    Sql.RUNTIME_ERR,
+                    "/ by zero",
+                    () -> executeScript(session,
+                            "CREATE TABLE test (id INT PRIMARY KEY, step INTEGER); "
+                                    + "INSERT INTO test VALUES(1, 0); "
+                                    + "UPDATE test SET step = 1; "
+                                    + "SELECT 1/0; "
+                                    + "UPDATE test SET step = 2; "
+                    )
+            );
+
+            ResultProcessor result = execute(session, "SELECT step FROM test");
+            assertEquals(1, result.result().size());
+            assertEquals(1, result.result().get(0).intValue(0));
+        }
+    }
+
     protected ResultSet<SqlRow> executeForRead(Session ses, String query, Object... args) {
         return executeForRead(ses, null, query, args);
     }
@@ -873,6 +917,8 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
         return execute(null, null, ses, sql, args);
     }
 
+    protected abstract void executeScript(Session ses, String sql, Object... args);
+
     protected abstract void rollback(Transaction outerTx);
 
     protected abstract void commit(Transaction outerTx);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 77d45cc8f6..f0d918a982 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -106,6 +106,11 @@ public class ItSqlAsynchronousApiTest extends 
ItSqlApiBaseTest {
         return asyncProcessor;
     }
 
+    @Override
+    protected void executeScript(Session ses, String sql, Object... args) {
+        await(ses.executeScriptAsync(sql, args));
+    }
+
     @Override
     protected void rollback(Transaction tx) {
         await(tx.rollbackAsync());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 9e6a097ff2..73a33a1233 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -70,4 +70,16 @@ public class ItSqlClientAsynchronousApiTest extends 
ItSqlAsynchronousApiTest {
     public void testLockIsNotReleasedAfterTxRollback() {
         super.testLockIsNotReleasedAfterTxRollback();
     }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
+    public void runScriptThatCompletesSuccessfully() {
+        super.runScriptThatCompletesSuccessfully();
+    }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
+    public void runScriptThatFails() {
+        super.runScriptThatFails();
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 8ce27d6dd5..04e189a0c8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -72,4 +72,16 @@ public class ItSqlClientSynchronousApiTest extends 
ItSqlSynchronousApiTest {
     public void testLockIsNotReleasedAfterTxRollback() {
         super.testLockIsNotReleasedAfterTxRollback();
     }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
+    public void runScriptThatCompletesSuccessfully() {
+        super.runScriptThatCompletesSuccessfully();
+    }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
+    public void runScriptThatFails() {
+        super.runScriptThatFails();
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index acc946bb53..d068cbb6f5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -63,6 +63,11 @@ public class ItSqlSynchronousApiTest extends 
ItSqlApiBaseTest {
         return syncProcessor;
     }
 
+    @Override
+    protected void executeScript(Session ses, String sql, Object... args) {
+        ses.executeScript(sql, args);
+    }
+
     @Override
     protected void rollback(Transaction tx) {
         tx.rollback();
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index f2dbaebada..a9c6bc8ada 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -145,12 +145,6 @@ public class SessionImpl implements AbstractSession {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void executeScript(String query, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
     /** {@inheritDoc} */
     @Override
     public long defaultQueryTimeout(TimeUnit timeUnit) {
@@ -401,7 +395,29 @@ public class SessionImpl implements AbstractSession {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        touchAndCloseIfExpired();
+
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(sessionIsClosedException());
+        }
+
+        CompletableFuture<Void> resFut = new CompletableFuture<>();
+        try {
+            SqlProperties properties = SqlPropertiesHelper.emptyProperties();
+
+            CompletableFuture<AsyncSqlCursor<List<Object>>> f = qryProc.queryScriptAsync(properties, transactions, null, query, arguments);
+
+            ScriptHandler handler = new ScriptHandler(resFut);
+            f.whenComplete(handler::processFirstResult);
+        } finally {
+            busyLock.leaveBusy();
+        }
+
+        return resFut.exceptionally((th) -> {
+            Throwable cause = ExceptionUtils.unwrapCause(th);
+
+            throw new CompletionException(mapToPublicSqlException(cause));
+        });
     }
 
     /** {@inheritDoc} */
@@ -523,4 +539,57 @@ public class SessionImpl implements AbstractSession {
     List<AsyncSqlCursor<?>> openedCursors() {
         return List.copyOf(openedCursors.values());
     }
+
+    private class ScriptHandler {
+
+        private final CompletableFuture<Void> resFut;
+
+        private ScriptHandler(CompletableFuture<Void> resFut) {
+            this.resFut = resFut;
+        }
+
+        void processFirstResult(AsyncSqlCursor<List<Object>> cursor, Throwable t) {
+            if (t != null) {
+                resFut.completeExceptionally(t);
+            } else {
+                int cursorId = registerCursor(cursor);
+                processCursor(cursor, cursorId);
+            }
+        }
+
+        void processCursor(AsyncSqlCursor<List<Object>> cursor, int cursorId) {
+            if (!busyLock.enterBusy()) {
+                closeCursor(cursor, cursorId);
+
+                resFut.completeExceptionally(sessionIsClosedException());
+                return;
+            }
+
+            try {
+                if (cursor.hasNextResult()) {
+                    cursor.nextResult().whenComplete((nextCursor, t) -> {
+                        closeCursor(cursor, cursorId);
+
+                        if (nextCursor != null) {
+                            int nextCursorId = registerCursor(nextCursor);
+                            processCursor(nextCursor, nextCursorId);
+                        } else {
+                            resFut.completeExceptionally(t);
+                        }
+                    });
+                } else {
+                    closeCursor(cursor, cursorId);
+
+                    resFut.complete(null);
+                }
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        void closeCursor(AsyncSqlCursor<List<Object>> cursor, int cursorId) {
+            openedCursors.remove(cursorId);
+            cursor.closeAsync();
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
index 771c5adafa..c6f255fb96 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.api;
 
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_CLOSED_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -27,6 +28,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
@@ -341,6 +343,110 @@ class SessionImplTest extends BaseIgniteAbstractTest {
         assertThat(session.openedCursors(), empty());
     }
 
+    @Test
+    public void scriptIteratesOverCursors() {
+        AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class, 
"cursor1");
+        AsyncSqlCursor<List<Object>> cursor2 = mock(AsyncSqlCursor.class, 
"cursor2");
+
+        when(cursor1.hasNextResult()).thenReturn(true);
+        
when(cursor1.nextResult()).thenReturn(CompletableFuture.completedFuture(cursor2));
+        
when(cursor1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        when(cursor2.hasNextResult()).thenReturn(false);
+        
when(cursor2.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), 
any(Object[].class)))
+                .thenReturn(CompletableFuture.completedFuture(cursor1));
+
+        SessionImpl session = newSession(3);
+
+        Void rs = await(session.executeScriptAsync("SELECT 1; SELECT 2"));
+
+        assertNull(rs);
+        assertThat(session.openedCursors(), empty());
+    }
+
+    @Test
+    public void scriptRethrowsExceptionFromCursor() {
+        AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class);
+
+        when(cursor1.hasNextResult()).thenReturn(true);
+        
when(cursor1.nextResult()).thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("Broken")));
+        
when(cursor1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), 
any(Object[].class)))
+                .thenReturn(CompletableFuture.completedFuture(cursor1));
+
+        SessionImpl session = newSession(3);
+
+        assertThrowsSqlException(
+                INTERNAL_ERR,
+                "Broken",
+                () -> await(session.executeScriptAsync("SELECT 1; SELECT 2"))
+        );
+
+        assertThat(session.openedCursors(), empty());
+    }
+
+    @Test
+    public void scriptIgnoresCloseCursorException() {
+        AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class, 
"cursor1");
+        AsyncSqlCursor<List<Object>> cursor2 = mock(AsyncSqlCursor.class, 
"cursor2");
+
+        when(cursor1.hasNextResult()).thenReturn(true);
+        
when(cursor1.nextResult()).thenReturn(CompletableFuture.completedFuture(cursor2));
+        
when(cursor1.closeAsync()).thenReturn(CompletableFuture.failedFuture(new 
IllegalStateException("cursor1")));
+
+        when(cursor2.hasNextResult()).thenReturn(false);
+        
when(cursor2.closeAsync()).thenReturn(CompletableFuture.failedFuture(new 
IllegalStateException("cursor2")));
+
+        when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), 
any(Object[].class)))
+                .thenReturn(CompletableFuture.completedFuture(cursor1));
+
+        SessionImpl session = newSession(3);
+
+        Void rs = await(session.executeScriptAsync("SELECT 1; SELECT 2"));
+
+        assertNull(rs);
+        assertThat(session.openedCursors(), empty());
+    }
+
+    @Test
+    public void scriptTerminatesWhenSessionCloses() {
+        AsyncSqlCursor<List<Object>> cursor1 = mock(AsyncSqlCursor.class, 
"cursor1");
+        AsyncSqlCursor<List<Object>> cursor2 = mock(AsyncSqlCursor.class, 
"cursor2");
+
+        CompletableFuture<AsyncSqlCursor<List<Object>>> cursor2Fut = new 
CompletableFuture<>();
+
+        when(cursor1.hasNextResult()).thenReturn(true);
+        when(cursor1.nextResult()).thenAnswer(ignored -> cursor2Fut);
+        
when(cursor1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        
when(cursor2.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        when(queryProcessor.queryScriptAsync(any(), any(), any(), any(), 
any(Object[].class)))
+                .thenReturn(CompletableFuture.completedFuture(cursor1));
+
+        SessionImpl session = newSession(3);
+
+        Thread thread = new Thread(() -> {
+            session.close();
+            cursor2Fut.complete(cursor2);
+        });
+
+        assertThrowsSqlException(
+                SESSION_CLOSED_ERR,
+                "Session is closed",
+                () -> {
+                    CompletableFuture<Void> f = 
session.executeScriptAsync("SELECT 1; SELECT 2");
+                    thread.start();
+                    await(f);
+                }
+        );
+
+        assertThat(session.openedCursors(), empty());
+    }
+
     private SessionImpl newSession(long idleTimeout) {
         SessionBuilder builder = new SessionBuilderImpl(
                 new IgniteSpinBusyLock(),

Reply via email to