This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cbc84ebce0 IGNITE-21815 Client handler: use QueryProcessor instead of
IgniteSql (#3459)
cbc84ebce0 is described below
commit cbc84ebce03c7554d8b9e09699f4884033ccaac1
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Mar 21 17:09:36 2024 +0200
IGNITE-21815 Client handler: use QueryProcessor instead of IgniteSql (#3459)
Use internal `QueryProcessor` API in client handler instead of public
`IgniteSql`:
* Simplify `HybridTimestamp` propagation
* Get rid of Session usage, reduce allocations and indirection
* Unblock IGNITE-21669 Remove sessions from SQL API
---
check-rules/spotbugs-excludes.xml | 5 +-
.../apache/ignite/client/handler/TestServer.java | 2 -
.../ignite/client/handler/ClientHandlerModule.java | 9 -
.../handler/ClientInboundMessageHandler.java | 14 +-
.../handler/requests/sql/ClientSqlCommon.java | 50 +---
.../requests/sql/ClientSqlExecuteBatchRequest.java | 86 +++---
.../requests/sql/ClientSqlExecuteRequest.java | 84 ++++--
.../sql/ClientSqlExecuteScriptRequest.java | 14 +-
.../handler/requests/sql/ClientSqlProperties.java | 68 +++++
.../handler/requests/sql/ClientSqlResultSet.java | 11 +-
.../apache/ignite/client/ClientMetricsTest.java | 4 +-
.../org/apache/ignite/client/ClientSqlTest.java | 21 +-
.../ignite/client/TestClientHandlerModule.java | 6 +-
.../java/org/apache/ignite/client/TestServer.java | 1 -
.../ignite/client/fakes/FakeAsyncResultSet.java | 251 ------------------
.../org/apache/ignite/client/fakes/FakeCursor.java | 144 ++++++++--
.../org/apache/ignite/client/fakes/FakeIgnite.java | 2 +-
.../client/fakes/FakeIgniteQueryProcessor.java | 30 ++-
.../apache/ignite/client/fakes/FakeIgniteSql.java | 52 ----
.../ignite/client/fakes/FakeIgniteTables.java | 4 +-
.../apache/ignite/client/fakes/FakeSession.java | 290 ---------------------
.../ignite/client/fakes/FakeSessionBuilder.java | 136 ----------
.../org/apache/ignite/client/fakes/FakeSqlRow.java | 290 ---------------------
.../runner/app/client/ItThinClientSqlTest.java | 16 ++
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 -
.../internal/sql/api/AsyncResultSetImpl.java | 27 +-
.../ignite/internal/sql/api/SessionImpl.java | 247 ++++++++++++------
27 files changed, 548 insertions(+), 1317 deletions(-)
diff --git a/check-rules/spotbugs-excludes.xml
b/check-rules/spotbugs-excludes.xml
index c35cd0fc21..3d4554aca2 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -118,7 +118,10 @@
<Match>
<!-- this method has side effects -->
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <Class name="org.apache.ignite.internal.client.table.AbstractClientView"/>
+ <Or>
+ <Class
name="org.apache.ignite.internal.client.table.AbstractClientView"/>
+ <Class
name="org.apache.ignite.client.handler.requests.sql.ClientSqlCommon"/>
+ </Or>
</Match>
<Match>
<Bug pattern="MS_PKGPROTECT"/>
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index 286e2a5d66..c0347f7a8e 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TestLowWatermark;
import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
-import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
@@ -121,7 +120,6 @@ public class TestServer {
mock(IgniteComputeInternal.class),
clusterService,
bootstrapFactory,
- mock(IgniteSql.class),
() ->
CompletableFuture.completedFuture(ClusterTag.clusterTag(msgFactory, "Test
Server")),
mock(MetricManager.class),
metrics,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b5a7f88e58..18898f812b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -62,7 +62,6 @@ import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -83,9 +82,6 @@ public class ClientHandlerModule implements IgniteComponent {
/** Ignite transactions API. */
private final IgniteTransactionsImpl igniteTransactions;
- /** Ignite SQL API. */
- private final IgniteSql sql;
-
/** Cluster ID supplier. */
private final Supplier<CompletableFuture<ClusterTag>> clusterTagSupplier;
@@ -140,7 +136,6 @@ public class ClientHandlerModule implements IgniteComponent
{
* @param igniteCompute Compute.
* @param clusterService Cluster.
* @param bootstrapFactory Bootstrap factory.
- * @param sql SQL.
* @param clusterTagSupplier ClusterTag supplier.
* @param metricManager Metric manager.
* @param authenticationManager Authentication manager.
@@ -155,7 +150,6 @@ public class ClientHandlerModule implements IgniteComponent
{
IgniteComputeInternal igniteCompute,
ClusterService clusterService,
NettyBootstrapFactory bootstrapFactory,
- IgniteSql sql,
Supplier<CompletableFuture<ClusterTag>> clusterTagSupplier,
MetricManager metricManager,
ClientHandlerMetricSource metrics,
@@ -172,7 +166,6 @@ public class ClientHandlerModule implements IgniteComponent
{
assert igniteCompute != null;
assert clusterService != null;
assert bootstrapFactory != null;
- assert sql != null;
assert clusterTagSupplier != null;
assert metricManager != null;
assert metrics != null;
@@ -190,7 +183,6 @@ public class ClientHandlerModule implements IgniteComponent
{
this.igniteCompute = igniteCompute;
this.clusterService = clusterService;
this.bootstrapFactory = bootstrapFactory;
- this.sql = sql;
this.clusterTagSupplier = clusterTagSupplier;
this.metricManager = metricManager;
this.metrics = metrics;
@@ -371,7 +363,6 @@ public class ClientHandlerModule implements IgniteComponent
{
configuration,
igniteCompute,
clusterService,
- sql,
clusterTag,
metrics,
authenticationManager,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index a04e1f0c41..cbf3431fc4 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -136,7 +136,6 @@ import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.security.AuthenticationType;
import
org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
-import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
/**
@@ -168,9 +167,6 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
/** Cluster. */
private final ClusterService clusterService;
- /** SQL. */
- private final IgniteSql sql;
-
/** Query processor. */
private final QueryProcessor queryProcessor;
@@ -216,7 +212,6 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
* @param configuration Configuration.
* @param compute Compute.
* @param clusterService Cluster.
- * @param sql SQL.
* @param clusterTag Cluster tag.
* @param metrics Metrics.
* @param authenticationManager Authentication manager.
@@ -229,7 +224,6 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
ClientConnectorView configuration,
IgniteComputeInternal compute,
ClusterService clusterService,
- IgniteSql sql,
CompletableFuture<ClusterTag> clusterTag,
ClientHandlerMetricSource metrics,
AuthenticationManager authenticationManager,
@@ -245,7 +239,6 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
assert configuration != null;
assert compute != null;
assert clusterService != null;
- assert sql != null;
assert clusterTag != null;
assert metrics != null;
assert authenticationManager != null;
@@ -259,7 +252,6 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
this.configuration = configuration;
this.compute = compute;
this.clusterService = clusterService;
- this.sql = sql;
this.queryProcessor = processor;
this.clusterTag = clusterTag;
this.metrics = metrics;
@@ -766,7 +758,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
return ClientClusterGetNodesRequest.process(out,
clusterService);
case ClientOp.SQL_EXEC:
- return ClientSqlExecuteRequest.process(in, out, sql,
resources, metrics, igniteTransactions);
+ return ClientSqlExecuteRequest.process(in, out,
queryProcessor, resources, metrics, igniteTransactions);
case ClientOp.SQL_CURSOR_NEXT_PAGE:
return ClientSqlCursorNextPageRequest.process(in, out,
resources, igniteTransactions);
@@ -781,13 +773,13 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
return ClientJdbcFinishTxRequest.process(in, out,
jdbcQueryEventHandler);
case ClientOp.SQL_EXEC_SCRIPT:
- return ClientSqlExecuteScriptRequest.process(in, sql,
igniteTransactions);
+ return ClientSqlExecuteScriptRequest.process(in,
queryProcessor, igniteTransactions);
case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(in, out,
queryProcessor, resources);
case ClientOp.SQL_EXEC_BATCH:
- return ClientSqlExecuteBatchRequest.process(in, out, sql,
resources, metrics, igniteTransactions);
+ return ClientSqlExecuteBatchRequest.process(in, out,
queryProcessor, resources, igniteTransactions);
case ClientOp.STREAMER_BATCH_SEND:
return ClientStreamerBatchSendRequest.process(in, out,
igniteTables);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index a92622dcb0..e4df80e201 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -20,22 +20,13 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
-import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
-import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.internal.sql.api.SessionBuilderImpl;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin;
-import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Session.SessionBuilder;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.tx.IgniteTransactions;
/**
* Common SQL request handling logic.
@@ -61,6 +52,7 @@ class ClientSqlCommon {
}
if (!asyncResultSet.hasMorePages()) {
+ // Close in background.
asyncResultSet.closeAsync();
}
}
@@ -153,51 +145,13 @@ class ClientSqlCommon {
}
}
- static Session readSession(ClientMessageUnpacker in, IgniteSql sql,
IgniteTransactions transactions) {
- SessionBuilder sessionBuilder = sql.sessionBuilder();
-
- if (transactions != null && sessionBuilder instanceof
SessionBuilderImpl) {
- ((SessionBuilderImpl)
sessionBuilder).igniteTransactions(transactions);
- }
-
- if (!in.tryUnpackNil()) {
- sessionBuilder.defaultSchema(in.unpackString());
- }
-
- if (!in.tryUnpackNil()) {
- sessionBuilder.defaultPageSize(in.unpackInt());
- }
-
- if (!in.tryUnpackNil()) {
- sessionBuilder.defaultQueryTimeout(in.unpackLong(),
TimeUnit.MILLISECONDS);
- }
-
- if (!in.tryUnpackNil()) {
- sessionBuilder.idleTimeout(in.unpackLong(), TimeUnit.MILLISECONDS);
- }
-
- readSessionProperties(in, sessionBuilder);
-
- return sessionBuilder.build();
- }
-
- private static void readSessionProperties(ClientMessageUnpacker in,
SessionBuilder sessionBuilder) {
- var propCount = in.unpackInt();
- var reader = new BinaryTupleReader(propCount * 4,
in.readBinaryUnsafe());
-
- for (int i = 0; i < propCount; i++) {
- // noinspection DataFlowIssue
- sessionBuilder.property(reader.stringValue(i * 4),
ClientBinaryTupleUtils.readObject(reader, i * 4 + 1));
- }
- }
-
/**
* Pack columns metadata.
*
* @param out Message packer.
* @param cols Columns.
*/
- public static void packColumns(ClientMessagePacker out,
List<ColumnMetadata> cols) {
+ static void packColumns(ClientMessagePacker out, List<ColumnMetadata>
cols) {
out.packInt(cols.size());
// In many cases there are multiple columns from the same table.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 5496b5f761..979fc81a38 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -17,28 +17,21 @@
package org.apache.ignite.client.handler.requests.sql;
-import static
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.readSession;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
-import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.api.SessionImpl;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlBatchException;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.Statement.StatementBuilder;
-import org.jetbrains.annotations.Nullable;
/**
* Client SQL execute batch request.
@@ -51,21 +44,19 @@ public class ClientSqlExecuteBatchRequest {
* @param out Packer.
* @param sql SQL API.
* @param resources Resources.
- * @param metrics Metrics.
* @param transactions Transactional facade. Used to acquire last observed
time to propagate to client in response.
* @return Future representing result of operation.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
- IgniteSql sql,
+ QueryProcessor sql,
ClientResourceRegistry resources,
- ClientHandlerMetricSource metrics,
IgniteTransactionsImpl transactions
) {
- var tx = readTx(in, out, resources);
- Session session = readSession(in, sql, transactions);
- Statement statement = readStatement(in, sql);
+ InternalTransaction tx = readTx(in, out, resources);
+ ClientSqlProperties props = new ClientSqlProperties(in);
+ String statement = in.unpackString();
BatchedArguments arguments =
in.unpackObjectArrayFromBinaryTupleArray();
if (arguments == null) {
@@ -73,38 +64,46 @@ public class ClientSqlExecuteBatchRequest {
arguments = BatchedArguments.of(ArrayUtils.OBJECT_EMPTY_ARRAY);
}
- // TODO IGNITE-20232 Propagate observable timestamp to sql engine
using internal API.
HybridTimestamp clientTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
-
transactions.updateObservableTimestamp(clientTs);
- return session
- .executeBatchAsync(tx, statement.query(), arguments)
+ return SessionImpl.executeBatchCore(
+ sql,
+ transactions,
+ tx,
+ statement,
+ arguments,
+ props.toSqlProps(),
+ () -> true,
+ () -> {},
+ cursor -> 0,
+ cursorId -> {})
.handle((affectedRows, ex) -> {
out.meta(transactions.observableTimestamp());
+
if (ex != null) {
var cause = ExceptionUtils.unwrapCause(ex.getCause());
if (cause instanceof SqlBatchException) {
var exBatch = ((SqlBatchException) cause);
- return writeBatchResultAsync(out, resources,
exBatch.updateCounters(),
- exBatch.errorCode(), exBatch.getMessage(),
session, metrics);
+
+ writeBatchResult(out, exBatch.updateCounters(),
exBatch.errorCode(), exBatch.getMessage());
+ return null;
}
+
affectedRows = ArrayUtils.LONG_EMPTY_ARRAY;
}
- return writeBatchResultAsync(out, resources, affectedRows,
session, metrics);
- }).thenCompose(Function.identity());
+ writeBatchResult(out, affectedRows);
+ return null;
+ });
}
- private static CompletionStage<Void> writeBatchResultAsync(
+ private static void writeBatchResult(
ClientMessagePacker out,
- ClientResourceRegistry resources,
long[] affectedRows,
Short errorCode,
- String errorMessage,
- Session session,
- ClientHandlerMetricSource metrics) {
+ String errorMessage) {
out.packNil(); // resourceId
out.packBoolean(false); // has row set
@@ -113,16 +112,11 @@ public class ClientSqlExecuteBatchRequest {
out.packLongArray(affectedRows); // affected rows
out.packShort(errorCode); // error code
out.packString(errorMessage); // error message
-
- return session.closeAsync();
}
- private static CompletionStage<Void> writeBatchResultAsync(
+ private static void writeBatchResult(
ClientMessagePacker out,
- ClientResourceRegistry resources,
- long[] affectedRows,
- Session session,
- ClientHandlerMetricSource metrics) {
+ long[] affectedRows) {
out.packNil(); // resourceId
out.packBoolean(false); // has row set
@@ -131,25 +125,5 @@ public class ClientSqlExecuteBatchRequest {
out.packLongArray(affectedRows); // affected rows
out.packNil(); // error code
out.packNil(); // error message
-
- return session.closeAsync();
- }
-
- private static Statement readStatement(ClientMessageUnpacker in, IgniteSql
sql) {
- StatementBuilder statementBuilder = sql.statementBuilder();
-
- statementBuilder.query(in.unpackString());
-
- return statementBuilder.build();
- }
-
- private static void packMeta(ClientMessagePacker out, @Nullable
ResultSetMetadata meta) {
- // TODO IGNITE-17179 metadata caching - avoid sending same meta over
and over.
- if (meta == null || meta.columns() == null) {
- out.packInt(0);
- return;
- }
-
- ClientSqlCommon.packColumns(out, meta.columns());
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 605e644c74..6cf39ac297 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -18,11 +18,12 @@
package org.apache.ignite.client.handler.requests.sql;
import static
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.packCurrentPage;
-import static
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.readSession;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
+import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResource;
@@ -32,14 +33,21 @@ import
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
/**
@@ -61,14 +69,14 @@ public class ClientSqlExecuteRequest {
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
- IgniteSql sql,
+ QueryProcessor sql,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
IgniteTransactionsImpl transactions
) {
- var tx = readTx(in, out, resources);
- Session session = readSession(in, sql, transactions);
- Statement statement = readStatement(in, sql);
+ InternalTransaction tx = readTx(in, out, resources);
+ ClientSqlProperties props = new ClientSqlProperties(in);
+ String statement = in.unpackString();
Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
if (arguments == null) {
@@ -76,17 +84,14 @@ public class ClientSqlExecuteRequest {
arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
}
- // TODO IGNITE-20232 Propagate observable timestamp to sql engine
using internal API.
HybridTimestamp clientTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
-
transactions.updateObservableTimestamp(clientTs);
- return session
- .executeAsync(tx, statement, arguments)
+ return executeAsync(tx, sql, transactions, statement,
props.pageSize(), props.toSqlProps(), arguments)
.thenCompose(asyncResultSet -> {
out.meta(transactions.observableTimestamp());
- return writeResultSetAsync(out, resources, asyncResultSet,
session, metrics);
+ return writeResultSetAsync(out, resources, asyncResultSet,
metrics);
});
}
@@ -94,7 +99,6 @@ public class ClientSqlExecuteRequest {
ClientMessagePacker out,
ClientResourceRegistry resources,
AsyncResultSet asyncResultSet,
- Session session,
ClientHandlerMetricSource metrics) {
boolean hasResource = asyncResultSet.hasRowSet() &&
asyncResultSet.hasMorePages();
@@ -102,7 +106,7 @@ public class ClientSqlExecuteRequest {
try {
metrics.cursorsActiveIncrement();
- var clientResultSet = new ClientSqlResultSet(asyncResultSet,
session, metrics);
+ var clientResultSet = new ClientSqlResultSet(asyncResultSet,
metrics);
ClientResource resource = new ClientResource(
clientResultSet,
@@ -133,20 +137,12 @@ public class ClientSqlExecuteRequest {
return hasResource
? nullCompletedFuture()
- : asyncResultSet.closeAsync().thenCompose(res ->
session.closeAsync());
+ : asyncResultSet.closeAsync();
} else {
- return asyncResultSet.closeAsync().thenCompose(res ->
session.closeAsync());
+ return asyncResultSet.closeAsync();
}
}
- private static Statement readStatement(ClientMessageUnpacker in, IgniteSql
sql) {
- StatementBuilder statementBuilder = sql.statementBuilder();
-
- statementBuilder.query(in.unpackString());
-
- return statementBuilder.build();
- }
-
private static void packMeta(ClientMessagePacker out, @Nullable
ResultSetMetadata meta) {
// TODO IGNITE-17179 metadata caching - avoid sending same meta over
and over.
if (meta == null || meta.columns() == null) {
@@ -156,4 +152,40 @@ public class ClientSqlExecuteRequest {
ClientSqlCommon.packColumns(out, meta.columns());
}
+
+ private static CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ QueryProcessor qryProc,
+ IgniteTransactions transactions,
+ String query,
+ int pageSize,
+ SqlProperties props,
+ @Nullable Object... arguments
+ ) {
+ try {
+ SqlProperties properties =
SqlPropertiesHelper.builderFromProperties(props)
+ .set(QueryProperty.ALLOWED_QUERY_TYPES,
SqlQueryType.SINGLE_STMT_TYPES)
+ .build();
+
+ CompletableFuture<AsyncResultSet<SqlRow>> fut =
qryProc.querySingleAsync(
+ properties, transactions, (InternalTransaction)
transaction, query, arguments)
+ .thenCompose(cur -> cur.requestNextAsync(pageSize)
+ .thenApply(
+ batchRes -> new AsyncResultSetImpl<>(
+ cur,
+ batchRes,
+ pageSize
+ )
+ )
+ );
+
+ return fut.exceptionally((th) -> {
+ Throwable cause = ExceptionUtils.unwrapCause(th);
+
+ throw new CompletionException(mapToPublicSqlException(cause));
+ });
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(mapToPublicSqlException(e));
+ }
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
index 641f87890e..c6c8d9f450 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
@@ -17,15 +17,13 @@
package org.apache.ignite.client.handler.requests.sql;
-import static
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.readSession;
-
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.api.SessionImpl;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
/**
* Client SQL execute script request.
@@ -40,10 +38,10 @@ public class ClientSqlExecuteScriptRequest {
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
- IgniteSql sql,
+ QueryProcessor sql,
IgniteTransactionsImpl transactions
) {
- Session session = readSession(in, sql, transactions);
+ ClientSqlProperties props = new ClientSqlProperties(in);
String script = in.unpackString();
Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
@@ -52,11 +50,9 @@ public class ClientSqlExecuteScriptRequest {
arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
}
- // TODO IGNITE-20232 Propagate observable timestamp to sql engine
using internal API.
HybridTimestamp clientTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
-
transactions.updateObservableTimestamp(clientTs);
- return session.executeScriptAsync(script, arguments);
+ return SessionImpl.executeScriptCore(sql, transactions, () -> true, ()
-> {}, script, arguments, props.toSqlProps());
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
new file mode 100644
index 0000000000..7eefc2c904
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.sql;
+
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.sql.AbstractSession;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+
+class ClientSqlProperties {
+ private final String schema;
+
+ private final int pageSize;
+
+ private final long queryTimeout;
+
+ private final long idleTimeout;
+
+ ClientSqlProperties(ClientMessageUnpacker in) {
+ schema = in.tryUnpackNil() ? AbstractSession.DEFAULT_SCHEMA :
in.unpackString();
+ pageSize = in.tryUnpackNil() ? AbstractSession.DEFAULT_PAGE_SIZE :
in.unpackInt();
+ queryTimeout = in.tryUnpackNil() ? 0 : in.unpackLong();
+ idleTimeout = in.tryUnpackNil() ? 0 : in.unpackLong();
+
+ // Skip properties - not used by SQL engine.
+ in.unpackInt(); // Number of properties.
+ in.readBinaryUnsafe(); // Binary tuple with properties
+ }
+
+ public String schema() {
+ return schema;
+ }
+
+ public int pageSize() {
+ return pageSize;
+ }
+
+ public long queryTimeout() {
+ return queryTimeout;
+ }
+
+ public long idleTimeout() {
+ return idleTimeout;
+ }
+
+ SqlProperties toSqlProps() {
+ return SqlPropertiesHelper.newBuilder()
+ .set(QueryProperty.QUERY_TIMEOUT, queryTimeout)
+ .set(QueryProperty.DEFAULT_SCHEMA, schema)
+ .build();
+ }
+}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
index 802cce0723..0712927247 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
@@ -22,7 +22,6 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
-import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
@@ -33,9 +32,6 @@ class ClientSqlResultSet {
/** Result set. */
private final AsyncResultSet<SqlRow> resultSet;
- /** Session. */
- private final Session session;
-
/** Metrics. */
private final ClientHandlerMetricSource metrics;
@@ -46,16 +42,13 @@ class ClientSqlResultSet {
* Constructor.
*
* @param resultSet Result set.
- * @param session Session.
* @param metrics Metrics.
*/
- ClientSqlResultSet(AsyncResultSet<SqlRow> resultSet, Session session,
ClientHandlerMetricSource metrics) {
+ ClientSqlResultSet(AsyncResultSet<SqlRow> resultSet,
ClientHandlerMetricSource metrics) {
assert resultSet != null;
- assert session != null;
assert metrics != null;
this.resultSet = resultSet;
- this.session = session;
this.metrics = metrics;
}
@@ -77,7 +70,7 @@ class ClientSqlResultSet {
if (closed.compareAndSet(false, true)) {
metrics.cursorsActiveDecrement();
- return resultSet.closeAsync().thenCompose(res ->
session.closeAsync()).toCompletableFuture();
+ return resultSet.closeAsync();
}
return nullCompletedFuture();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index 77f366c449..077b4f611d 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.client.fakes.FakeIgniteQueryProcessor;
import org.apache.ignite.client.fakes.FakeIgniteTables;
-import org.apache.ignite.client.fakes.FakeSession;
import org.apache.ignite.internal.client.ClientMetricSource;
import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.metrics.AbstractMetricSource;
@@ -192,7 +192,7 @@ public class ClientMetricsTest extends
BaseIgniteAbstractTest {
assertThrowsSqlException(
Sql.STMT_VALIDATION_ERR,
"Query failed",
- () -> client.sql().createSession().execute(null,
FakeSession.FAILED_SQL));
+ () -> client.sql().createSession().execute(null,
FakeIgniteQueryProcessor.FAILED_SQL));
assertEquals(0, metrics().requestsActive());
assertEquals(1, metrics().requestsFailed());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index d27a970b79..0836a00b15 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -21,6 +21,7 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -97,8 +98,10 @@ public class ClientSqlTest extends AbstractClientTableTest {
assertEquals("SCHEMA1", props.get("schema"));
assertEquals("123000", props.get("timeout"));
assertEquals("234", props.get("pageSize"));
- assertEquals("1", props.get("prop1"));
- assertEquals("2", props.get("prop2"));
+
+ // Properties are ignored by the SQL engine for now.
+ assertNull(props.get("prop1"));
+ assertNull(props.get("prop2"));
}
@Test
@@ -128,9 +131,11 @@ public class ClientSqlTest extends AbstractClientTableTest
{
assertEquals("SCHEMA2", props.get("schema"));
assertEquals("124000", props.get("timeout"));
assertEquals("235", props.get("pageSize"));
- assertEquals("1", props.get("prop1"));
- assertEquals("22", props.get("prop2"));
- assertEquals("3", props.get("prop3"));
+
+ // Properties are ignored by the SQL engine for now.
+ assertNull(props.get("prop1"));
+ assertNull(props.get("prop2"));
+ assertNull(props.get("prop3"));
}
@Test
@@ -227,8 +232,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
SqlRow row = resultSet.next();
assertEquals(
- "foo, arguments: [], properties: [], defaultPageSize=null,
defaultSchema=null, "
- + "defaultQueryTimeout=null,
defaultSessionTimeout=null",
+ "foo, arguments: [], defaultSchema=PUBLIC,
defaultQueryTimeout=0",
row.value(0));
}
@@ -250,8 +254,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
SqlRow row = resultSet.next();
assertEquals(
- "do bar baz, arguments: [arg1, null, 2, ], properties:
[prop2=-5, prop1=val1, prop3=null, ], "
- + "defaultPageSize=123, defaultSchema=script-schema,
defaultQueryTimeout=456, defaultSessionTimeout=789000",
+ "do bar baz, arguments: [arg1, null, 2, ],
defaultSchema=script-schema, defaultQueryTimeout=456",
row.value(0));
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index e22a5c4c37..4e6dab3a72 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -18,7 +18,6 @@
package org.apache.ignite.client;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.mockito.Mockito.mock;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@@ -35,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.ignite.Ignite;
+import org.apache.ignite.client.fakes.FakeIgniteQueryProcessor;
import org.apache.ignite.client.fakes.FakeInternalTable;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientInboundMessageHandler;
@@ -51,7 +51,6 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
-import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TestLowWatermark;
import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
@@ -211,11 +210,10 @@ public class TestClientHandlerModule implements
IgniteComponent {
new ClientInboundMessageHandler(
(IgniteTablesInternal) ignite.tables(),
(IgniteTransactionsImpl)
ignite.transactions(),
- mock(QueryProcessor.class),
+ new FakeIgniteQueryProcessor(),
configuration,
compute,
clusterService,
- ignite.sql(),
CompletableFuture.completedFuture(clusterTag),
metrics,
authenticationManager,
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index ad9d640c5e..dae638f486 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -238,7 +238,6 @@ public class TestServer implements AutoCloseable {
compute,
clusterService,
bootstrapFactory,
- ignite.sql(),
() -> CompletableFuture.completedFuture(tag),
mock(MetricManager.class),
metrics,
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
deleted file mode 100644
index acb7d6d719..0000000000
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.Period;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.sql.ColumnMetadata;
-import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.tx.Transaction;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Fake result set.
- */
-@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-public class FakeAsyncResultSet implements AsyncResultSet {
- private final Session session;
-
- private final Transaction transaction;
-
- private final Statement statement;
-
- private final Object[] arguments;
-
- private final List<SqlRow> rows;
-
- private final List<ColumnMetadata> columns;
-
- private final boolean hasMorePages;
-
- private final FakeIgniteSql sql;
-
- /**
- * Constructor.
- *
- * @param session Session.
- * @param transaction Transaction.
- * @param statement Statement.
- * @param arguments Arguments.
- */
- public FakeAsyncResultSet(Session session, Transaction transaction,
Statement statement, Object[] arguments, FakeIgniteSql sql) {
- assert session != null;
- assert statement != null;
-
- this.session = session;
- this.transaction = transaction;
- this.statement = statement;
- this.arguments = arguments;
- this.sql = sql;
-
- hasMorePages = session.property("hasMorePages") != null;
-
- if ("SELECT PROPS".equals(statement.query())) {
- rows = new ArrayList<>();
-
- rows.add(getRow("schema", session.defaultSchema()));
- rows.add(getRow("timeout",
String.valueOf(session.defaultQueryTimeout(TimeUnit.MILLISECONDS))));
- rows.add(getRow("pageSize",
String.valueOf(session.defaultPageSize())));
-
- var props = ((FakeSession) session).properties();
-
- for (var e : props.entrySet()) {
- rows.add(getRow(e.getKey(), e.getValue()));
- }
-
- columns = new ArrayList<>();
-
- columns.add(new FakeColumnMetadata("name", ColumnType.STRING));
- columns.add(new FakeColumnMetadata("val", ColumnType.STRING));
- } else if ("SELECT META".equals(statement.query())) {
- columns = new ArrayList<>();
-
- columns.add(new FakeColumnMetadata("BOOL", ColumnType.BOOLEAN));
- columns.add(new FakeColumnMetadata("INT8", ColumnType.INT8));
- columns.add(new FakeColumnMetadata("INT16", ColumnType.INT16));
- columns.add(new FakeColumnMetadata("INT32", ColumnType.INT32));
- columns.add(new FakeColumnMetadata("INT64", ColumnType.INT64));
- columns.add(new FakeColumnMetadata("FLOAT", ColumnType.FLOAT));
- columns.add(new FakeColumnMetadata("DOUBLE", ColumnType.DOUBLE));
- columns.add(new FakeColumnMetadata("DECIMAL", ColumnType.DECIMAL,
1, 2,
- true, new ColumnOrigin("SCHEMA1", "TBL2", "BIG_DECIMAL")));
- columns.add(new FakeColumnMetadata("DATE", ColumnType.DATE));
- columns.add(new FakeColumnMetadata("TIME", ColumnType.TIME));
- columns.add(new FakeColumnMetadata("DATETIME",
ColumnType.DATETIME));
- columns.add(new FakeColumnMetadata("TIMESTAMP",
ColumnType.TIMESTAMP));
- columns.add(new FakeColumnMetadata("UUID", ColumnType.UUID));
- columns.add(new FakeColumnMetadata("BITMASK", ColumnType.BITMASK));
- columns.add(new FakeColumnMetadata("BYTE_ARRAY",
ColumnType.BYTE_ARRAY));
- columns.add(new FakeColumnMetadata("PERIOD", ColumnType.PERIOD));
- columns.add(new FakeColumnMetadata("DURATION",
ColumnType.DURATION));
- columns.add(new FakeColumnMetadata("NUMBER", ColumnType.NUMBER));
-
- var row = getRow(
- true,
- Byte.MIN_VALUE,
- Short.MIN_VALUE,
- Integer.MIN_VALUE,
- Long.MIN_VALUE,
- 1.3f,
- 1.4d,
- BigDecimal.valueOf(145),
- LocalDate.of(2001, 2, 3),
- LocalTime.of(4, 5),
- LocalDateTime.of(2001, 3, 4, 5, 6),
- Instant.ofEpochSecond(987),
- new UUID(0, 0),
- BitSet.valueOf(new byte[0]),
- new byte[1],
- Period.of(10, 9, 8),
- Duration.ofDays(11),
- BigInteger.valueOf(42));
-
- rows = List.of(row);
- } else if ("SELECT LAST SCRIPT".equals(statement.query())) {
- rows = List.of(getRow(sql.lastScript));
- columns = List.of(new FakeColumnMetadata("script",
ColumnType.STRING));
- } else {
- rows = List.of(getRow(1));
- columns = List.of(new FakeColumnMetadata("col1",
ColumnType.INT32));
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable ResultSetMetadata metadata() {
- return new ResultSetMetadata() {
- @Override
- public List<ColumnMetadata> columns() {
- return columns;
- }
-
- @Override
- public int indexOf(String columnName) {
- return 0;
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean hasRowSet() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override
- public long affectedRows() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean wasApplied() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public Iterable<SqlRow> currentPage() {
- return rows;
- }
-
- /** {@inheritDoc} */
- @Override
- public int currentPageSize() {
- return rows.size();
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
- return CompletableFuture.completedFuture(new
FakeAsyncResultSet(session, transaction, statement, arguments, sql));
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean hasMorePages() {
- return hasMorePages;
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> closeAsync() {
- return nullCompletedFuture();
- }
-
- private SqlRow getRow(Object... vals) {
- return new FakeSqlRow(List.of(vals), metadata());
- }
-
- private static class ColumnOrigin implements ColumnMetadata.ColumnOrigin {
- private final String schemaName;
- private final String tableName;
- private final String columnName;
-
- public ColumnOrigin(String schemaName, String tableName, String
columnName) {
- this.schemaName = schemaName;
- this.tableName = tableName;
- this.columnName = columnName;
- }
-
- @Override
- public String schemaName() {
- return schemaName;
- }
-
- @Override
- public String tableName() {
- return tableName;
- }
-
- @Override
- public String columnName() {
- return columnName;
- }
- }
-}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index 77b882cd9b..159099f5c6 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -17,27 +17,99 @@
package org.apache.ignite.client.fakes;
+import static
org.apache.ignite.internal.sql.engine.QueryProperty.DEFAULT_SCHEMA;
+import static
org.apache.ignite.internal.sql.engine.QueryProperty.QUERY_TIMEOUT;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
import java.util.List;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.util.ListToInternalSqlRowAdapter;
+import org.apache.ignite.sql.ColumnMetadata;
+import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.ResultSetMetadata;
/**
* Fake {@link AsyncSqlCursor}.
*/
public class FakeCursor implements AsyncSqlCursor<InternalSqlRow> {
- private final Random random;
-
- FakeCursor() {
- random = new Random();
+ private final String qry;
+ private final List<ColumnMetadata> columns = new ArrayList<>();
+ private final List<InternalSqlRow> rows = new ArrayList<>();
+
+ FakeCursor(String qry, SqlProperties properties, Object[] params,
FakeIgniteQueryProcessor proc) {
+ this.qry = qry;
+
+ if ("SELECT PROPS".equals(qry)) {
+ columns.add(new FakeColumnMetadata("name", ColumnType.STRING));
+ columns.add(new FakeColumnMetadata("val", ColumnType.STRING));
+
+ rows.add(getRow("schema", properties.get(DEFAULT_SCHEMA)));
+ rows.add(getRow("timeout",
String.valueOf(properties.get(QUERY_TIMEOUT))));
+ } else if ("SELECT META".equals(qry)) {
+ columns.add(new FakeColumnMetadata("BOOL", ColumnType.BOOLEAN));
+ columns.add(new FakeColumnMetadata("INT8", ColumnType.INT8));
+ columns.add(new FakeColumnMetadata("INT16", ColumnType.INT16));
+ columns.add(new FakeColumnMetadata("INT32", ColumnType.INT32));
+ columns.add(new FakeColumnMetadata("INT64", ColumnType.INT64));
+ columns.add(new FakeColumnMetadata("FLOAT", ColumnType.FLOAT));
+ columns.add(new FakeColumnMetadata("DOUBLE", ColumnType.DOUBLE));
+ columns.add(new FakeColumnMetadata("DECIMAL", ColumnType.DECIMAL,
1, 2,
+ true, new ColumnOrigin("SCHEMA1", "TBL2", "BIG_DECIMAL")));
+ columns.add(new FakeColumnMetadata("DATE", ColumnType.DATE));
+ columns.add(new FakeColumnMetadata("TIME", ColumnType.TIME));
+ columns.add(new FakeColumnMetadata("DATETIME",
ColumnType.DATETIME));
+ columns.add(new FakeColumnMetadata("TIMESTAMP",
ColumnType.TIMESTAMP));
+ columns.add(new FakeColumnMetadata("UUID", ColumnType.UUID));
+ columns.add(new FakeColumnMetadata("BITMASK", ColumnType.BITMASK));
+ columns.add(new FakeColumnMetadata("BYTE_ARRAY",
ColumnType.BYTE_ARRAY));
+ columns.add(new FakeColumnMetadata("PERIOD", ColumnType.PERIOD));
+ columns.add(new FakeColumnMetadata("DURATION",
ColumnType.DURATION));
+ columns.add(new FakeColumnMetadata("NUMBER", ColumnType.NUMBER));
+
+ var row = getRow(
+ true,
+ Byte.MIN_VALUE,
+ Short.MIN_VALUE,
+ Integer.MIN_VALUE,
+ Long.MIN_VALUE,
+ 1.3f,
+ 1.4d,
+ BigDecimal.valueOf(145),
+ LocalDate.of(2001, 2, 3),
+ LocalTime.of(4, 5),
+ LocalDateTime.of(2001, 3, 4, 5, 6),
+ Instant.ofEpochSecond(987),
+ new UUID(0, 0),
+ BitSet.valueOf(new byte[0]),
+ new byte[1],
+ Period.of(10, 9, 8),
+ Duration.ofDays(11),
+ BigInteger.valueOf(42));
+
+ rows.add(row);
+ } else if ("SELECT LAST SCRIPT".equals(qry)) {
+ rows.add(getRow(proc.lastScript));
+ columns.add(new FakeColumnMetadata("script", ColumnType.STRING));
+ } else {
+ rows.add(getRow(1));
+ columns.add(new FakeColumnMetadata("col1", ColumnType.INT32));
+ }
}
@Override
@@ -47,18 +119,10 @@ public class FakeCursor implements
AsyncSqlCursor<InternalSqlRow> {
@Override
public CompletableFuture<BatchedResult<InternalSqlRow>>
requestNextAsync(int rows) {
- var batch = new ArrayList<InternalSqlRow>();
-
- for (int i = 0; i < rows; i++) {
- List<Object> row = new ArrayList<>();
- row.add(random.nextInt());
- row.add(random.nextLong());
- row.add(random.nextFloat());
- row.add(random.nextDouble());
- row.add(UUID.randomUUID().toString());
- row.add(null);
-
- batch.add(new ListToInternalSqlRowAdapter(row));
+ var batch = new ArrayList<>(this.rows);
+
+ if ("SELECT PROPS".equals(qry)) {
+ batch.add(getRow("pageSize", String.valueOf(rows)));
}
return CompletableFuture.completedFuture(new BatchedResult<>(batch,
true));
@@ -71,7 +135,17 @@ public class FakeCursor implements
AsyncSqlCursor<InternalSqlRow> {
@Override
public ResultSetMetadata metadata() {
- return null;
+ return new ResultSetMetadata() {
+ @Override
+ public List<ColumnMetadata> columns() {
+ return columns;
+ }
+
+ @Override
+ public int indexOf(String columnName) {
+ return 0;
+ }
+ };
}
@Override
@@ -93,4 +167,38 @@ public class FakeCursor implements
AsyncSqlCursor<InternalSqlRow> {
public CompletableFuture<Void> onFirstPageReady() {
throw new UnsupportedOperationException();
}
+
+ private static InternalSqlRow getRow(Object... vals) {
+ var list = new ArrayList<>(vals.length);
+ Collections.addAll(list, vals);
+
+ return new ListToInternalSqlRowAdapter(list);
+ }
+
+ private static class ColumnOrigin implements ColumnMetadata.ColumnOrigin {
+ private final String schemaName;
+ private final String tableName;
+ private final String columnName;
+
+ private ColumnOrigin(String schemaName, String tableName, String
columnName) {
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.columnName = columnName;
+ }
+
+ @Override
+ public String schemaName() {
+ return schemaName;
+ }
+
+ @Override
+ public String tableName() {
+ return tableName;
+ }
+
+ @Override
+ public String columnName() {
+ return columnName;
+ }
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 0b9d0da870..4fcc3818ca 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -85,7 +85,7 @@ public class FakeIgnite implements Ignite {
/** {@inheritDoc} */
@Override
public IgniteSql sql() {
- return new FakeIgniteSql();
+ throw new UnsupportedOperationException("Not implemented yet");
}
/** {@inheritDoc} */
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index fbdfc5ffb2..3c7b99c093 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -17,7 +17,10 @@
package org.apache.ignite.client.fakes;
+import static
org.apache.ignite.internal.sql.engine.QueryProperty.DEFAULT_SCHEMA;
+import static
org.apache.ignite.internal.sql.engine.QueryProperty.QUERY_TIMEOUT;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -26,6 +29,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.Nullable;
@@ -33,6 +37,10 @@ import org.jetbrains.annotations.Nullable;
* Fake {@link QueryProcessor}.
*/
public class FakeIgniteQueryProcessor implements QueryProcessor {
+ public static final String FAILED_SQL = "SELECT FAIL";
+
+ String lastScript;
+
@Override
public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties
properties,
@Nullable InternalTransaction transaction, String qry, Object...
params) {
@@ -47,7 +55,11 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
String qry,
Object... params
) {
- return CompletableFuture.completedFuture(new FakeCursor());
+ if (FAILED_SQL.equals(qry)) {
+ return CompletableFuture.failedFuture(new
SqlException(STMT_VALIDATION_ERR, "Query failed"));
+ }
+
+ return CompletableFuture.completedFuture(new FakeCursor(qry,
properties, params, this));
}
@Override
@@ -58,7 +70,21 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
String qry,
Object... params
) {
- throw new UnsupportedOperationException();
+ var sb = new StringBuilder(qry);
+
+ sb.append(", arguments: [");
+
+ for (Object arg : params) {
+ sb.append(arg).append(", ");
+ }
+
+ sb.append(']').append(", ")
+
.append("defaultSchema=").append(properties.get(DEFAULT_SCHEMA)).append(", ")
+
.append("defaultQueryTimeout=").append(properties.get(QUERY_TIMEOUT));
+
+ lastScript = sb.toString();
+
+ return CompletableFuture.completedFuture(new FakeCursor(qry,
properties, params, this));
}
@Override
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteSql.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteSql.java
deleted file mode 100644
index 7095c58fd5..0000000000
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteSql.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Session.SessionBuilder;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.Statement.StatementBuilder;
-
-/**
- * Fake SQL implementation.
- */
-public class FakeIgniteSql implements IgniteSql {
- String lastScript;
-
- @Override
- public Session createSession() {
- return sessionBuilder().build();
- }
-
- @Override
- public SessionBuilder sessionBuilder() {
- return new FakeSessionBuilder(this);
- }
-
- @Override
- public Statement createStatement(String query) {
- return statementBuilder().build();
- }
-
- @Override
- public StatementBuilder statementBuilder() {
- return new ClientStatementBuilder();
- }
-}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index b8d58c7028..2179649d60 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client.fakes;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.List;
@@ -38,6 +39,7 @@ import
org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.Table;
import org.jetbrains.annotations.Nullable;
@@ -220,7 +222,7 @@ public class FakeIgniteTables implements
IgniteTablesInternal {
return
completedFuture(schemaReg.lastKnownSchemaVersion());
}
},
- new FakeIgniteSql(),
+ mock(IgniteSql.class),
-1
);
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
deleted file mode 100644
index 7a87b387a1..0000000000
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import static org.apache.ignite.internal.client.ClientUtils.sync;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
-
-import java.time.ZoneId;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Publisher;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.sql.AbstractSession;
-import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.SqlException;
-import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.sql.reactive.ReactiveResultSet;
-import org.apache.ignite.table.mapper.Mapper;
-import org.apache.ignite.tx.Transaction;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client SQL session.
- */
-public class FakeSession implements AbstractSession {
- public static final String FAILED_SQL = "SELECT FAIL";
-
- @Nullable
- private final Integer defaultPageSize;
-
- @Nullable
- private final String defaultSchema;
-
- @Nullable
- private final Long defaultQueryTimeout;
-
- @Nullable
- private final Long defaultSessionTimeout;
-
- @Nullable
- private final Map<String, Object> properties;
-
- private final FakeIgniteSql sql;
-
- /**
- * Constructor.
- *
- * @param defaultPageSize Default page size.
- * @param defaultSchema Default schema.
- * @param defaultQueryTimeout Default timeout.
- * @param properties Properties.
- * @param sql SQL.
- */
- @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
- public FakeSession(
- @Nullable Integer defaultPageSize,
- @Nullable String defaultSchema,
- @Nullable Long defaultQueryTimeout,
- @Nullable Long defaultSessionTimeout,
- @Nullable Map<String, Object> properties,
- FakeIgniteSql sql) {
- this.defaultPageSize = defaultPageSize;
- this.defaultSchema = defaultSchema;
- this.defaultQueryTimeout = defaultQueryTimeout;
- this.defaultSessionTimeout = defaultSessionTimeout;
- this.properties = properties;
- this.sql = sql;
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
- @Nullable Transaction transaction,
- String query,
- @Nullable Object... arguments) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
- @Nullable Transaction transaction,
- Statement statement,
- @Nullable Object... arguments) {
- Objects.requireNonNull(statement);
-
- if (FAILED_SQL.equals(statement.query())) {
- return CompletableFuture.failedFuture(new
SqlException(STMT_VALIDATION_ERR, "Query failed"));
- }
-
- return CompletableFuture.completedFuture(new FakeAsyncResultSet(this,
transaction, statement, arguments, sql));
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable
Transaction transaction, @Nullable Mapper<T> mapper,
- String query, @Nullable Object... arguments) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
- @Nullable Transaction transaction,
- @Nullable Mapper<T> mapper,
- Statement statement,
- @Nullable Object... arguments) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, String query, @Nullable Object... arguments) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, Statement statement, @Nullable Object... arguments) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public long[] executeBatch(@Nullable Transaction transaction, String
dmlQuery, BatchedArguments batch) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public long[] executeBatch(@Nullable Transaction transaction, Statement
dmlStatement, BatchedArguments batch) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public Publisher<Long> executeBatchReactive(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public Publisher<Long> executeBatchReactive(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public void executeScript(String query, @Nullable Object... arguments) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> executeScriptAsync(String query, @Nullable
Object... arguments) {
- var sb = new StringBuilder(query);
-
- if (arguments != null) {
- sb.append(", arguments: [");
-
- for (Object arg : arguments) {
- sb.append(arg).append(", ");
- }
-
- sb.append(']');
- }
-
- if (properties != null) {
- sb.append(", properties: [");
-
- for (Map.Entry<String, Object> entry : properties.entrySet()) {
-
sb.append(entry.getKey()).append('=').append(entry.getValue()).append(", ");
- }
-
- sb.append(']');
- }
-
- sb.append(", ").append("defaultPageSize=").append(defaultPageSize)
- .append(", ").append("defaultSchema=").append(defaultSchema)
- .append(",
").append("defaultQueryTimeout=").append(defaultQueryTimeout)
- .append(",
").append("defaultSessionTimeout=").append(defaultSessionTimeout);
-
- sql.lastScript = sb.toString();
-
- return nullCompletedFuture();
- }
-
- /** {@inheritDoc} */
- @Override
- public long defaultQueryTimeout(TimeUnit timeUnit) {
- return defaultQueryTimeout;
- }
-
- /** {@inheritDoc} */
- @Override
- public long idleTimeout(TimeUnit timeUnit) {
- return defaultSessionTimeout;
- }
-
- /** {@inheritDoc} */
- @Override
- public String defaultSchema() {
- return defaultSchema;
- }
-
- /** {@inheritDoc} */
- @Override
- public int defaultPageSize() {
- return defaultPageSize;
- }
-
- /** {@inheritDoc} */
- @Override
- public ZoneId timeZoneId() {
- // TODO https://issues.apache.org/jira/browse/IGNITE-21568
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable Object property(String name) {
- return properties == null ? null : properties.get(name);
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() {
- sync(closeAsync());
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> closeAsync() {
- return nullCompletedFuture();
- }
-
- /** {@inheritDoc} */
- @Override
- public Publisher<Void> closeReactive() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean closed() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public SessionBuilder toBuilder() {
- return null;
- }
-
- public Map<String, Object> properties() {
- // noinspection AssignmentOrReturnOfFieldWithMutableType
- return properties;
- }
-}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
deleted file mode 100644
index c704377a13..0000000000
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Session.SessionBuilder;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client SQL session builder.
- */
-public class FakeSessionBuilder implements SessionBuilder {
- private final Map<String, Object> properties = new HashMap<>();
-
- private final FakeIgniteSql sql;
-
- private String defaultSchema;
-
- private Long defaultQueryTimeoutMs;
-
- private Long defaultSessionTimeoutMs;
-
- private Integer pageSize;
-
- public FakeSessionBuilder(FakeIgniteSql sql) {
- this.sql = sql;
- }
-
- /** {@inheritDoc} */
- @Override
- public long defaultQueryTimeout(TimeUnit timeUnit) {
- Objects.requireNonNull(timeUnit);
-
- return timeUnit.convert(defaultQueryTimeoutMs == null ? 0 :
defaultQueryTimeoutMs, TimeUnit.MILLISECONDS);
- }
-
- /** {@inheritDoc} */
- @Override
- public SessionBuilder defaultQueryTimeout(long timeout, TimeUnit timeUnit)
{
- Objects.requireNonNull(timeUnit);
-
- defaultQueryTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout,
timeUnit);
-
- return this;
- }
-
- @Override
- public long idleTimeout(TimeUnit timeUnit) {
- Objects.requireNonNull(timeUnit);
-
- return timeUnit.convert(defaultSessionTimeoutMs == null ? 0 :
defaultSessionTimeoutMs, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public SessionBuilder idleTimeout(long timeout, TimeUnit timeUnit) {
- Objects.requireNonNull(timeUnit);
-
- defaultSessionTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout,
timeUnit);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public String defaultSchema() {
- return defaultSchema;
- }
-
- /** {@inheritDoc} */
- @Override
- public SessionBuilder defaultSchema(String schema) {
- defaultSchema = schema;
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public int defaultPageSize() {
- return pageSize == null ? 0 : pageSize;
- }
-
- /** {@inheritDoc} */
- @Override
- public SessionBuilder defaultPageSize(int pageSize) {
- this.pageSize = pageSize;
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public SessionBuilder timeZoneId(ZoneId timeZoneId) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable Object property(String name) {
- return properties.get(name);
- }
-
- /** {@inheritDoc} */
- @Override
- public SessionBuilder property(String name, @Nullable Object value) {
- properties.put(name, value);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public Session build() {
- return new FakeSession(pageSize, defaultSchema, defaultQueryTimeoutMs,
defaultSessionTimeoutMs, new HashMap<>(properties), sql);
- }
-}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSqlRow.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSqlRow.java
deleted file mode 100644
index 6c6652f3ab..0000000000
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSqlRow.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.table.Tuple;
-
-/**
- * Client SQL row.
- */
-public class FakeSqlRow implements SqlRow {
- /** Row. */
- private final List<Object> row;
-
- /** Meta. */
- private final ResultSetMetadata metadata;
-
- /**
- * Constructor.
- *
- * @param row Row.
- * @param meta Meta.
- */
- public FakeSqlRow(List<Object> row, ResultSetMetadata meta) {
- assert row != null;
- assert meta != null;
-
- // noinspection AssignmentOrReturnOfFieldWithMutableType
- this.row = row;
- this.metadata = meta;
- }
-
- /** {@inheritDoc} */
- @Override
- public int columnCount() {
- return row.size();
- }
-
- /** {@inheritDoc} */
- @Override
- public String columnName(int columnIndex) {
- return metadata.columns().get(columnIndex).name();
- }
-
- /** {@inheritDoc} */
- @Override
- public int columnIndex(String columnName) {
- return metadata.indexOf(columnName);
- }
-
- private int columnIndexChecked(String columnName) {
- int idx = columnIndex(columnName);
-
- if (idx == -1) {
- throw new IllegalArgumentException("Column doesn't exist [name=" +
columnName + ']');
- }
-
- return idx;
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> T valueOrDefault(String columnName, T defaultValue) {
- T ret = (T) row.get(columnIndexChecked(columnName));
-
- return ret != null ? ret : defaultValue;
- }
-
- /** {@inheritDoc} */
- @Override
- public Tuple set(String columnName, Object value) {
- throw new UnsupportedOperationException("Operation not supported.");
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> T value(String columnName) throws IllegalArgumentException {
- return (T) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> T value(int columnIndex) {
- return (T) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean booleanValue(String columnName) {
- return (boolean) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean booleanValue(int columnIndex) {
- return (boolean) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public byte byteValue(String columnName) {
- return (byte) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public byte byteValue(int columnIndex) {
- return (byte) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public short shortValue(String columnName) {
- return (short) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public short shortValue(int columnIndex) {
- return (short) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public int intValue(String columnName) {
- return (int) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public int intValue(int columnIndex) {
- return (int) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public long longValue(String columnName) {
- return (long) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public long longValue(int columnIndex) {
- return (long) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public float floatValue(String columnName) {
- return (float) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public float floatValue(int columnIndex) {
- return (float) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public double doubleValue(String columnName) {
- return (double) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public double doubleValue(int columnIndex) {
- return (double) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public String stringValue(String columnName) {
- return (String) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public String stringValue(int columnIndex) {
- return (String) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public UUID uuidValue(String columnName) {
- return (UUID) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public UUID uuidValue(int columnIndex) {
- return (UUID) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public BitSet bitmaskValue(String columnName) {
- return (BitSet) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public BitSet bitmaskValue(int columnIndex) {
- return (BitSet) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public LocalDate dateValue(String columnName) {
- return (LocalDate) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public LocalDate dateValue(int columnIndex) {
- return (LocalDate) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public LocalTime timeValue(String columnName) {
- return (LocalTime) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public LocalTime timeValue(int columnIndex) {
- return (LocalTime) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public LocalDateTime datetimeValue(String columnName) {
- return (LocalDateTime) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public LocalDateTime datetimeValue(int columnIndex) {
- return (LocalDateTime) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public Instant timestampValue(String columnName) {
- return (Instant) row.get(columnIndexChecked(columnName));
- }
-
- /** {@inheritDoc} */
- @Override
- public Instant timestampValue(int columnIndex) {
- return (Instant) row.get(columnIndex);
- }
-
- /** {@inheritDoc} */
- @Override
- public Iterator<Object> iterator() {
- return row.iterator();
- }
-
- /** {@inheritDoc} */
- @Override
- public ResultSetMetadata metadata() {
- return metadata;
- }
-}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index 1815b8617e..9d70105580 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
@@ -507,6 +508,21 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
assertEquals(ColumnType.NULL, meta.columns().get(14).type());
}
+ @Test
+ public void testExecuteScriptFail() {
+ var script = "CREATE TABLE execute_script_fail (id INT PRIMARY KEY,
step INTEGER); "
+ + "INSERT INTO execute_script_fail VALUES(1, 0); "
+ + "UPDATE execute_script_fail SET step = 1; "
+ + "UPDATE execute_script_fail SET step = 3 WHERE step > 1/0; "
+ + "UPDATE execute_script_fail SET step = 2; ";
+
+ SqlException e = assertThrows(
+ SqlException.class,
+ () -> client().sql().createSession().executeScript(script));
+
+ assertThat(e.getMessage(), Matchers.containsString("Division by
zero"));
+ }
+
private static class Pojo {
public int num;
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1627d80b6b..8ea348e858 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -833,7 +833,6 @@ public class IgniteImpl implements Ignite {
compute,
clusterSvc,
nettyBootstrapFactory,
- sql,
() -> cmgMgr.clusterState().thenApply(s -> s.clusterTag()),
metricManager,
new ClientHandlerMetricSource(),
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 13e2422cb9..026ca58f17 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -41,7 +41,7 @@ import org.jetbrains.annotations.Nullable;
* Asynchronous result set implementation.
*/
public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
- private final IdleExpirationTracker expirationTracker;
+ private final @Nullable IdleExpirationTracker expirationTracker;
private final AsyncSqlCursor<InternalSqlRow> cursor;
@@ -49,6 +49,21 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
private final int pageSize;
+ /**
+ * Constructor.
+ *
+ * @param cursor Query cursor representing the result of execution.
+ * @param page Current page.
+ * @param pageSize Size of the page to fetch.
+ */
+ public AsyncResultSetImpl(
+ AsyncSqlCursor<InternalSqlRow> cursor,
+ BatchedResult<InternalSqlRow> page,
+ int pageSize
+ ) {
+ this(cursor, page, pageSize, null);
+ }
+
/**
* Constructor.
*
@@ -62,7 +77,7 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
AsyncSqlCursor<InternalSqlRow> cursor,
BatchedResult<InternalSqlRow> page,
int pageSize,
- IdleExpirationTracker expirationTracker
+ @Nullable IdleExpirationTracker expirationTracker
) {
this.cursor = cursor;
this.curPage = page;
@@ -111,7 +126,9 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
public Iterable<T> currentPage() {
requireResultSet();
- expirationTracker.touch();
+ if (expirationTracker != null) {
+ expirationTracker.touch();
+ }
Iterator<InternalSqlRow> it0 = curPage.items().iterator();
ResultSetMetadata meta0 = cursor.metadata();
@@ -133,7 +150,9 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
requireResultSet();
- expirationTracker.touch();
+ if (expirationTracker != null) {
+ expirationTracker.touch();
+ }
return cursor.requestNextAsync(pageSize)
.thenApply(page -> {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index c7b2453aa9..2193854eda 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -41,7 +41,9 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.AbstractSession;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -309,98 +311,138 @@ public class SessionImpl implements AbstractSession {
.set(QueryProperty.ALLOWED_QUERY_TYPES,
EnumSet.of(SqlQueryType.DML))
.build();
- var counters = new LongArrayList(batch.size());
- CompletableFuture<?> tail = nullCompletedFuture();
- ArrayList<CompletableFuture<?>> batchFuts = new
ArrayList<>(batch.size());
-
- for (int i = 0; i < batch.size(); ++i) {
- Object[] args = batch.get(i).toArray();
+ return executeBatchCore(
+ qryProc,
+ transactions,
+ (InternalTransaction) transaction,
+ query,
+ batch,
+ properties,
+ busyLock::enterBusy,
+ busyLock::leaveBusy,
+ this::registerCursor,
+ openedCursors::remove);
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(mapToPublicSqlException(e));
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
- tail = tail.thenCompose(v -> {
- if (!busyLock.enterBusy()) {
- return
CompletableFuture.failedFuture(sessionIsClosedException());
- }
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
- try {
- return qryProc.querySingleAsync(properties,
transactions, (InternalTransaction) transaction, query, args)
- .thenCompose(cursor -> {
- if (!busyLock.enterBusy()) {
- cursor.closeAsync();
+ /**
+ * Execute batch of DML statements.
+ *
+ * @param qryProc Query processor.
+ * @param transactions Transactions facade.
+ * @param transaction Transaction.
+ * @param query Query.
+ * @param batch Batch of arguments.
+ * @param properties Properties.
+ * @param enterBusy Enter busy lock action.
+ * @param leaveBusy Leave busy lock action.
+ * @param registerCursor Register cursor action.
+ * @param removeCursor Remove cursor action.
+ * @return Operation Future completed with the number of rows affected by
each query in the batch
+ * (if the batch succeeds), future completed with the {@link
SqlBatchException} (if the batch fails).
+ */
+ public static CompletableFuture<long[]> executeBatchCore(
+ QueryProcessor qryProc,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction transaction,
+ String query,
+ BatchedArguments batch,
+ SqlProperties properties,
+ Supplier<Boolean> enterBusy,
+ Runnable leaveBusy,
+ Function<AsyncSqlCursor<?>, Integer> registerCursor,
+ Consumer<Integer> removeCursor) {
+ var counters = new LongArrayList(batch.size());
+ CompletableFuture<?> tail = nullCompletedFuture();
+ ArrayList<CompletableFuture<?>> batchFuts = new
ArrayList<>(batch.size());
+
+ for (int i = 0; i < batch.size(); ++i) {
+ Object[] args = batch.get(i).toArray();
+
+ tail = tail.thenCompose(v -> {
+ if (!enterBusy.get()) {
+ return
CompletableFuture.failedFuture(sessionIsClosedException());
+ }
- return
CompletableFuture.failedFuture(sessionIsClosedException());
- }
+ try {
+ return qryProc.querySingleAsync(properties, transactions,
transaction, query, args)
+ .thenCompose(cursor -> {
+ if (!enterBusy.get()) {
+ cursor.closeAsync();
- try {
- int cursorId = registerCursor(cursor);
+ return
CompletableFuture.failedFuture(sessionIsClosedException());
+ }
- return cursor.requestNextAsync(1)
- .handle((page, th) -> {
-
openedCursors.remove(cursorId);
- cursor.closeAsync();
+ try {
+ int cursorId =
registerCursor.apply(cursor);
- if (th != null) {
- return
CompletableFuture.failedFuture(th);
- }
+ return cursor.requestNextAsync(1)
+ .handle((page, th) -> {
+ removeCursor.accept(cursorId);
+ cursor.closeAsync();
- validateDmlResult(page);
+ if (th != null) {
+ return
CompletableFuture.failedFuture(th);
+ }
- counters.add((long)
page.items().get(0).get(0));
+ validateDmlResult(page);
- return
nullCompletedFuture();
-
}).thenCompose(Function.identity());
- } finally {
- busyLock.leaveBusy();
- }
- });
- } finally {
- busyLock.leaveBusy();
- }
- });
+ counters.add((long)
page.items().get(0).get(0));
- batchFuts.add(tail);
- }
+ return nullCompletedFuture();
+
}).thenCompose(Function.identity());
+ } finally {
+ leaveBusy.run();
+ }
+ });
+ } finally {
+ leaveBusy.run();
+ }
+ });
- CompletableFuture<long[]> resFut = tail
- .exceptionally((ex) -> {
- Throwable cause = ExceptionUtils.unwrapCause(ex);
+ batchFuts.add(tail);
+ }
- if (cause instanceof CancellationException) {
- throw (CancellationException) cause;
- }
+ CompletableFuture<long[]> resFut = tail
+ .exceptionally((ex) -> {
+ Throwable cause = ExceptionUtils.unwrapCause(ex);
- Throwable t = mapToPublicSqlException(cause);
+ if (cause instanceof CancellationException) {
+ throw (CancellationException) cause;
+ }
- if (t instanceof TraceableException) {
- throw new SqlBatchException(
- ((TraceableException) t).traceId(),
- ((TraceableException) t).code(),
-
counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY),
- t);
- }
+ Throwable t = mapToPublicSqlException(cause);
- // JVM error.
- throw new CompletionException(cause);
- })
- .thenApply(v ->
counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY));
+ if (t instanceof TraceableException) {
+ throw new SqlBatchException(
+ ((TraceableException) t).traceId(),
+ ((TraceableException) t).code(),
+ counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY),
+ t);
+ }
- resFut.whenComplete((cur, ex) -> {
- if (ExceptionUtils.unwrapCause(ex) instanceof
CancellationException) {
- batchFuts.forEach(f -> f.cancel(false));
- }
- });
+ // JVM error.
+ throw new CompletionException(cause);
+ })
+ .thenApply(v -> counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY));
- return resFut;
- } catch (Exception e) {
- return CompletableFuture.failedFuture(mapToPublicSqlException(e));
- } finally {
- busyLock.leaveBusy();
- }
- }
+ resFut.whenComplete((cur, ex) -> {
+ if (ExceptionUtils.unwrapCause(ex) instanceof
CancellationException) {
+ batchFuts.forEach(f -> f.cancel(false));
+ }
+ });
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ return resFut;
}
/** {@inheritDoc} */
@@ -416,16 +458,46 @@ public class SessionImpl implements AbstractSession {
.set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.ALL)
.build();
- CompletableFuture<Void> resFut = new CompletableFuture<>();
try {
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
- qryProc.queryScriptAsync(properties, transactions, null,
query, arguments);
-
- ScriptHandler handler = new ScriptHandler(resFut);
- f.whenComplete(handler::processCursor);
+ return executeScriptCore(
+ qryProc,
+ transactions,
+ busyLock::enterBusy,
+ busyLock::leaveBusy,
+ query,
+ arguments,
+ properties);
} finally {
busyLock.leaveBusy();
}
+ }
+
+ /**
+ * Execute SQL script.
+ *
+ * @param qryProc Query processor.
+ * @param transactions Transactions facade.
+ * @param enterBusy Enter busy lock action.
+ * @param leaveBusy Leave busy lock action.
+ * @param query SQL script.
+ * @param arguments Arguments.
+ * @param properties Properties.
+ * @return Operation future.
+ */
+ public static CompletableFuture<Void> executeScriptCore(
+ QueryProcessor qryProc,
+ IgniteTransactions transactions,
+ Supplier<Boolean> enterBusy,
+ Runnable leaveBusy,
+ String query,
+ @Nullable Object[] arguments,
+ SqlProperties properties) {
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
+ qryProc.queryScriptAsync(properties, transactions, null,
query, arguments);
+
+ CompletableFuture<Void> resFut = new CompletableFuture<>();
+ ScriptHandler handler = new ScriptHandler(resFut, enterBusy,
leaveBusy);
+ f.whenComplete(handler::processCursor);
return resFut.exceptionally((th) -> {
Throwable cause = ExceptionUtils.unwrapCause(th);
@@ -606,12 +678,19 @@ public class SessionImpl implements AbstractSession {
return List.copyOf(openedCursors.values());
}
- private class ScriptHandler {
+ private static class ScriptHandler {
private final CompletableFuture<Void> resFut;
private final List<Throwable> cursorCloseErrors =
Collections.synchronizedList(new ArrayList<>());
+ private final Supplier<Boolean> enterBusy;
+ private final Runnable leaveBusy;
- ScriptHandler(CompletableFuture<Void> resFut) {
+ ScriptHandler(
+ CompletableFuture<Void> resFut,
+ Supplier<Boolean> enterBusy,
+ Runnable leaveBusy) {
this.resFut = resFut;
+ this.enterBusy = enterBusy;
+ this.leaveBusy = leaveBusy;
}
void processCursor(AsyncSqlCursor<InternalSqlRow> cursor, Throwable
scriptError) {
@@ -628,7 +707,7 @@ public class SessionImpl implements AbstractSession {
cursorCloseErrors.add(cursorCloseError);
}
- if (!busyLock.enterBusy()) {
+ if (!enterBusy.get()) {
onFail(sessionIsClosedException());
return;
}
@@ -639,7 +718,7 @@ public class SessionImpl implements AbstractSession {
return;
}
} finally {
- busyLock.leaveBusy();
+ leaveBusy.run();
}
onComplete();