This is an automated email from the ASF dual-hosted git repository.
jooger pushed a commit to branch jdbc_over_thin_sql
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/jdbc_over_thin_sql by this
push:
new a071d913306 IGNITE-26087 Ability to obtain results of a
multi-statement query execution using the internal thin client SQL API. (#6397)
a071d913306 is described below
commit a071d91330629b933fea0469ac664323c52b9bb6
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Aug 20 09:41:30 2025 +0300
IGNITE-26087 Ability to obtain results of a multi-statement query execution
using the internal thin client SQL API. (#6397)
---
.../ignite/internal/client/proto/ClientOp.java | 3 +
.../ignite/internal/client/sql/QueryModifier.java | 7 +-
.../client/proto/sql/QueryModifierTest.java | 5 +-
.../handler/ClientInboundMessageHandler.java | 4 +
.../client/handler/ClientResourceRegistry.java | 6 +
.../handler/requests/sql/ClientSqlCommon.java | 152 ++++++
.../sql/ClientSqlCursorNextResultRequest.java | 64 +++
.../requests/sql/ClientSqlExecuteRequest.java | 134 ++----
.../handler/requests/sql/ClientSqlProperties.java | 2 +-
.../handler/requests/sql/ClientSqlCommonTest.java | 2 +-
.../apache/ignite/client/ClientOperationType.java | 5 +
.../org/apache/ignite/client/RetryReadPolicy.java | 1 +
.../apache/ignite/internal/client/ClientUtils.java | 3 +
.../internal/client/sql/ClientAsyncResultSet.java | 68 ++-
.../ignite/internal/client/sql/ClientSql.java | 6 +-
.../org/apache/ignite/client/ClientSqlTest.java | 4 +
.../org/apache/ignite/client/fakes/FakeCursor.java | 5 +-
.../client/ItThinClientMultistatementSqlTest.java | 529 +++++++++++++++++++++
.../runner/app/client/ItThinClientSqlTest.java | 21 +-
.../internal/sql/api/AsyncResultSetImpl.java | 5 +
.../engine/util/SqlExceptionMapperProvider.java | 4 +
21 files changed, 916 insertions(+), 114 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index d0a3f5bb0b5..377745e3f08 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -200,6 +200,9 @@ public class ClientOp {
/** Response to a server->client operation. */
public static final int SERVER_OP_RESPONSE = 73;
+ /** Get next result set. */
+ public static final int SQL_CURSOR_NEXT_RESULT_SET = 74;
+
/** Reserved for extensions: min. */
@SuppressWarnings("unused")
public static final int RESERVED_EXTENSION_RANGE_START = 1000;
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
index 60bfcc28e74..d910549559c 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
@@ -34,13 +34,16 @@ public enum QueryModifier {
ALLOW_APPLIED_RESULT(2),
/** Queries with transaction control statements. */
- ALLOW_TX_CONTROL(3);
+ ALLOW_TX_CONTROL(3),
+
+ /** Queries with multiple statements. */
+ ALLOW_MULTISTATEMENT(4);
/** A set containing all modifiers. **/
public static final Set<QueryModifier> ALL =
EnumSet.allOf(QueryModifier.class);
/** A set of modifiers that can apply to single statements. **/
- public static final Set<QueryModifier> SINGLE_STMT_MODIFIERS =
EnumSet.complementOf(EnumSet.of(ALLOW_TX_CONTROL));
+ public static final Set<QueryModifier> SINGLE_STMT_MODIFIERS =
EnumSet.complementOf(EnumSet.of(ALLOW_TX_CONTROL, ALLOW_MULTISTATEMENT));
private static final QueryModifier[] VALS = new
QueryModifier[values().length];
diff --git
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
index 443f604e5d3..a4821d85333 100644
---
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
+++
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
@@ -58,7 +58,10 @@ public class QueryModifierTest {
Arguments.of(Set.of(QueryModifier.ALLOW_ROW_SET_RESULT,
QueryModifier.ALLOW_AFFECTED_ROWS_RESULT)),
Arguments.of(Set.of(QueryModifier.ALLOW_AFFECTED_ROWS_RESULT,
QueryModifier.ALLOW_APPLIED_RESULT)),
Arguments.of(Set.of(QueryModifier.ALLOW_APPLIED_RESULT,
QueryModifier.ALLOW_TX_CONTROL)),
- Arguments.of(Set.of(QueryModifier.ALLOW_TX_CONTROL,
QueryModifier.ALLOW_ROW_SET_RESULT)),
+ Arguments.of(Set.of(QueryModifier.ALLOW_TX_CONTROL,
QueryModifier.ALLOW_MULTISTATEMENT)),
+ Arguments.of(Set.of(QueryModifier.ALLOW_MULTISTATEMENT,
QueryModifier.ALLOW_ROW_SET_RESULT)),
+ Arguments.of(Set.of(QueryModifier.ALLOW_ROW_SET_RESULT,
QueryModifier.ALLOW_MULTISTATEMENT,
+ QueryModifier.ALLOW_APPLIED_RESULT)),
Arguments.of(QueryModifier.ALL));
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 461f0e247c5..42f40d0b7af 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -80,6 +80,7 @@ import
org.apache.ignite.client.handler.requests.jdbc.ClientJdbcTableMetadataReq
import org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog;
import
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorCloseRequest;
import
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequest;
+import
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextResultRequest;
import
org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
import
org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteScriptRequest;
@@ -948,6 +949,9 @@ public class ClientInboundMessageHandler
clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT)
);
+ case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
+ return ClientSqlCursorNextResultRequest.process(in, resources,
partitionOperationsExecutor, metrics);
+
case ClientOp.OPERATION_CANCEL:
return ClientOperationCancelRequest.process(in, cancelHandles);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
index ed418d70a2a..23fc40cc33a 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.TestOnly;
/**
* Per-connection resource registry.
@@ -140,6 +141,11 @@ public class ClientResourceRegistry {
}
}
+ @TestOnly
+ public int size() {
+ return res.size();
+ }
+
/**
* Enters the lock.
*/
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index dc251f2bf58..ff3d3e7f6a7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -23,15 +23,28 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientHandlerMetricSource;
+import org.apache.ignite.client.handler.ClientResource;
+import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.sql.QueryModifier;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import
org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata;
+import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin;
import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
/**
* Common SQL request handling logic.
@@ -223,6 +236,9 @@ class ClientSqlCommon {
queryTypes.add(SqlQueryType.TX_CONTROL);
break;
+ case ALLOW_MULTISTATEMENT:
+ break;
+
default:
throw new IllegalArgumentException("Unexpected modifier "
+ queryModifier);
}
@@ -230,4 +246,140 @@ class ClientSqlCommon {
return queryTypes;
}
+
+ static CompletableFuture<ResponseWriter> writeResultSetAsync(
+ ClientResourceRegistry resources,
+ AsyncResultSetImpl asyncResultSet,
+ ClientHandlerMetricSource metrics,
+ int pageSize,
+ boolean includePartitionAwarenessMeta,
+ boolean sqlDirectTxMappingSupported,
+ boolean sqlMultiStatementSupported
+ ) {
+ try {
+ Long nextResultResourceId = sqlMultiStatementSupported &&
asyncResultSet.cursor().hasNextResult()
+ ?
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize,
resources)
+ : null;
+
+ if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()))
{
+ metrics.cursorsActiveIncrement();
+
+ var clientResultSet = new ClientSqlResultSet(asyncResultSet,
metrics);
+
+ ClientResource resource = new ClientResource(
+ clientResultSet,
+ clientResultSet::closeAsync);
+
+ var resourceId = resources.put(resource);
+
+ return CompletableFuture.completedFuture(out ->
+ writeResultSet(out, asyncResultSet, resourceId,
includePartitionAwarenessMeta,
+ sqlDirectTxMappingSupported,
sqlMultiStatementSupported, nextResultResourceId));
+ }
+
+ return asyncResultSet.closeAsync()
+ .thenApply(v -> (ResponseWriter) out ->
+ writeResultSet(out, asyncResultSet, null,
includePartitionAwarenessMeta,
+ sqlDirectTxMappingSupported,
sqlMultiStatementSupported, nextResultResourceId));
+
+ } catch (IgniteInternalCheckedException e) {
+ // Resource registry was closed.
+ return asyncResultSet
+ .closeAsync()
+ .thenRun(() -> {
+ throw new IgniteInternalException(e.getMessage(), e);
+ });
+ }
+ }
+
+ private static Long saveNextResultResource(
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
+ int pageSize,
+ ClientResourceRegistry resources
+ ) throws IgniteInternalCheckedException {
+ ClientResource resource = new ClientResource(
+ new CursorWithPageSize(nextResultFuture, pageSize),
+ () -> nextResultFuture.thenApply(AsyncCursor::closeAsync));
+
+ return resources.put(resource);
+ }
+
+ private static void writeResultSet(
+ ClientMessagePacker out,
+ AsyncResultSetImpl res,
+ @Nullable Long resourceId,
+ boolean includePartitionAwarenessMeta,
+ boolean sqlDirectTxMappingSupported,
+ boolean sqlMultiStatementsSupported,
+ @Nullable Long nextResultResourceId
+ ) {
+ out.packLongNullable(resourceId);
+
+ out.packBoolean(res.hasRowSet());
+ out.packBoolean(res.hasMorePages());
+ out.packBoolean(res.wasApplied());
+ out.packLong(res.affectedRows());
+
+ packMeta(out, res.metadata());
+
+ if (includePartitionAwarenessMeta) {
+ packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(),
sqlDirectTxMappingSupported);
+ }
+
+ if (sqlMultiStatementsSupported) {
+ out.packLongNullable(nextResultResourceId);
+ }
+
+ if (res.hasRowSet()) {
+ packCurrentPage(out, res);
+ }
+ }
+
+ private static void packMeta(ClientMessagePacker out, @Nullable
ResultSetMetadata meta) {
+ // TODO IGNITE-17179 metadata caching - avoid sending same meta over
and over.
+ if (meta == null || meta.columns() == null) {
+ out.packInt(0);
+ return;
+ }
+
+ packColumns(out, meta.columns());
+ }
+
+ private static void packPartitionAwarenessMeta(
+ ClientMessagePacker out,
+ @Nullable PartitionAwarenessMetadata meta,
+ boolean sqlDirectTxMappingSupported
+ ) {
+ if (meta == null) {
+ out.packNil();
+ return;
+ }
+
+ out.packInt(meta.tableId());
+ out.packIntArray(meta.indexes());
+ out.packIntArray(meta.hash());
+
+ if (sqlDirectTxMappingSupported) {
+ out.packByte(meta.directTxMode().id);
+ }
+ }
+
+ /** Holder of the cursor future and page size. */
+ static class CursorWithPageSize {
+ private final CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
cursorFuture;
+ private final int pageSize;
+
+ CursorWithPageSize(CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
cursorFuture, int pageSize) {
+ this.cursorFuture = cursorFuture;
+ this.pageSize = pageSize;
+ }
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
+ return cursorFuture;
+ }
+
+ int pageSize() {
+ return pageSize;
+ }
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
new file mode 100644
index 00000000000..5e8752fae65
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.sql;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.client.handler.ClientHandlerMetricSource;
+import org.apache.ignite.client.handler.ClientResource;
+import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.client.handler.ResponseWriter;
+import
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.CursorWithPageSize;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.sql.SqlRow;
+
+/**
+ * Client SQL cursor next result.
+ */
+public class ClientSqlCursorNextResultRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @return Future representing result of operation.
+ */
+ public static CompletableFuture<ResponseWriter> process(
+ ClientMessageUnpacker in,
+ ClientResourceRegistry resources,
+ Executor operationExecutor,
+ ClientHandlerMetricSource metrics
+ ) throws IgniteInternalCheckedException {
+ long resourceId = in.unpackLong();
+ ClientResource resource = resources.remove(resourceId);
+ CursorWithPageSize cursorWithPageSize =
resource.get(CursorWithPageSize.class);
+ int pageSize = cursorWithPageSize.pageSize();
+
+ return cursorWithPageSize.cursorFuture()
+ .thenComposeAsync(cur -> cur.requestNextAsync(pageSize)
+ .thenApply(batchRes -> new AsyncResultSetImpl<SqlRow>(
+ cur,
+ batchRes,
+ pageSize
+ )
+ ).thenCompose(asyncResultSet ->
+ ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, pageSize, false, false, true)
+ ).thenApply(rsWriter -> rsWriter), operationExecutor);
+ }
+}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 89bfbd7bc07..758bc01a494 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -17,39 +17,38 @@
package org.apache.ignite.client.handler.requests.sql;
-import static
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.packCurrentPage;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
-import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
-import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlProperties;
-import
org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
-import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -57,7 +56,6 @@ import org.jetbrains.annotations.Nullable;
/**
* Client SQL execute request.
*/
-@SuppressWarnings({"rawtypes", "unchecked"})
public class ClientSqlExecuteRequest {
/**
* Processes the request.
@@ -128,7 +126,8 @@ public class ClientSqlExecuteRequest {
() -> cancelHandles.remove(requestId),
arguments
).thenCompose(asyncResultSet ->
- writeResultSetAsync(resources, asyncResultSet,
metrics, includePartitionAwarenessMeta, sqlDirectTxMappingSupported))
+ ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, props.pageSize(),
+ includePartitionAwarenessMeta,
sqlDirectTxMappingSupported, sqlMultistatementsSupported))
.thenApply(rsWriter -> out -> {
if (tx != null) {
writeTxMeta(out, timestampTracker, clockService, tx,
resIdHolder[0]);
@@ -146,95 +145,6 @@ public class ClientSqlExecuteRequest {
return arguments == null ? ArrayUtils.OBJECT_EMPTY_ARRAY : arguments;
}
- private static CompletableFuture<ResponseWriter> writeResultSetAsync(
- ClientResourceRegistry resources,
- AsyncResultSetImpl asyncResultSet,
- ClientHandlerMetricSource metrics,
- boolean includePartitionAwarenessMeta,
- boolean sqlDirectTxMappingSupported
- ) {
- if (asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()) {
- try {
- metrics.cursorsActiveIncrement();
-
- var clientResultSet = new ClientSqlResultSet(asyncResultSet,
metrics);
-
- ClientResource resource = new ClientResource(
- clientResultSet,
- clientResultSet::closeAsync);
-
- var resourceId = resources.put(resource);
-
- return CompletableFuture.completedFuture(out ->
- writeResultSet(out, asyncResultSet, resourceId,
includePartitionAwarenessMeta, sqlDirectTxMappingSupported));
- } catch (IgniteInternalCheckedException e) {
- return asyncResultSet
- .closeAsync()
- .thenRun(() -> {
- throw new IgniteInternalException(e.getMessage(),
e);
- });
- }
- }
-
- return asyncResultSet.closeAsync()
- .thenApply(v -> (ResponseWriter) out ->
- writeResultSet(out, asyncResultSet, null,
includePartitionAwarenessMeta, sqlDirectTxMappingSupported));
- }
-
- private static void writeResultSet(
- ClientMessagePacker out,
- AsyncResultSetImpl res,
- @Nullable Long resourceId,
- boolean includePartitionAwarenessMeta,
- boolean sqlDirectTxMappingSupported
- ) {
- out.packLongNullable(resourceId);
-
- out.packBoolean(res.hasRowSet());
- out.packBoolean(res.hasMorePages());
- out.packBoolean(res.wasApplied());
- out.packLong(res.affectedRows());
-
- packMeta(out, res.metadata());
-
- if (includePartitionAwarenessMeta) {
- packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(),
sqlDirectTxMappingSupported);
- }
-
- if (res.hasRowSet()) {
- packCurrentPage(out, res);
- }
- }
-
- private static void packMeta(ClientMessagePacker out, @Nullable
ResultSetMetadata meta) {
- // TODO IGNITE-17179 metadata caching - avoid sending same meta over
and over.
- if (meta == null || meta.columns() == null) {
- out.packInt(0);
- return;
- }
-
- ClientSqlCommon.packColumns(out, meta.columns());
- }
-
- private static void packPartitionAwarenessMeta(
- ClientMessagePacker out,
- @Nullable PartitionAwarenessMetadata meta,
- boolean sqlDirectTxMappingSupported
- ) {
- if (meta == null) {
- out.packNil();
- return;
- }
-
- out.packInt(meta.tableId());
- out.packIntArray(meta.indexes());
- out.packIntArray(meta.hash());
-
- if (sqlDirectTxMappingSupported) {
- out.packByte(meta.directTxMode().id);
- }
- }
-
private static CompletableFuture<AsyncResultSetImpl<SqlRow>> executeAsync(
@Nullable Transaction transaction,
QueryProcessor qryProc,
@@ -256,7 +166,7 @@ public class ClientSqlExecuteRequest {
arguments
)
.thenCompose(cur -> {
- cur.onClose().whenComplete((none, ignore) ->
onComplete.run());
+ doWhenAllCursorsComplete(cur, onComplete);
return cur.requestNextAsync(pageSize)
.thenApply(
@@ -280,4 +190,30 @@ public class ClientSqlExecuteRequest {
return CompletableFuture.failedFuture(mapToPublicSqlException(e));
}
}
+
+ private static void
doWhenAllCursorsComplete(AsyncSqlCursor<InternalSqlRow> cursor, Runnable
action) {
+ List<CompletableFuture<?>> dependency = new ArrayList<>();
+ var cursorChainTraverser = new Function<AsyncSqlCursor<?>,
CompletableFuture<AsyncSqlCursor<?>>>() {
+ @Override
+ public CompletableFuture<AsyncSqlCursor<?>>
apply(AsyncSqlCursor<?> cursor) {
+ dependency.add(cursor.onClose());
+
+ if (cursor.hasNextResult()) {
+ return cursor.nextResult().thenCompose(this);
+ }
+
+ return allOf(dependency)
+ .thenRun(action)
+ .thenApply(ignored -> cursor);
+ }
+ };
+
+ cursorChainTraverser
+ .apply(cursor)
+ .exceptionally(ex -> {
+ action.run();
+
+ return null;
+ });
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
index 2931dd315eb..23f010e94ad 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
@@ -75,7 +75,7 @@ class ClientSqlProperties {
SqlProperties sqlProperties = new SqlProperties()
.queryTimeout(queryTimeout)
.allowedQueryTypes(ClientSqlCommon.convertQueryModifierToQueryType(queryModifiers))
- .allowMultiStatement(false);
+
.allowMultiStatement(queryModifiers.contains(QueryModifier.ALLOW_MULTISTATEMENT));
if (schema != null) {
sqlProperties.defaultSchema(schema);
diff --git
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
index da7041d4baa..c18556a5135 100644
---
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
+++
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
@@ -43,7 +43,7 @@ public class ClientSqlCommonTest {
void testConvertQueryModifierToQueryType(QueryModifier type) {
Set<SqlQueryType> sqlQueryTypes =
ClientSqlCommon.convertQueryModifierToQueryType(Set.of(type));
- assertFalse(sqlQueryTypes.isEmpty());
+ assertThat(sqlQueryTypes.isEmpty(), is(type ==
QueryModifier.ALLOW_MULTISTATEMENT));
sqlQueryTypes.forEach(sqlQueryType -> {
switch (type) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index b2a2d2f0820..f0a9f13f20a 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -175,6 +175,11 @@ public enum ClientOperationType {
*/
SQL_CURSOR_NEXT_PAGE,
+ /**
+ * SQL Cursor Next ResultSet.
+ */
+ SQL_CURSOR_NEXT_RESULT_SET,
+
/**
* Send streamer batch ({@link DataStreamerTarget#streamData}).
*/
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index d6c20a7855c..44e0373106a 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -59,6 +59,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
case SQL_EXECUTE:
case SQL_EXECUTE_BATCH:
case SQL_CURSOR_NEXT_PAGE:
+ case SQL_CURSOR_NEXT_RESULT_SET:
case SQL_EXECUTE_SCRIPT:
case STREAMER_BATCH_SEND:
case PRIMARY_REPLICAS_GET:
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index ad969d2d3d5..f8661224bc1 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -175,6 +175,9 @@ public class ClientUtils {
case ClientOp.SQL_CURSOR_NEXT_PAGE:
return ClientOperationType.SQL_CURSOR_NEXT_PAGE;
+ case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
+ return ClientOperationType.SQL_CURSOR_NEXT_RESULT_SET;
+
case ClientOp.SQL_CURSOR_CLOSE:
return null;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
index 82f1efaba4b..97d1cfd09fe 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
@@ -22,7 +22,9 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -46,7 +48,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Client async result set.
*/
-class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
+public class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
/** Channel. */
private final ClientChannel ch;
@@ -84,6 +86,17 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
/** Closed flag. */
private volatile boolean closed;
+ /** ID of the resource that holds the next cursor, can be {@code null} if
current result set is the last one. */
+ @Nullable
+ private final Long nextResultResourceId;
+
+ /** Future that holds the next result set, can be {@code null} if the
current result set is the last one. */
+ @Nullable
+ private final CompletableFuture<ClientAsyncResultSet<T>> nextResultFuture;
+
+ /** A flag indicating whether the next result set already was requested or
not. */
+ private final AtomicBoolean nextResultSetRetrieved = new AtomicBoolean();
+
/**
* Constructor.
*
@@ -93,6 +106,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
* @param mapper Mapper.
* @param partitionAwarenessEnabled Whether partitions awareness is
enabled, hence response may contain related metadata.
* @param sqlDirectMappingSupported Whether direct mapping is supported,
hence response may contain additional metadata.
+ * @param sqlMultiStatementsSupported Whether iteration over the results
of script execution is supported.
*/
ClientAsyncResultSet(
ClientChannel ch,
@@ -100,7 +114,8 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
ClientMessageUnpacker in,
@Nullable Mapper<T> mapper,
boolean partitionAwarenessEnabled,
- boolean sqlDirectMappingSupported
+ boolean sqlDirectMappingSupported,
+ boolean sqlMultiStatementsSupported
) {
this.ch = ch;
@@ -117,6 +132,14 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T>
{
partitionAwarenessMetadata = null;
}
+ if (sqlMultiStatementsSupported && !in.tryUnpackNil()) {
+ nextResultResourceId = in.unpackLong();
+ nextResultFuture = new CompletableFuture<>();
+ } else {
+ nextResultResourceId = null;
+ nextResultFuture = null;
+ }
+
this.mapper = mapper;
marshaller = metadata != null && mapper != null && mapper.targetType()
!= SqlRow.class
? marshaller(metadata, marshallers, mapper)
@@ -139,6 +162,47 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T>
{
return hasRowSet;
}
+ /**
+ * Tells whether another result set follows this one, that is, whether the current result set is the result of
+ * a multi-statement query and this statement is not the last one.
+ *
+ * @return {@code true} if the ID of the resource holding the next cursor is known (not {@code null}).
+ */
+ public boolean hasNextResultSet() {
+ return nextResultResourceId != null;
+ }
+
+ /**
+ * Retrieves the next result set of a multi-statement query.
+ *
+ * <p>The {@code SQL_CURSOR_NEXT_RESULT_SET} request is issued only by the first call; every later call
+ * returns the same future. Nothing is thrown synchronously when there is no next result set: the failure
+ * is reported through the returned future.
+ *
+ * @return Future that completes with the next result set, or a future failed with
+ * {@link NoSuchElementException} if the query has no more statements to
execute.
+ */
+ public CompletableFuture<ClientAsyncResultSet<T>> nextResultSet() {
+ if (nextResultResourceId == null) {
+ return CompletableFuture.failedFuture(new
NoSuchElementException("Query has no more results"));
+ }
+
+ assert nextResultFuture != null;
+
+ // Only the caller that flips the flag sends the request; everyone else shares the pending/completed future.
+ if (!nextResultSetRetrieved.compareAndSet(false, true)) {
+ return nextResultFuture;
+ }
+
+
ch.<ClientAsyncResultSet<T>>serviceAsync(ClientOp.SQL_CURSOR_NEXT_RESULT_SET,
+ w -> w.out().packLong(nextResultResourceId),
+ // NOTE(review): null marshallers/mapper and disabled partition awareness/direct mapping presumably mean
+ // that the response carries none of that metadata - confirm against the server-side handler.
+ r -> new ClientAsyncResultSet<>(
+ r.clientChannel(), null, r.in(), null, false,
false, true
+ ))
+ // Propagate the outcome (result or error) of the request to the shared future.
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ nextResultFuture.completeExceptionally(e);
+ } else {
+ nextResultFuture.complete(r);
+ }
+ });
+
+ return nextResultFuture;
+ }
+
/** {@inheritDoc} */
@Override
public long affectedRows() {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 9d9e01f875b..568c0df0300 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -321,6 +321,9 @@ public class ClientSql implements IgniteSql {
Statement statement,
@Nullable Object... arguments
) {
+ assert mapper == null || mapper.targetType() == SqlRow.class
+ ||
!queryModifiers.contains(QueryModifier.ALLOW_MULTISTATEMENT) : "Mapper is not
supported for multi-statements.";
+
Objects.requireNonNull(statement);
PartitionMappingProvider mappingProvider =
mappingProviderCache.getIfPresent(new PaCacheKey(statement));
@@ -373,10 +376,11 @@ public class ClientSql implements IgniteSql {
&&
r.clientChannel().protocolContext().isFeatureSupported(SQL_PARTITION_AWARENESS);
boolean sqlDirectMappingSupported =
r.clientChannel().protocolContext().isFeatureSupported(SQL_DIRECT_TX_MAPPING);
+ boolean sqlMultistatementsSupported =
r.clientChannel().protocolContext().allFeaturesSupported(SQL_MULTISTATEMENT_SUPPORT);
DirectTxUtils.readTx(r, ctx, tx, ch.observableTimestamp());
ClientAsyncResultSet<T> rs = new ClientAsyncResultSet<>(
- r.clientChannel(), marshallers, r.in(), mapper,
tryUnpackPaMeta, sqlDirectMappingSupported
+ r.clientChannel(), marshallers, r.in(), mapper,
tryUnpackPaMeta, sqlDirectMappingSupported, sqlMultistatementsSupported
);
ClientPartitionAwarenessMetadata partitionAwarenessMetadata =
rs.partitionAwarenessMetadata();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index 682cab4bfdc..8411c929f86 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -307,6 +307,10 @@ public class ClientSqlTest extends AbstractClientTableTest
{
expected = "TX_CONTROL";
break;
+ case ALLOW_MULTISTATEMENT:
+ expected = "MULTISTATEMENT";
+ break;
+
default:
throw new IllegalArgumentException("Unexpected type: " +
modifier);
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index 51759bb99ce..9372dc85e0d 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlProperties;
@@ -121,8 +122,8 @@ public class FakeCursor implements
AsyncSqlCursor<InternalSqlRow> {
columns.add(new FakeColumnMetadata("col1", ColumnType.INT32));
} else if ("SELECT ALLOWED QUERY TYPES".equals(qry)) {
paMeta = null;
- String row =
properties.allowedQueryTypes().stream().map(SqlQueryType::name).sorted()
- .collect(Collectors.joining(", "));
+ String row =
Stream.concat(properties.allowedQueryTypes().stream().map(SqlQueryType::name).sorted(),
+ properties.allowMultiStatement() ?
Stream.of("MULTISTATEMENT") : Stream.empty()).collect(Collectors.joining(", "));
rows.add(getRow(row));
columns.add(new FakeColumnMetadata("col1", ColumnType.STRING));
} else {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java
new file mode 100644
index 00000000000..bfb3ab34645
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.internal.client.sql.ClientAsyncResultSet;
+import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.sql.QueryModifier;
+import org.apache.ignite.internal.sql.SyncResultSetAdapter;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Thin client SQL multi-statement integration test.
+ *
+ * <p>Tests to check internal API for reading script execution results.
+ */
+@SuppressWarnings("resource")
+public class ItThinClientMultistatementSqlTest extends
ItAbstractThinClientTest {
+ private final List<ClientAsyncResultSet<SqlRow>> resultsToClose = new
ArrayList<>();
+
+ private int resourcesBefore;
+
+ /**
+ * Closes every result set still tracked in {@code resultsToClose} and records the number of
+ * server-side client resources that exist before the test starts.
+ */
+ @BeforeEach
+ void setup() {
+ resultsToClose.forEach(resultSet -> await(resultSet.closeAsync()));
+ // Forget the result sets that were just closed, so they are neither retained nor closed again later.
+ resultsToClose.clear();
+
+ resourcesBefore = countResources();
+ }
+
+ @AfterEach
+ protected void checkNoPendingTransactionsAndOpenedCursors() {
+ Awaitility.await().timeout(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ for (int i = 0; i < nodes(); i++) {
+ assertThat("node=" + i, queryProcessor(i).openedCursors(),
is(0));
+ }
+
+ for (int i = 0; i < nodes(); i++) {
+ assertThat("node=" + i, txManager(i).pending(), is(0));
+ }
+
+ assertThat(countResources() - resourcesBefore, is(0));
+
+ for (int i = 0; i < nodes(); i++) {
+ int cancelHandlesCount =
unwrapIgniteImpl(server(i)).clientInboundMessageHandler().cancelHandlesCount();
+
+ assertThat("node=" + i, cancelHandlesCount, is(0));
+ }
+ });
+
+ String dropTablesScript = client().tables().tables().stream()
+ .map(Table::name)
+ .map(name -> "DROP TABLE " + name)
+ .collect(Collectors.joining(";\n"));
+
+ if (!dropTablesScript.isEmpty()) {
+ client().sql().executeScript(dropTablesScript);
+ }
+ }
+
+ /** Ensures that we can get the next result set after the current one is
closed. */
+ @Test
+ void checkIterationOverResultSets() {
+ ClientAsyncResultSet<SqlRow> asyncRs = runSql("SELECT 1; SELECT 2;
SELECT 3;");
+
+ // First result set.
+ {
+ SyncResultSetAdapter<SqlRow> rs = new
SyncResultSetAdapter<>(asyncRs);
+ assertThat(rs.hasRowSet(), is(true));
+ assertThat(rs.next().intValue(0), is(1));
+
+ rs.close();
+ }
+
+ assertThat(asyncRs.hasNextResultSet(), is(true));
+
+ CompletableFuture<ClientAsyncResultSet<SqlRow>> nextResultFut =
asyncRs.nextResultSet();
+ // Ensures that the next result is cached locally
+ // and subsequent calls do not request data from the server.
+ assertThat(nextResultFut, is(asyncRs.nextResultSet()));
+
+ asyncRs = await(asyncRs.nextResultSet());
+
+ // Second result set.
+ {
+ SyncResultSetAdapter<SqlRow> rs = new
SyncResultSetAdapter<>(asyncRs);
+ assertThat(rs.hasRowSet(), is(true));
+ assertThat(rs.next().intValue(0), is(2));
+
+ rs.close();
+ }
+
+ assertThat(asyncRs.hasNextResultSet(), is(true));
+ // Ensures that the next result is cached locally
+ // and subsequent calls do not request data from the server.
+ assertThat(asyncRs.nextResultSet(), is(asyncRs.nextResultSet()));
+ asyncRs = await(asyncRs.nextResultSet());
+
+ // Third result set.
+ {
+ SyncResultSetAdapter<SqlRow> rs = new
SyncResultSetAdapter<>(asyncRs);
+ assertThat(rs.hasRowSet(), is(true));
+ assertThat(rs.next().intValue(0), is(3));
+
+ rs.close();
+ }
+
+ assertThat(asyncRs.hasNextResultSet(), is(false));
+ }
+
+ @Test
+ void basicMultiStatementQuery() {
+ String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+ + "INSERT INTO test VALUES (3, 3);"
+ + "UPDATE test SET val=7 WHERE id=3;"
+ + "EXPLAIN PLAN FOR SELECT * FROM test;"
+ + "SELECT * FROM test;"
+ + "DELETE FROM test;"
+ + "DROP TABLE IF EXISTS non_existing_table;";
+
+ List<ClientAsyncResultSet<SqlRow>> resultSets =
fetchAllResults(runSql(sql));
+ Iterator<ClientAsyncResultSet<SqlRow>> curItr = resultSets.iterator();
+
+ ResultValidator.ddl(curItr.next(), true);
+ ResultValidator.dml(curItr.next(), 1);
+ ResultValidator.dml(curItr.next(), 1);
+ assertNotNull(curItr.next()); // skip EXPLAIN.
+ ResultValidator.singleRow(curItr.next(), 3, 7);
+ ResultValidator.dml(curItr.next(), 1);
+ ResultValidator.ddl(curItr.next(), false);
+
+ assertThat(curItr.hasNext(), is(false));
+
+ resultSets.forEach(AsyncResultSet::closeAsync);
+
+ // Ensures that the script is executed completely, even if the cursor
data has not been read.
+ executeSql("INSERT INTO test VALUES (1, 1);"
+ + "INSERT INTO test VALUES (2, 2);"
+ + "SELECT * FROM test;"
+ + "INSERT INTO test VALUES (3, 3);");
+
+ expectRowsCount(null, "test", 3);
+ }
+
+ @Test
+ public void txControlStatement() {
+ String query = "START TRANSACTION; COMMIT";
+
+ // Execution of the TX_CONTROL statement is allowed.
+ {
+ EnumSet<QueryModifier> modifiers = EnumSet.of(
+ QueryModifier.ALLOW_TX_CONTROL,
+ QueryModifier.ALLOW_MULTISTATEMENT
+ );
+
+ ClientAsyncResultSet<SqlRow> startFuture = runSql((Transaction)
null, null, modifiers, query);
+ List<ClientAsyncResultSet<SqlRow>> resultSets =
fetchAllResults(startFuture);
+
+ assertThat(resultSets, hasSize(2));
+
+ ClientAsyncResultSet<SqlRow> rs = resultSets.get(0);
+ assertThat(rs.hasNextResultSet(), is(true));
+ assertThat(rs.hasRowSet(), is(false));
+ assertThat(rs.wasApplied(), is(false));
+ assertThat(rs.affectedRows(), is(-1L));
+
+ rs = resultSets.get(1);
+ assertThat(rs.hasNextResultSet(), is(false));
+ assertThat(rs.hasRowSet(), is(false));
+ assertThat(rs.wasApplied(), is(false));
+ assertThat(rs.affectedRows(), is(-1L));
+ }
+
+ // Execution of the TX_CONTROL statement is not allowed.
+ {
+ EnumSet<QueryModifier> modifiers = EnumSet.of(
+ QueryModifier.ALLOW_ROW_SET_RESULT,
+ QueryModifier.ALLOW_MULTISTATEMENT
+ );
+
+ assertThrowsSqlException(
+ STMT_VALIDATION_ERR,
+ "Invalid SQL statement type.",
+ () -> runSql((Transaction) null, null, modifiers, query)
+ );
+ }
+ }
+
+ /** Requesting the next result set of a single-statement query must fail with {@link NoSuchElementException}. */
+ @Test
+ void throwsNoSuchElementExceptionIfNoMoreResults() {
+ ClientAsyncResultSet<SqlRow> singleStatementResult = runSql("SELECT 1");
+
+ // A single statement produces exactly one result set.
+ assertThat(singleStatementResult.hasNextResultSet(), is(false));
+
+ //noinspection ThrowableNotThrown
+ assertThrows(NoSuchElementException.class, () -> await(singleStatementResult.nextResultSet()), "Query has no more results");
+ }
+
+ @Test
+ void queryWithDynamicParameters() {
+ String sql = "CREATE TABLE test (id INT PRIMARY KEY, val VARCHAR
DEFAULT '3');"
+ + "INSERT INTO test VALUES(?, ?);"
+ + "INSERT INTO test VALUES(?, DEFAULT);"
+ + "INSERT INTO test VALUES(?, ?);";
+
+ executeSql(sql, 0, "1", 2, 4, "5");
+ expectRowsCount(null, "test", 3);
+ }
+
+ @Test
+ void explicitTransaction() {
+ executeSql("CREATE TABLE test (id INT PRIMARY KEY);");
+
+ Transaction tx = client().transactions().begin();
+ executeSql(tx, "INSERT INTO test VALUES (0); INSERT INTO test VALUES
(1); INSERT INTO test VALUES (2)");
+
+ expectRowsCount(tx, "test", 3);
+ expectRowsCount(null, "test", 0);
+
+ tx.commit();
+
+ expectRowsCount(null, "test", 3);
+ }
+
+ @Test
+ void queryWithIncorrectNumberOfDynamicParametersFailsWithValidationError()
{
+ String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+ + "INSERT INTO test VALUES(?, ?);"
+ + "INSERT INTO test VALUES(?, ?);";
+
+ String expectedMessage = "Unexpected number of query parameters";
+
+ assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () ->
runSql(sql, 0));
+ assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () ->
runSql(sql, 0, 1, 2, 3, 4, 5));
+ }
+
+ @Test
+ void scriptStopsExecutionOnError() {
+ // Runtime error.
+ {
+ assertThrowsSqlException(
+ RUNTIME_ERR,
+ "Division by zero",
+ () -> executeSql(
+ "CREATE TABLE test (id INT PRIMARY KEY);"
+ + "INSERT INTO test VALUES (2/0);" //
Runtime error.
+ + "INSERT INTO test VALUES (0)"
+ )
+ );
+
+ expectRowsCount(null, "test", 0);
+ }
+
+ // Validation error.
+ {
+ assertThrowsSqlException(
+ STMT_VALIDATION_ERR,
+ "Values passed to VALUES operator must have compatible
types",
+ () -> executeSql(
+ "INSERT INTO test VALUES (?), (?)",
+ "1", 2
+ )
+ );
+
+ expectRowsCount(null, "test", 0);
+ }
+
+ // Internal error.
+ {
+ assertThrowsSqlException(
+ RUNTIME_ERR,
+ "Subquery returned more than 1 value",
+ () -> executeSql(
+ "INSERT INTO test VALUES(0);"
+ + "INSERT INTO test VALUES(1);"
+ + "DELETE FROM test WHERE id = (SELECT id
FROM test);" // Internal error.
+ + "INSERT INTO test VALUES(2);"
+ )
+ );
+
+ expectRowsCount(null, "test", 2);
+ }
+
+ // Same as above, but inside script managed transaction.
+ {
+ assertThrowsSqlException(
+ RUNTIME_ERR,
+ "Subquery returned more than 1 value",
+ () -> executeSql(
+ "START TRANSACTION;"
+ + "INSERT INTO test VALUES(2);"
+ + "INSERT INTO test VALUES(3);"
+ + "DELETE FROM test WHERE id = (SELECT id
FROM test);" // Internal error.
+ + "INSERT INTO test VALUES(4);"
+ + "COMMIT;"
+ )
+ );
+
+ expectRowsCount(null, "test", 2);
+ }
+
+ // Attempt to start script-managed transaction inside external
transaction.
+ {
+ Transaction tx = client().transactions().begin();
+
+ assertThrowsSqlException(
+ TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
+ "Transaction control statement cannot be executed within
an external transaction.",
+ () -> executeSql(
+ tx,
+ "START TRANSACTION; COMMIT;"
+ )
+ );
+
+ tx.rollback();
+ }
+
+ // Internal error due to transaction exception.
+ {
+ Transaction tx = client().transactions().begin();
+ client().sql().execute(tx, "INSERT INTO test VALUES(2);").close();
+ tx.commit();
+
+ assertThrowsSqlException(
+ TX_ALREADY_FINISHED_ERR,
+ "Transaction is already finished",
+ () -> executeSql(
+ tx,
+ "INSERT INTO test VALUES(3); INSERT INTO test
VALUES(4);"
+ )
+ );
+
+ expectRowsCount(null, "test", 3);
+ }
+ }
+
+ @Test
+ public void cancelScript() {
+ StringBuilder query = new StringBuilder();
+
+ int statementsCount = 100;
+
+ for (int j = 0; j < statementsCount; j++) {
+ query.append("SELECT x FROM TABLE(SYSTEM_RANGE(0,
100))").append(";");
+ }
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ List<ClientAsyncResultSet<SqlRow>> allResults =
fetchAllResults(runSql((Transaction) null, token, null, query.toString()));
+
+ assertThat(allResults, hasSize(statementsCount));
+
+ cancelHandle.cancel();
+
+ allResults.forEach(rs -> {
+ expectQueryCancelled(() -> {
+ AsyncResultSet<SqlRow> res;
+ do {
+ res = await(rs.fetchNextPage());
+ } while (res.hasMorePages());
+ });
+
+ rs.closeAsync();
+ });
+ }
+
+ private void expectRowsCount(@Nullable Transaction tx, String table, long
expectedCount) {
+ try (ResultSet<SqlRow> rs = client().sql().execute(tx, "SELECT
COUNT(*) FROM " + table)) {
+ assertThat(rs.next().longValue(0), is(expectedCount));
+ }
+ }
+
+ private SqlQueryProcessor queryProcessor(int idx) {
+ return ((SqlQueryProcessor)
unwrapIgniteImpl(server(idx)).queryEngine());
+ }
+
+ private TxManager txManager(int idx) {
+ return unwrapIgniteImpl(server(idx)).txManager();
+ }
+
+ private int countResources() {
+ int count = 0;
+
+ for (int i = 0; i < nodes(); i++) {
+ ClientResourceRegistry resources =
unwrapIgniteImpl(server(i)).clientInboundMessageHandler().resources();
+
+ count += resources.size();
+ }
+
+ return count;
+ }
+
+ private ClientAsyncResultSet<SqlRow> runSql(String query, Object ... args)
{
+ return runSql(null, null, null, query, args);
+ }
+
+ private ClientAsyncResultSet<SqlRow> runSql(
+ @Nullable Transaction tx,
+ @Nullable CancellationToken cancelToken,
+ @Nullable Set<QueryModifier> queryModifiers,
+ String query,
+ Object... args
+ ) {
+ ClientSql clientSql = (ClientSql) client().sql();
+ Statement stmt =
clientSql.statementBuilder().query(query).pageSize(1).build();
+
+ return (ClientAsyncResultSet<SqlRow>) await(
+ clientSql.executeAsyncInternal(
+ tx,
+ (Mapper<SqlRow>) null,
+ cancelToken,
+ queryModifiers == null ? QueryModifier.ALL :
queryModifiers,
+ stmt,
+ args
+ )
+ );
+ }
+
+ private void executeSql(String sql, Object... args) {
+ executeSql(null, sql, args);
+ }
+
+ private void executeSql(@Nullable Transaction tx, String sql, Object ...
args) {
+ fetchAllResults(runSql(tx, null, null, sql, args))
+ .forEach(rs -> await(rs.closeAsync()));
+ }
+
+ /**
+ * Collects the given result set and every result set that follows it, in the order they are returned.
+ *
+ * <p>Each collected result set is also registered in {@code resultsToClose}.
+ *
+ * @param resultSet First result set of the script.
+ * @return All result sets of the script, starting with {@code resultSet}.
+ */
+ private List<ClientAsyncResultSet<SqlRow>> fetchAllResults(ClientAsyncResultSet<SqlRow> resultSet) {
+ List<ClientAsyncResultSet<SqlRow>> collected = new ArrayList<>();
+ ClientAsyncResultSet<SqlRow> current = resultSet;
+
+ while (true) {
+ collected.add(current);
+ resultsToClose.add(current);
+
+ if (!current.hasNextResultSet()) {
+ return collected;
+ }
+
+ current = await(current.nextResultSet());
+
+ assertNotNull(current);
+ }
+ }
+
+ /** Assertions describing what the result of each kind of statement is expected to look like. */
+ private static class ResultValidator {
+ /**
+ * Asserts that the result set has no row set and reports the given number of affected rows.
+ *
+ * @param resultSet Result set to check.
+ * @param affectedRows Expected value of {@code affectedRows()}.
+ */
+ static void dml(ClientAsyncResultSet<SqlRow> resultSet, long affectedRows) {
+ assertThat(resultSet.hasRowSet(), is(false));
+ assertThat(resultSet.affectedRows(), is(affectedRows));
+ }
+
+ /**
+ * Asserts that the result set has no row set and reports the given "was applied" flag.
+ *
+ * @param resultSet Result set to check.
+ * @param wasApplied Expected value of {@code wasApplied()}.
+ */
+ private static void ddl(ClientAsyncResultSet<SqlRow> resultSet, boolean wasApplied) {
+ assertThat(resultSet.hasRowSet(), is(false));
+ assertThat(resultSet.wasApplied(), is(wasApplied));
+ }
+
+ /**
+ * Asserts that the current page of the result set holds exactly one row whose column values,
+ * taken in column order, equal the expected values.
+ *
+ * @param resultSet Result set to check.
+ * @param expected Expected column values of the single row.
+ */
+ private static void singleRow(ClientAsyncResultSet<SqlRow> resultSet, Object... expected) {
+ assertThat(resultSet.hasRowSet(), is(true));
+ Iterator<SqlRow> pageIter = resultSet.currentPage().iterator();
+ SqlRow row = pageIter.next();
+ int rowSize = row.metadata().columns().size();
+ List<Object> actual = new ArrayList<>(rowSize);
+
+ for (int i = 0; i < rowSize; i++) {
+ actual.add(row.value(i));
+ }
+
+ // Hamcrest expects the actual value first; the reversed order mislabels the values in failure messages.
+ assertThat(actual, equalTo(List.of(expected)));
+ assertThat(pageIter.hasNext(), is(false));
+ }
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index 7f63ace6266..a9ae5856c41 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -760,11 +760,12 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
Statement ddlStatement = client().sql().createStatement("CREATE TABLE
x(id INT PRIMARY KEY)");
Statement dmlStatement = client().sql().createStatement("INSERT INTO x
VALUES (1), (2), (3)");
Statement selectStatement = client().sql().createStatement("SELECT *
FROM x");
+ Statement multiStatement = client().sql().createStatement("SELECT 1;
SELECT 2;");
BiConsumer<Statement, Set<QueryModifier>> check = (stmt, types) -> {
await(sql.executeAsyncInternal(
null,
- () -> SqlRow.class,
+ null,
null,
types,
stmt
@@ -816,10 +817,20 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
);
}
- // No exception expected with correct modifiers.
- check.accept(ddlStatement, QueryModifier.ALL);
- check.accept(dmlStatement, QueryModifier.ALL);
- check.accept(selectStatement, QueryModifier.ALL);
+ // Incorrect modifier for multi-statement.
+ {
+ IgniteTestUtils.assertThrows(
+ SqlException.class,
+ () -> check.accept(multiStatement,
QueryModifier.SINGLE_STMT_MODIFIERS),
+ "Multiple statements are not allowed."
+ );
+ }
+
+ // No exception expected with correct query modifier.
+ check.accept(ddlStatement, QueryModifier.SINGLE_STMT_MODIFIERS);
+ check.accept(dmlStatement, QueryModifier.SINGLE_STMT_MODIFIERS);
+ check.accept(selectStatement, QueryModifier.SINGLE_STMT_MODIFIERS);
+ check.accept(multiStatement, QueryModifier.ALL);
}
private static class Pojo {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index e9a12b3ffa9..2112fad417c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -77,6 +77,11 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
return cursor.partitionAwarenessMetadata();
}
+ /**
+ * Returns the query cursor held by this result set.
+ *
+ * @return Value of the {@code cursor} field.
+ */
+ public AsyncSqlCursor<InternalSqlRow> cursor() {
+ return cursor;
+ }
+
/** {@inheritDoc} */
@Override
public boolean hasRowSet() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
index 1508984a20f..c47ef5d4419 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.lang.IgniteExceptionMapper;
import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import
org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
import
org.apache.ignite.internal.sql.engine.exec.RemoteFragmentExecutionException;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.IgniteException;
@@ -76,6 +77,9 @@ public class SqlExceptionMapperProvider implements
IgniteExceptionMappersProvide
mappers.add(unchecked(InternalCompilerException.class,
err -> new SqlException(Common.INTERNAL_ERR, "Expression
compiler error. " + err.getMessage(), err)));
+
mappers.add(unchecked(TxControlInsideExternalTxNotSupportedException.class,
+ err -> new SqlException(err.code(), err.getMessage(), err)));
+
return mappers;
}
}