This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 39bc0f37d5 IGNITE-18984 Sql. Migrate JDBC batched methods on new internal API (#1866) 39bc0f37d5 is described below commit 39bc0f37d597ec2d58db27e3eeb095097625d70f Author: ygerzhedovich <41903880+ygerzhedov...@users.noreply.github.com> AuthorDate: Fri Mar 31 18:44:20 2023 +0300 IGNITE-18984 Sql. Migrate JDBC batched methods on new internal API (#1866) --- .../internal/jdbc/proto/JdbcQueryEventHandler.java | 8 +- .../client/handler/JdbcQueryEventHandlerImpl.java | 40 ++++--- .../jdbc/ClientJdbcExecuteBatchRequest.java | 4 +- .../jdbc/ClientJdbcPreparedStmntBatchRequest.java | 4 +- .../client/fakes/FakeIgniteQueryProcessor.java | 11 -- .../apache/ignite/jdbc/ItJdbcBatchSelfTest.java | 15 ++- .../internal/jdbc/JdbcClientQueryEventHandler.java | 17 ++- .../internal/jdbc/JdbcPreparedStatement.java | 2 +- .../apache/ignite/internal/jdbc/JdbcStatement.java | 2 +- .../internal/ClusterPerTestIntegrationTest.java | 11 +- .../ignite/internal/sql/engine/QueryProcessor.java | 25 ----- .../internal/sql/engine/SqlQueryProcessor.java | 124 +-------------------- .../internal/sql/engine/StopCalciteModuleTest.java | 20 ++-- .../sql/engine/exec/MockedStructuresTest.java | 89 +++++++++------ 14 files changed, 145 insertions(+), 227 deletions(-) diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java index 737c0a4983..a71c49a203 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java @@ -47,6 +47,7 @@ public interface JdbcQueryEventHandler { /** * {@link JdbcQueryExecuteRequest} command handler. * + * @param connectionId Identifier of the connection. * @param req Execute query request. * @return Result future. */ @@ -55,19 +56,20 @@ public interface JdbcQueryEventHandler { /** * {@link JdbcBatchExecuteRequest} command handler. * + * @param connectionId Identifier of the connection. * @param req Batch query request. * @return Result future. */ - CompletableFuture<JdbcBatchExecuteResult> batchAsync(JdbcBatchExecuteRequest req); + CompletableFuture<JdbcBatchExecuteResult> batchAsync(long connectionId, JdbcBatchExecuteRequest req); /** * {@link JdbcBatchPreparedStmntRequest} command handler. * + * @param connectionId The identifier of the connection. * @param req Batch query request. * @return Result future. */ - CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync( - JdbcBatchPreparedStmntRequest req); + CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req); /** * {@link JdbcMetaTablesRequest} command handler. diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java index 82b04200d4..9836ffc154 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.Pair; +import org.apache.ignite.lang.ErrorGroups.Client; import org.apache.ignite.lang.ErrorGroups.Sql; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalCheckedException; @@ -177,14 +178,14 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { /** {@inheritDoc} */ @Override - public CompletableFuture<JdbcBatchExecuteResult> batchAsync(JdbcBatchExecuteRequest req) { + public CompletableFuture<JdbcBatchExecuteResult> batchAsync(long connectionId, JdbcBatchExecuteRequest req) { List<String> queries = req.queries(); var counters = new IntArrayList(req.queries().size()); var tail = CompletableFuture.completedFuture(counters); for (String query : queries) { - tail = tail.thenCompose(list -> executeAndCollectUpdateCount(req.schemaName(), query, OBJECT_EMPTY_ARRAY) + tail = tail.thenCompose(list -> executeAndCollectUpdateCount(connectionId, query, OBJECT_EMPTY_ARRAY) .thenApply(cnt -> { list.add(cnt > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO : cnt.intValue()); @@ -203,15 +204,14 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { /** {@inheritDoc} */ @Override - public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync( - JdbcBatchPreparedStmntRequest req) { + public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req) { var argList = req.getArgs(); var counters = new IntArrayList(req.getArgs().size()); var tail = CompletableFuture.completedFuture(counters); for (Object[] args : argList) { - tail = tail.thenCompose(list -> executeAndCollectUpdateCount(req.schemaName(), req.getQuery(), args) + tail = tail.thenCompose(list -> executeAndCollectUpdateCount(connectionId, req.getQuery(), args) .thenApply(cnt -> { list.add(cnt > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO : cnt.intValue()); @@ -228,16 +228,25 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { }); } - private CompletableFuture<Long> executeAndCollectUpdateCount(String schema, String sql, Object[] arg) { + private CompletableFuture<Long> executeAndCollectUpdateCount(long connectionId, String sql, Object[] arg) { var context = createQueryContext(JdbcStatementType.UPDATE_STATEMENT_TYPE); - var cursors = processor.queryAsync(context, schema, sql, arg); - - if (cursors.size() != 1) { - return CompletableFuture.failedFuture(new IgniteInternalException("Multi statement queries are not supported in batching")); + JdbcConnectionContext connectionContext; + try { + connectionContext = resources.get(connectionId).get(JdbcConnectionContext.class); + } catch (IgniteInternalCheckedException exception) { + return CompletableFuture.failedFuture(new IgniteInternalException(Client.CONNECTION_ERR)); } - return cursors.get(0).thenCompose(cursor -> cursor.requestNextAsync(1).thenApply(batch -> (Long) batch.items().get(0).get(0))); + CompletableFuture<AsyncSqlCursor<List<Object>>> result = connectionContext.doInSession(sessionId -> processor.querySingleAsync( + sessionId, + context, + sql, + arg == null ? OBJECT_EMPTY_ARRAY : arg + )); + + return result.thenCompose(cursor -> cursor.requestNextAsync(1)) + .thenApply(batch -> (Long) batch.items().get(0).get(0)); } private JdbcBatchExecuteResult handleBatchException(Throwable e, String query, int[] counters) { @@ -290,7 +299,8 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { try (PrintWriter pw = new PrintWriter(sw)) { // We need to remap QueryValidationException into a jdbc error. - if (cause instanceof IgniteException && cause.getCause() instanceof QueryValidationException) { + if (cause instanceof QueryValidationException + || (cause instanceof IgniteException && cause.getCause() instanceof QueryValidationException)) { pw.print("Given statement type does not match that declared by JDBC driver."); } else { pw.print(cause.getMessage()); @@ -394,7 +404,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { potentiallyNotCreatedSessionId = recreateSession(null); } - final SessionId finalSessionId = potentiallyNotCreatedSessionId; + SessionId finalSessionId = potentiallyNotCreatedSessionId; return action.perform(finalSessionId) .handle((BiFunction<T, Throwable, Pair<T, Throwable>>) Pair::new) @@ -457,13 +467,13 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { * when the session is no longer needed. */ @FunctionalInterface - private static interface SessionCleaner { + private interface SessionCleaner { void clean(SessionId sessionId); } /** Interface describing an action that should be performed within the session. */ @FunctionalInterface - static interface SessionAwareAction<T> { + interface SessionAwareAction<T> { CompletableFuture<T> perform(SessionId sessionId); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java index e6d183ca00..311a04a872 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java @@ -42,8 +42,10 @@ public class ClientJdbcExecuteBatchRequest { ) { var req = new JdbcBatchExecuteRequest(); + long connectionId = in.unpackLong(); + req.readBinary(in); - return handler.batchAsync(req).thenAccept(res -> res.writeBinary(out)); + return handler.batchAsync(connectionId, req).thenAccept(res -> res.writeBinary(out)); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java index d0017c6586..ba29ea7a0e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java @@ -42,8 +42,10 @@ public class ClientJdbcPreparedStmntBatchRequest { ) { var req = new JdbcBatchPreparedStmntRequest(); + long connectionId = in.unpackLong(); + req.readBinary(in); - return handler.batchPrepStatementAsync(req).thenAccept(res -> res.writeBinary(out)); + return handler.batchPrepStatementAsync(connectionId, req).thenAccept(res -> res.writeBinary(out)); } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java index dc0ac6e481..08106c9cc6 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java @@ -47,17 +47,6 @@ public class FakeIgniteQueryProcessor implements QueryProcessor { return Collections.emptyList(); } - @Override - public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(String schemaName, String qry, Object... params) { - return List.of(CompletableFuture.completedFuture(new FakeCursor())); - } - - @Override - public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(QueryContext context, String schemaName, - String qry, Object... params) { - return List.of(CompletableFuture.completedFuture(new FakeCursor())); - } - @Override public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync( SessionId sessionid, QueryContext context, String qry, diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java index cc581ac178..5634af9127 100644 --- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java +++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java @@ -123,10 +123,21 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest { } } + @Test + public void testMultipleStatementForBatchIsNotAllowed() throws SQLException { + String insertStmt = "insert into Person (id, firstName, lastName, age) values"; + String ins1 = insertStmt + valuesRow(1); + String ins2 = insertStmt + valuesRow(2); + + stmt.addBatch(ins1 + ";" + ins2); + + assertThrows(BatchUpdateException.class, () -> stmt.executeBatch(), "Multiple statements are not allowed."); + } + @Test public void testBatchOnClosedStatement() throws SQLException { - final Statement stmt2 = conn.createStatement(); - final PreparedStatement pstmt2 = conn.prepareStatement(""); + Statement stmt2 = conn.createStatement(); + PreparedStatement pstmt2 = conn.prepareStatement(""); stmt2.close(); pstmt2.close(); diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java index c69ed96559..083f53c84c 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java @@ -82,8 +82,12 @@ public class JdbcClientQueryEventHandler implements JdbcQueryEventHandler { /** {@inheritDoc} */ @Override - public CompletableFuture<JdbcBatchExecuteResult> batchAsync(JdbcBatchExecuteRequest req) { - return client.sendRequestAsync(ClientOp.JDBC_EXEC_BATCH, w -> req.writeBinary(w.out()), r -> { + public CompletableFuture<JdbcBatchExecuteResult> batchAsync(long connectionId, JdbcBatchExecuteRequest req) { + return client.sendRequestAsync(ClientOp.JDBC_EXEC_BATCH, w -> { + w.out().packLong(connectionId); + + req.writeBinary(w.out()); + }, r -> { JdbcBatchExecuteResult res = new JdbcBatchExecuteResult(); res.readBinary(r.in()); @@ -94,9 +98,12 @@ public class JdbcClientQueryEventHandler implements JdbcQueryEventHandler { /** {@inheritDoc} */ @Override - public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync( - JdbcBatchPreparedStmntRequest req) { - return client.sendRequestAsync(ClientOp.JDBC_SQL_EXEC_PS_BATCH, w -> req.writeBinary(w.out()), r -> { + public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req) { + return client.sendRequestAsync(ClientOp.JDBC_SQL_EXEC_PS_BATCH, w -> { + w.out().packLong(connectionId); + + req.writeBinary(w.out()); + }, r -> { JdbcBatchExecuteResult res = new JdbcBatchExecuteResult(); res.readBinary(r.in()); diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java index ccf16601f3..246ec5f2a6 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java @@ -115,7 +115,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat = new JdbcBatchPreparedStmntRequest(conn.getSchema(), sql, batchedArgs); try { - JdbcBatchExecuteResult res = conn.handler().batchPrepStatementAsync(req).join(); + JdbcBatchExecuteResult res = conn.handler().batchPrepStatementAsync(conn.connectionId(), req).join(); if (!res.hasResults()) { throw new BatchUpdateException(res.err(), diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java index 2cb6946390..0808ae91ec 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java @@ -550,7 +550,7 @@ public class JdbcStatement implements Statement { JdbcBatchExecuteRequest req = new JdbcBatchExecuteRequest(conn.getSchema(), batch); try { - JdbcBatchExecuteResult res = conn.handler().batchAsync(req).join(); + JdbcBatchExecuteResult res = conn.handler().batchAsync(conn.connectionId(), req).join(); if (!res.hasResults()) { throw new BatchUpdateException(res.err(), diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java index 4c8121d146..f2806ffaa4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java @@ -27,6 +27,11 @@ import org.apache.ignite.Ignite; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.sql.engine.QueryContext; +import org.apache.ignite.internal.sql.engine.QueryProcessor; +import org.apache.ignite.internal.sql.engine.SqlQueryType; +import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; +import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.testframework.WorkDirectory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -194,8 +199,12 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes } protected final List<List<Object>> executeSql(String sql, Object... args) { + QueryProcessor qryProc = node(0).queryEngine(); + SessionId sessionId = qryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + return getAllFromCursor( - node(0).queryEngine().queryAsync("PUBLIC", sql, args).get(0).join() + qryProc.querySingleAsync(sessionId, context, sql, args).join() ); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java index 5cfadb9783..92bacfd50d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java @@ -56,31 +56,6 @@ public interface QueryProcessor extends IgniteComponent { */ List<SessionInfo> liveSessions(); - /** - * Execute the query with given schema name and parameters. - * - * @param schemaName Schema name. - * @param qry Sql query. - * @param params Query parameters. - * @return List of sql cursors. - * - * @throws IgniteException in case of an error. - */ - List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(String schemaName, String qry, Object... params); - - /** - * Execute the query with given schema name and parameters. - * - * @param context User query context. - * @param schemaName Schema name. - * @param qry Sql query. - * @param params Query parameters. - * @return List of sql cursors. - * - * @throws IgniteException in case of an error. - */ - List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(QueryContext context, String schemaName, String qry, Object... params); - /** * Execute the single statement query with given schema name and parameters. * diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 4f4aa60c03..4366c4f63a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -82,7 +82,6 @@ import org.apache.ignite.internal.sql.engine.session.SessionManager; import org.apache.ignite.internal.sql.engine.session.SessionProperty; import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; import org.apache.ignite.internal.sql.engine.sql.ParseResult; -import org.apache.ignite.internal.sql.engine.sql.ScriptParseResult; import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; @@ -175,7 +174,7 @@ public class SqlQueryProcessor implements QueryProcessor { private final HybridClock clock; /** Distributed catalog manager. */ - private CatalogManager catalogManager; + private final CatalogManager catalogManager; /** Constructor. */ public SqlQueryProcessor( @@ -334,29 +333,6 @@ public class SqlQueryProcessor implements QueryProcessor { IgniteUtils.closeAll(Stream.concat(closableComponents, closableListeners).collect(Collectors.toList())); } - /** {@inheritDoc} */ - @Override - public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(String schemaName, String qry, Object... params) { - QueryContext context = QueryContext.create(SqlQueryType.ALL); - - return queryAsync(context, schemaName, qry, params); - } - - /** {@inheritDoc} */ - @Override - public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(QueryContext context, String schemaName, - String qry, Object... params) { - if (!busyLock.enterBusy()) { - throw new IgniteInternalException(OPERATION_INTERRUPTED_ERR, new NodeStoppingException()); - } - - try { - return query0(context, schemaName, qry, params); - } finally { - busyLock.leaveBusy(); - } - } - /** {@inheritDoc} */ @Override public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync( @@ -502,104 +478,6 @@ public class SqlQueryProcessor implements QueryProcessor { return stage; } - private List<CompletableFuture<AsyncSqlCursor<List<Object>>>> query0( - QueryContext context, - String schemaName, - String sql, - Object... params - ) { - SchemaPlus schema = sqlSchemaManager.schema(schemaName); - - if (schema == null) { - throw new IgniteInternalException(SCHEMA_NOT_FOUND_ERR, format("Schema not found [schemaName={}]", schemaName)); - } - - CompletableFuture<Void> start = new CompletableFuture<>(); - - ScriptParseResult parseResult; - List<CompletableFuture<AsyncSqlCursor<List<Object>>>> res; - - try { - parseResult = IgniteSqlParser.parse(sql, ScriptParseResult.MODE); - res = new ArrayList<>(parseResult.statements().size()); - } catch (Throwable th) { - start.completeExceptionally(th); - - parseResult = new ScriptParseResult(Collections.emptyList(), 0); - res = Collections.singletonList(CompletableFuture.completedFuture(failedCursor(th))); - } - - for (SqlNode sqlNode : parseResult.statements()) { - try { - validateParsedStatement(context, parseResult, sqlNode, params); - } catch (Exception e) { - start.completeExceptionally(e); - - res = Collections.singletonList(CompletableFuture.completedFuture(failedCursor(e))); - return res; - } - - // Only rw transactions for now. - InternalTransaction implicitTx = txManager.begin(false); - - final BaseQueryContext ctx = BaseQueryContext.builder() - .cancel(new QueryCancel()) - .frameworkConfig( - Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(schema) - .build() - ) - .logger(LOG) - .parameters(params) - .plannerTimeout(PLANNER_TIMEOUT) - .build(); - - // TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix query execution flow. - CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start - .thenCompose(none -> prepareSvc.prepareAsync(sqlNode, ctx)) - .thenApply(plan -> { - SqlQueryType queryType = plan.type(); - assert queryType != null : "Expected a full plan but got a fragment: " + plan; - - return new AsyncSqlCursorImpl<>( - queryType, - plan.metadata(), - implicitTx, - executionSrvc.executePlan(implicitTx, plan, ctx) - ); - }); - - stage.whenComplete((cur, ex) -> { - if (ex instanceof CancellationException) { - ctx.cancel().cancel(); - } - }); - - res.add(stage); - } - - start.completeAsync(() -> null, taskExecutor); - - return res; - } - - private static <T> AsyncSqlCursor<T> failedCursor(Throwable th) { - return new AsyncSqlCursorImpl<>( - null, null, null, - new AsyncCursor<>() { - @Override - public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) { - return CompletableFuture.failedFuture(th); - } - - @Override - public CompletableFuture<Void> closeAsync() { - return CompletableFuture.completedFuture(null); - } - } - ); - } - private abstract static class AbstractTableEventListener implements EventListener<TableEventParameters> { protected final SqlSchemaManagerImpl schemaHolder; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java index cf5811b285..c78501322f 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java @@ -68,6 +68,8 @@ import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException; import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction; import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex; +import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; +import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; @@ -261,19 +263,22 @@ public class StopCalciteModuleTest { await(testRevisionRegister.moveRevision.apply(0L)); - var cursors = qryProc.queryAsync( - "PUBLIC", + SessionId sessionId = qryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + + var cursors = qryProc.querySingleAsync( + sessionId, + context, "SELECT * FROM TEST" ); - await(cursors.get(0).thenCompose(cursor -> cursor.requestNextAsync(1))); + await(cursors.thenCompose(cursor -> cursor.requestNextAsync(1))); assertTrue(isThereNodeThreads(NODE_NAME)); qryProc.stop(); - var request = cursors.get(0) - .thenCompose(cursor -> cursor.requestNextAsync(1)); + var request = cursors.thenCompose(cursor -> cursor.requestNextAsync(1)); // Check cursor closed. await(request.exceptionally(t -> { @@ -286,8 +291,9 @@ public class StopCalciteModuleTest { assertTrue(request.isCompletedExceptionally()); // Check execute query on stopped node. - assertTrue(assertThrows(IgniteInternalException.class, () -> qryProc.queryAsync( - "PUBLIC", + assertTrue(assertThrows(IgniteInternalException.class, () -> qryProc.querySingleAsync( + sessionId, + context, "SELECT 1" )).getCause() instanceof NodeStoppingException); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java index 06253e8b62..feac7aacb3 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java @@ -71,7 +71,11 @@ import org.apache.ignite.internal.schema.configuration.TableView; import org.apache.ignite.internal.schema.configuration.TablesConfiguration; import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; +import org.apache.ignite.internal.sql.engine.QueryContext; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; +import org.apache.ignite.internal.sql.engine.SqlQueryType; +import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; +import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.storage.impl.TestDataStorageModule; @@ -300,32 +304,37 @@ public class MockedStructuresTest extends IgniteAbstractTest { public void testCreateTable() { SqlQueryProcessor finalQueryProc = queryProc; + SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + String curMethodName = getCurrentMethodName(); String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='zone123'", curMethodName); - readFirst(queryProc.queryAsync("PUBLIC", newTblSql)); + readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); assertTrue(tblManager.tables().stream().anyMatch(t -> t.name() .equalsIgnoreCase(curMethodName))); String finalNewTblSql1 = newTblSql; - assertThrows(TableAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1))); + assertThrows(TableAlreadyExistsException.class, + () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql1))); String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='zone123'", curMethodName); - assertThrows(TableAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2))); + assertThrows(TableAlreadyExistsException.class, + () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql2))); - assertThrows(SqlException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", + assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions__wrong=1,primary_zone='zone123'"))); - assertThrows(SqlException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", + assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas__wrong=1,primary_zone='zone123'"))); - assertThrows(SqlException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", + assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone__wrong='zone123'"))); newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", @@ -333,7 +342,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { String finalNewTblSql3 = newTblSql; - assertDoesNotThrow(() -> await(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql3).get(0))); + assertDoesNotThrow(() -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql3))); } /** @@ -343,16 +352,19 @@ public class MockedStructuresTest extends IgniteAbstractTest { public void testCreateTableWithDistributionZone() { String tableName = getCurrentMethodName().toUpperCase(); + SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + String zoneName = "zone123"; String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) ", tableName); - readFirst(queryProc.queryAsync("PUBLIC", newTblSql)); + readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); assertEquals(DistributionZoneManager.DEFAULT_ZONE_ID, tblsCfg.tables().get(tableName).zoneId().value()); - readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + tableName)); + readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName)); int zoneId = dstZnsCfg.distributionZones().get(zoneName).zoneId().value(); @@ -361,18 +373,18 @@ public class MockedStructuresTest extends IgniteAbstractTest { newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='%s'", tableName, zoneName); - readFirst(queryProc.queryAsync("PUBLIC", newTblSql)); + readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); assertEquals(zoneId, tblsCfg.tables().get(tableName).zoneId().value()); - readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + tableName)); + readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName)); when(distributionZoneManager.getZoneId(zoneName)).thenThrow(DistributionZoneNotFoundException.class); Exception exception = assertThrows( IgniteException.class, - () -> readFirst(queryProc.queryAsync("PUBLIC", + () -> readFirst(queryProc.querySingleAsync(sessionId, context, String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='%s'", tableName, zoneName))) ); @@ -387,26 +399,29 @@ public class MockedStructuresTest extends IgniteAbstractTest { public void testDropTable() { String curMethodName = getCurrentMethodName(); + SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName); - readFirst(queryProc.queryAsync("PUBLIC", newTblSql)); + readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); - readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + curMethodName)); + readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName)); SqlQueryProcessor finalQueryProc = queryProc; - assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", + assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName + "_not_exist"))); - assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", + assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName))); - assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", + assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, "DROP TABLE PUBLIC." + curMethodName))); - readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist")); + readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist")); - readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS PUBLIC." + curMethodName)); + readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName)); assertTrue(tblManager.tables().stream().noneMatch(t -> t.name() .equalsIgnoreCase("PUBLIC." + curMethodName))); @@ -416,8 +431,12 @@ public class MockedStructuresTest extends IgniteAbstractTest { void createTableWithTableOptions() { String method = getCurrentMethodName(); - assertDoesNotThrow(() -> readFirst(queryProc.queryAsync( - "PUBLIC", + SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + + assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync( + sessionId, + context, String.format( "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone='zone123'", method + 4 @@ -426,8 +445,9 @@ public class MockedStructuresTest extends IgniteAbstractTest { IgniteException exception = assertThrows( IgniteException.class, - () -> readFirst(queryProc.queryAsync( - "PUBLIC", + () -> readFirst(queryProc.querySingleAsync( + sessionId, + context, String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with %s='%s'", method + 6, method, method) )) ); @@ -439,8 +459,12 @@ public class MockedStructuresTest extends IgniteAbstractTest { void createTableWithDataStorageOptions() { String method = getCurrentMethodName(); - assertDoesNotThrow(() -> readFirst(queryProc.queryAsync( - "PUBLIC", + SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + + assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync( + sessionId, + context, String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with dataRegion='default'", method + 0) ))); @@ -449,10 +473,13 @@ public class MockedStructuresTest extends IgniteAbstractTest { equalTo(DEFAULT_DATA_REGION_NAME) ); - assertDoesNotThrow(() -> readFirst(queryProc.queryAsync( - "PUBLIC", - String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with DATAREGION='test_region'", method + 1) - ))); + assertDoesNotThrow(() -> readFirst( + queryProc.querySingleAsync( + sessionId, + context, + String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with DATAREGION='test_region'", method + 1) + ) + )); assertThat( ((RocksDbDataStorageView) tableView(method + 1).dataStorage()).dataRegion(), @@ -549,8 +576,8 @@ public class MockedStructuresTest extends IgniteAbstractTest { return tableManager; } - private <T> BatchedResult<T> readFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) { - return await(await(cursors.get(0)).requestNextAsync(512)); + private <T> BatchedResult<T> readFirst(CompletableFuture<AsyncSqlCursor<List<Object>>> cursors) { + return (BatchedResult<T>) await(await(cursors).requestNextAsync(512)); } private @Nullable TableView tableView(String tableName) {