This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 79f6cfc21f IGNITE-17059 Java thin: Implement batch SQL API for java thin client (#3964) 79f6cfc21f is described below commit 79f6cfc21f00fac9040ad2bbf2c78164b3e0bc62 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Tue Jun 25 13:08:37 2024 +0300 IGNITE-17059 Java thin: Implement batch SQL API for java thin client (#3964) --- .../ignite/example/sql/ItSqlExamplesTest.java | 2 +- .../org/apache/ignite/sql/BatchedArguments.java | 152 +++++++++++++++++++-- .../org/apache/ignite/sql/SqlBatchException.java | 14 ++ .../apache/ignite/sql/BatchedArgumentsTest.java | 107 +++++++++++++++ .../internal/client/proto/ClientMessagePacker.java | 34 +++++ .../client/proto/ClientMessageUnpacker.java | 27 +++- .../requests/sql/ClientSqlExecuteBatchRequest.java | 14 +- .../ignite/internal/client/sql/ClientSql.java | 107 ++++++++++++--- .../platforms/cpp/ignite/odbc/query/data_query.cpp | 4 +- .../sql/api/ItSqlClientAsynchronousApiTest.java | 18 --- .../sql/api/ItSqlClientSynchronousApiTest.java | 18 --- .../ignite/internal/sql/api/IgniteSqlImpl.java | 20 +-- 12 files changed, 429 insertions(+), 88 deletions(-) diff --git a/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java b/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java index 7d47c049ad..944e6de275 100644 --- a/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java +++ b/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java @@ -58,7 +58,7 @@ public class ItSqlExamplesTest extends AbstractExamplesTest { * * @throws Exception If failed. */ - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059") + @Disabled("https://issues.apache.org/jira/browse/IGNITE-22262") @Test public void testSqlApiExample() throws Exception { assertConsoleOutputContains(SqlApiExample::main, EMPTY_ARGS, diff --git a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java index 019a5d46ec..9c34593e3c 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java @@ -18,17 +18,18 @@ package org.apache.ignite.sql; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; /** * Arguments for batch query execution. - * - * <p>TODO: replace inheritance with delegation. - * TODO: add arguments length validation. - * TODO: add named arguments support. */ -public class BatchedArguments extends ArrayList<List<Object>> implements List<List<Object>> { +public final class BatchedArguments implements Iterable<List<Object>> { + /** Batched arguments. */ + private final List<List<Object>> batchedArgs; + /** * Creates batched arguments. * @@ -45,9 +46,9 @@ public class BatchedArguments extends ArrayList<List<Object>> implements List<Li * @return Batch query arguments. */ public static BatchedArguments of(Object... args) { - BatchedArguments arguments = new BatchedArguments(); + BatchedArguments arguments = create(); - arguments.add(Arrays.asList(args)); + arguments.add(args); return arguments; } @@ -55,15 +56,13 @@ public class BatchedArguments extends ArrayList<List<Object>> implements List<Li /** * Creates batched arguments. * - * @param args Arguments. + * @param batchedArgs Arguments. * @return Batch query arguments. */ - public static BatchedArguments of(List<List<Object>> args) { - BatchedArguments arguments = new BatchedArguments(); - - arguments.addAll(args); + public static BatchedArguments from(List<List<Object>> batchedArgs) { + Objects.requireNonNull(batchedArgs, "batchedArgs"); - return arguments; + return new BatchedArguments(batchedArgs); } /** @@ -73,8 +72,131 @@ public class BatchedArguments extends ArrayList<List<Object>> implements List<Li * @return {@code this} for chaining. */ public BatchedArguments add(Object... args) { - add(List.of(args)); + Objects.requireNonNull(args, "args"); + + return addArguments(List.of(args)); + } + + /** + * Returns the arguments list at the specified position. + * + * @param index index of the element to return. + * @return Arguments list. + */ + public List<Object> get(int index) { + return batchedArgs.get(index); + } + + /** + * Returns the size of this batch. + * + * @return Batch size. + */ + public int size() { + return batchedArgs.size(); + } + + /** + * Returns {@code true} if this batch contains is empty. + * + * @return {@code True} if this batch contains is empty. + */ + public boolean isEmpty() { + return batchedArgs.isEmpty(); + } + + /** + * Returns an iterator over the elements in this batch. + * + * @return Iterator over the elements in this batch. + */ + @Override + public Iterator<List<Object>> iterator() { + return new ImmutableIterator<>(batchedArgs.iterator()); + } + + /** Constructor. */ + private BatchedArguments() { + this.batchedArgs = new ArrayList<>(); + } + + /** Constructor. */ + private BatchedArguments(List<List<Object>> batchedArgs) { + List<List<Object>> resultList = new ArrayList<>(batchedArgs.size()); + + int pos = 0; + int requiredLength = 0; + + for (List<Object> arguments : batchedArgs) { + Objects.requireNonNull(arguments, "Arguments list cannot be null."); + + if (arguments.isEmpty()) { + throwEmptyArgumentsException(); + } + + if (pos == 0) { + requiredLength = arguments.size(); + } else { + ensureRowLength(requiredLength, arguments.size()); + } + + resultList.add(List.copyOf(arguments)); + + ++pos; + } + + this.batchedArgs = resultList; + } + + private BatchedArguments addArguments(List<Object> immutableList) { + if (immutableList.isEmpty()) { + throwEmptyArgumentsException(); + } + + if (!batchedArgs.isEmpty()) { + ensureRowLength(batchedArgs.get(0).size(), immutableList.size()); + } + + batchedArgs.add(immutableList); return this; } + + private static void throwEmptyArgumentsException() { + throw new IllegalArgumentException("Non empty arguments required."); + } + + private static void ensureRowLength(int expected, int actual) { + if (expected != actual) { + throw new IllegalArgumentException("Argument lists must be the same size."); + } + } + + private static class ImmutableIterator<E> implements Iterator<E> { + private final Iterator<E> delegate; + + private ImmutableIterator(Iterator<E> delegate) { + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public E next() { + return delegate.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public void forEachRemaining(Consumer<? super E> action) { + delegate.forEachRemaining(action); + } + } } diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java index 3e5476eb6a..61b3e882ae 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java @@ -49,6 +49,20 @@ public class SqlBatchException extends SqlException { this.updCntrs = updCntrs != null ? updCntrs : LONG_EMPTY_ARRAY; } + /** + * Creates an exception with the given error message. + * + * @param traceId Unique identifier of the exception. + * @param code Full error code. + * @param updCntrs Array that describes the outcome of a batch execution. + * @param message Detailed message. + */ + public SqlBatchException(UUID traceId, int code, long[] updCntrs, String message) { + super(traceId, code, message, null); + + this.updCntrs = updCntrs != null ? updCntrs : LONG_EMPTY_ARRAY; + } + /** * Creates an exception with the given trace ID, error code, detailed message, and cause. * diff --git a/modules/api/src/test/java/org/apache/ignite/sql/BatchedArgumentsTest.java b/modules/api/src/test/java/org/apache/ignite/sql/BatchedArgumentsTest.java new file mode 100644 index 0000000000..b9ae453d0a --- /dev/null +++ b/modules/api/src/test/java/org/apache/ignite/sql/BatchedArgumentsTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.sql; + +import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +/** + * Tests for {@link BatchedArguments}. + */ +public class BatchedArgumentsTest { + @Test + public void nullAndEmptyArgumentsAreForbidden() { + assertThrows(NullPointerException.class, "args", () -> BatchedArguments.of((Object[]) null)); + assertThrows(NullPointerException.class, "batchedArgs", () -> BatchedArguments.from((List<List<Object>>) null)); + assertThrows(NullPointerException.class, "Arguments list cannot be null.", () -> BatchedArguments.from(singletonList(null))); + assertThrows(IllegalArgumentException.class, "Non empty arguments required.", () -> BatchedArguments.of(new Object[0])); + + BatchedArguments batch = BatchedArguments.create(); + assertThrows(NullPointerException.class, "args", () -> batch.add((Object[]) null)); + assertThrows(IllegalArgumentException.class, "Non empty arguments required.", () -> batch.add(new Object[0])); + } + + @Test + public void iteratorIsImmutable() { + List<List<Object>> argLists = new ArrayList<>(2); + argLists.add(List.of(1, 2)); + argLists.add(List.of(3, 4)); + + BatchedArguments batch = BatchedArguments.from(argLists); + assertThat(batch.size(), is(argLists.size())); + + Iterator<List<Object>> itr = batch.iterator(); + assertThat(itr.next(), equalTo(argLists.get(0))); + assertThat(itr.hasNext(), is(true)); + assertThat(itr.next(), equalTo(argLists.get(1))); + assertThat(itr.hasNext(), is(false)); + + Assertions.assertThrows(UnsupportedOperationException.class, itr::remove); + + assertThat(batch.size(), is(argLists.size())); + } + + @Test + public void argumentsListsMustBeSameSize() { + List<List<Object>> argLists = new ArrayList<>(2); + argLists.add(List.of(1)); + argLists.add(List.of(2, 3)); + + assertThrows(IllegalArgumentException.class, "Argument lists must be the same size.", () -> BatchedArguments.from(argLists)); + + { + BatchedArguments batch = BatchedArguments.of(1); + assertThrows(IllegalArgumentException.class, "Argument lists must be the same size.", () -> batch.add(1, 2)); + } + } + + @Test + public void argumentsListsAreImmutable() { + List<List<Object>> argLists = new ArrayList<>(2); + argLists.add(List.of(1)); + argLists.add(List.of(2)); + + BatchedArguments batch = BatchedArguments.from(argLists); + assertThat(batch.size(), is(argLists.size())); + + argLists.add(List.of(3)); + assertThat(batch.size(), is(argLists.size() - 1)); + + argLists.clear(); + assertThat(batch.size(), is(2)); + + Assertions.assertThrows(UnsupportedOperationException.class, () -> batch.get(0).add("2")); + Assertions.assertThrows(UnsupportedOperationException.class, () -> batch.get(0).remove(0)); + } + + private static <T extends Throwable> void assertThrows(Class<T> expectedType, String expMsg, Executable executable) { + T ex = Assertions.assertThrows(expectedType, executable); + + assertThat(ex.getMessage(), containsString(expMsg)); + } +} diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java index d4ce29b45f..e3022adf8d 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.binarytuple.BinaryTupleParser; +import org.apache.ignite.sql.BatchedArguments; import org.jetbrains.annotations.Nullable; /** @@ -623,6 +624,39 @@ public class ClientMessagePacker implements AutoCloseable { packBinaryTuple(builder); } + /** + * Packs batched arguments into binary tuples. + * + * @param batchedArguments Batched arguments. + */ + public void packBatchedArgumentsAsBinaryTupleArray(BatchedArguments batchedArguments) { + assert !closed : "Packer is closed"; + + if (batchedArguments == null || batchedArguments.isEmpty()) { + packNil(); + + return; + } + + int rowLen = batchedArguments.get(0).size(); + + packInt(rowLen); + packInt(batchedArguments.size()); + packBoolean(false); // unused now, but we will need it in case of arguments load by pages. + + for (List<Object> values : batchedArguments) { + // Builder with inline schema. + // Every element in vals is represented by 3 tuple elements: type, scale, value. + var builder = new BinaryTupleBuilder(rowLen * 3); + + for (Object value : values) { + ClientBinaryTupleUtils.appendObject(builder, value); + } + + packBinaryTuple(builder); + } + } + /** * Packs an objects in BinaryTuple format. * diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java index 6742191bf2..1a4a2e64e2 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java @@ -730,13 +730,36 @@ public class ClientMessageUnpacker implements AutoCloseable { return res; } + /** + * Reads array of longs. + * + * @return Array of longs. + */ + public long[] unpackLongArray() { + assert refCnt > 0 : "Unpacker is closed"; + + int size = unpackInt(); + + if (size == 0) { + return ArrayUtils.LONG_EMPTY_ARRAY; + } + + long[] res = new long[size]; + + for (int i = 0; i < size; i++) { + res[i] = unpackInt(); + } + + return res; + } + /** * Unpacks batch of arguments from binary tuples. * * @return BatchedArguments object with the unpacked arguments. */ @SuppressWarnings("unused") - public BatchedArguments unpackObjectArrayFromBinaryTupleArray() { + public BatchedArguments unpackBatchedArgumentsFromBinaryTupleArray() { assert refCnt > 0 : "Unpacker is closed"; if (tryUnpackNil()) { @@ -745,7 +768,7 @@ public class ClientMessageUnpacker implements AutoCloseable { int rowLen = unpackInt(); int rows = unpackInt(); - boolean last = unpackBoolean(); // unused now, but we will need it in case of arguments load by pages. + unpackBoolean(); // unused now, but we will need it in case of arguments load by pages. BatchedArguments args = BatchedArguments.create(); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java index 7f96242fb7..6324a101fc 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.sql; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.internal.client.proto.ClientMessagePacker; @@ -57,7 +58,7 @@ public class ClientSqlExecuteBatchRequest { InternalTransaction tx = readTx(in, out, resources); ClientSqlProperties props = new ClientSqlProperties(in); String statement = in.unpackString(); - BatchedArguments arguments = in.unpackObjectArrayFromBinaryTupleArray(); + BatchedArguments arguments = in.unpackBatchedArgumentsFromBinaryTupleArray(); if (arguments == null) { // SQL engine requires non-null arguments, but we don't want to complicate the protocol with this requirement. @@ -87,7 +88,7 @@ public class ClientSqlExecuteBatchRequest { if (cause instanceof SqlBatchException) { var exBatch = ((SqlBatchException) cause); - writeBatchResult(out, exBatch.updateCounters(), exBatch.errorCode(), exBatch.getMessage()); + writeBatchResult(out, exBatch.updateCounters(), exBatch.code(), exBatch.getMessage(), exBatch.traceId()); return null; } @@ -102,16 +103,18 @@ public class ClientSqlExecuteBatchRequest { private static void writeBatchResult( ClientMessagePacker out, long[] affectedRows, - Short errorCode, - String errorMessage) { + int errorCode, + String errorMessage, + UUID traceId) { out.packNil(); // resourceId out.packBoolean(false); // has row set out.packBoolean(false); // has more pages out.packBoolean(false); // was applied out.packLongArray(affectedRows); // affected rows - out.packShort(errorCode); // error code + out.packInt(errorCode); // error code out.packString(errorMessage); // error message + out.packUuid(traceId); } private static void writeBatchResult( @@ -125,5 +128,6 @@ public class ClientSqlExecuteBatchRequest { out.packLongArray(affectedRows); // affected rows out.packNil(); // error code out.packNil(); // error message + out.packNil(); // trace id } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java index 7d126db224..67a6b05174 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java @@ -24,6 +24,7 @@ import java.time.ZoneId; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -33,6 +34,7 @@ import org.apache.ignite.internal.client.PayloadReader; import org.apache.ignite.internal.client.PayloadWriter; import org.apache.ignite.internal.client.ReliableChannel; import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils; +import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.client.tx.ClientLazyTransaction; import org.apache.ignite.internal.marshaller.MarshallersProvider; @@ -43,6 +45,7 @@ import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; +import org.apache.ignite.sql.SqlBatchException; import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.Statement; @@ -147,18 +150,17 @@ public class ClientSql implements IgniteSql { /** {@inheritDoc} */ @Override public long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) { - try { - return executeBatchAsync(transaction, dmlQuery, batch).join(); - } catch (CompletionException e) { - throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); - } + return executeBatch(transaction, new StatementImpl(dmlQuery), batch); } /** {@inheritDoc} */ @Override public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { - // TODO IGNITE-17059. - throw new UnsupportedOperationException("Not implemented yet."); + try { + return executeBatchAsync(transaction, dmlStatement, batch).join(); + } catch (CompletionException e) { + throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); + } } /** {@inheritDoc} */ @@ -244,15 +246,7 @@ public class ClientSql implements IgniteSql { //noinspection resource return ClientLazyTransaction.ensureStarted(transaction, ch, null) .thenCompose(tx -> tx.channel().serviceAsync(ClientOp.SQL_EXEC, payloadWriter, payloadReader)) - .exceptionally(e -> { - Throwable ex = unwrapCause(e); - if (ex instanceof TransactionException) { - var te = (TransactionException) ex; - throw new SqlException(te.traceId(), te.code(), te.getMessage(), te); - } - - throw ExceptionUtils.sneakyThrow(ex); - }); + .exceptionally(ClientSql::handleException); } catch (TransactionException e) { return CompletableFuture.failedFuture(new SqlException(e.traceId(), e.code(), e.getMessage(), e)); } @@ -264,15 +258,63 @@ public class ClientSql implements IgniteSql { /** {@inheritDoc} */ @Override public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { - // TODO IGNITE-17059. - throw new UnsupportedOperationException("Not implemented yet."); + return executeBatchAsync(transaction, new StatementImpl(query), batch); } /** {@inheritDoc} */ @Override public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { - // TODO IGNITE-17059. - throw new UnsupportedOperationException("Not implemented yet."); + PayloadWriter payloadWriter = w -> { + writeTx(transaction, w); + + w.out().packString(statement.defaultSchema()); + w.out().packInt(statement.pageSize()); + w.out().packLong(statement.queryTimeout(TimeUnit.MILLISECONDS)); + w.out().packNil(); // sessionTimeout + w.out().packString(statement.timeZoneId().getId()); + + packProperties(w, null); + + w.out().packString(statement.query()); + w.out().packBatchedArgumentsAsBinaryTupleArray(batch); + w.out().packLong(ch.observableTimestamp()); + }; + + PayloadReader<BatchResultInternal> payloadReader = r -> { + ClientMessageUnpacker unpacker = r.in(); + + // skipping currently unused values: + // 1. resourceId + // 2. row set flag + // 3. more pages flag + // 4. was applied flag + unpacker.skipValues(4); + + long[] updateCounters = unpacker.unpackLongArray(); + + if (unpacker.tryUnpackNil()) { + // No error - skipping message string and trace id. + unpacker.skipValues(2); + + return new BatchResultInternal(updateCounters); + } + + int errCode = unpacker.unpackInt(); + String message = unpacker.tryUnpackNil() ? null : unpacker.unpackString(); + UUID traceId = unpacker.unpackUuid(); + + return new BatchResultInternal(new SqlBatchException(traceId, errCode, updateCounters, message)); + }; + + return ch.serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter, payloadReader) + .thenApply((batchRes) -> { + if (batchRes.exception != null) { + throw batchRes.exception; + } + + return batchRes.updCounters; + }) + .exceptionally(ClientSql::handleException); } /** {@inheritDoc} */ @@ -318,4 +360,29 @@ public class ClientSql implements IgniteSql { w.out().packBinaryTuple(builder); } + + private static <T> T handleException(Throwable e) { + Throwable ex = unwrapCause(e); + if (ex instanceof TransactionException) { + var te = (TransactionException) ex; + throw new SqlException(te.traceId(), te.code(), te.getMessage(), te); + } + + throw ExceptionUtils.sneakyThrow(ex); + } + + private static class BatchResultInternal { + final long[] updCounters; + final SqlBatchException exception; + + BatchResultInternal(long[] updCounters) { + this.updCounters = updCounters; + this.exception = null; + } + + BatchResultInternal(SqlBatchException exception) { + this.updCounters = null; + this.exception = exception; + } + } } diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp index 7fea78dfeb..8f55c5962d 100644 --- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp +++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp @@ -344,12 +344,14 @@ sql_result data_query::make_request_execute() { m_executed = true; // Check error if this is a batch query - if (auto error_code = reader->read_int16_nullable(); error_code) { + if (auto error_code = reader->read_int32_nullable(); error_code) { auto error_message = reader->read_string(); throw odbc_error(error_code_to_sql_state(error::code(error_code.value())), error_message); } else { reader->skip(); // error message } + + reader->skip(); // trace id } }); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java index 0cfde82266..ad98d06eda 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java @@ -23,8 +23,6 @@ import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; /** * Tests for asynchronous client SQL API. @@ -51,20 +49,4 @@ public class ItSqlClientAsynchronousApiTest extends ItSqlAsynchronousApiTest { protected IgniteTransactions igniteTx() { return client.transactions(); } - - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059") - @Override - public void batch() { - // TODO Method should be completely removed from this class after IGNITE-17059. - super.batch(); - } - - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059") - @Override - public void batchIncomplete() { - // TODO Method should be completely removed from this class after IGNITE-17059. - super.batchIncomplete(); - } } diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java index 8b6375d0ce..ca07766cc4 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java @@ -23,8 +23,6 @@ import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; /** * Tests for synchronous client SQL API. @@ -51,20 +49,4 @@ public class ItSqlClientSynchronousApiTest extends ItSqlSynchronousApiTest { protected IgniteTransactions igniteTx() { return client.transactions(); } - - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059") - @Override - public void batch() { - // TODO Method should be completely removed from this class after IGNITE-17059. - super.batch(); - } - - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059") - @Override - public void batchIncomplete() { - // TODO Method should be completely removed from this class after IGNITE-17059. - super.batchIncomplete(); - } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java index 4fa1510255..e0acc621a4 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java @@ -28,7 +28,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -342,7 +341,9 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { CompletableFuture<AsyncResultSet<SqlRow>> result; try { - SqlProperties properties = createPropertiesFromStatement(SqlQueryType.SINGLE_STMT_TYPES, statement); + SqlProperties properties = toPropertiesBuilder(statement) + .set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.SINGLE_STMT_TYPES) + .build(); result = queryProcessor.queryAsync( properties, observableTimestampTracker, (InternalTransaction) transaction, statement.query(), arguments @@ -392,7 +393,7 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { } try { - SqlProperties properties = createPropertiesFromStatement(EnumSet.of(SqlQueryType.DML), statement); + SqlProperties properties = toPropertiesBuilder(statement).build(); return executeBatchCore( queryProcessor, @@ -439,6 +440,11 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { Runnable leaveBusy, Function<AsyncSqlCursor<?>, Integer> registerCursor, Consumer<Integer> removeCursor) { + + SqlProperties properties0 = SqlPropertiesHelper.chain(properties, SqlPropertiesHelper.newBuilder() + .set(QueryProperty.ALLOWED_QUERY_TYPES, EnumSet.of(SqlQueryType.DML)) + .build()); + var counters = new LongArrayList(batch.size()); CompletableFuture<?> tail = nullCompletedFuture(); ArrayList<CompletableFuture<?>> batchFuts = new ArrayList<>(batch.size()); @@ -452,7 +458,7 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { } try { - return queryProcessor.queryAsync(properties, observableTimestampTracker, transaction, query, args) + return queryProcessor.queryAsync(properties0, observableTimestampTracker, transaction, query, args) .thenCompose(cursor -> { if (!enterBusy.get()) { cursor.closeAsync(); @@ -592,12 +598,10 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { } } - private static SqlProperties createPropertiesFromStatement(Set<SqlQueryType> queryType, Statement statement) { + private static SqlProperties.Builder toPropertiesBuilder(Statement statement) { return SqlPropertiesHelper.newBuilder() - .set(QueryProperty.ALLOWED_QUERY_TYPES, queryType) .set(QueryProperty.TIME_ZONE_ID, statement.timeZoneId()) - .set(QueryProperty.DEFAULT_SCHEMA, statement.defaultSchema()) - .build(); + .set(QueryProperty.DEFAULT_SCHEMA, statement.defaultSchema()); } private int registerCursor(AsyncSqlCursor<?> cursor) {