This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 79f6cfc21f IGNITE-17059 Java thin: Implement batch SQL API for java 
thin client (#3964)
79f6cfc21f is described below

commit 79f6cfc21f00fac9040ad2bbf2c78164b3e0bc62
Author: Pavel Pereslegin <xxt...@gmail.com>
AuthorDate: Tue Jun 25 13:08:37 2024 +0300

    IGNITE-17059 Java thin: Implement batch SQL API for java thin client (#3964)
---
 .../ignite/example/sql/ItSqlExamplesTest.java      |   2 +-
 .../org/apache/ignite/sql/BatchedArguments.java    | 152 +++++++++++++++++++--
 .../org/apache/ignite/sql/SqlBatchException.java   |  14 ++
 .../apache/ignite/sql/BatchedArgumentsTest.java    | 107 +++++++++++++++
 .../internal/client/proto/ClientMessagePacker.java |  34 +++++
 .../client/proto/ClientMessageUnpacker.java        |  27 +++-
 .../requests/sql/ClientSqlExecuteBatchRequest.java |  14 +-
 .../ignite/internal/client/sql/ClientSql.java      | 107 ++++++++++++---
 .../platforms/cpp/ignite/odbc/query/data_query.cpp |   4 +-
 .../sql/api/ItSqlClientAsynchronousApiTest.java    |  18 ---
 .../sql/api/ItSqlClientSynchronousApiTest.java     |  18 ---
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |  20 +--
 12 files changed, 429 insertions(+), 88 deletions(-)

diff --git 
a/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java
 
b/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java
index 7d47c049ad..944e6de275 100644
--- 
a/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java
+++ 
b/examples/src/integrationTest/java/org/apache/ignite/example/sql/ItSqlExamplesTest.java
@@ -58,7 +58,7 @@ public class ItSqlExamplesTest extends AbstractExamplesTest {
      *
      * @throws Exception If failed.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22262")
     @Test
     public void testSqlApiExample() throws Exception {
         assertConsoleOutputContains(SqlApiExample::main, EMPTY_ARGS,
diff --git 
a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java 
b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
index 019a5d46ec..9c34593e3c 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java
@@ -18,17 +18,18 @@
 package org.apache.ignite.sql;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
+import java.util.function.Consumer;
 
 /**
  * Arguments for batch query execution.
- *
- * <p>TODO: replace inheritance with delegation.
- * TODO: add arguments length validation.
- * TODO: add named arguments support.
  */
-public class BatchedArguments extends ArrayList<List<Object>> implements 
List<List<Object>> {
+public final class BatchedArguments implements Iterable<List<Object>> {
+    /** Batched arguments. */
+    private final List<List<Object>> batchedArgs;
+
     /**
      * Creates batched arguments.
      *
@@ -45,9 +46,9 @@ public class BatchedArguments extends ArrayList<List<Object>> 
implements List<Li
      * @return Batch query arguments.
      */
     public static BatchedArguments of(Object... args) {
-        BatchedArguments arguments = new BatchedArguments();
+        BatchedArguments arguments = create();
 
-        arguments.add(Arrays.asList(args));
+        arguments.add(args);
 
         return arguments;
     }
@@ -55,15 +56,13 @@ public class BatchedArguments extends 
ArrayList<List<Object>> implements List<Li
     /**
      * Creates batched arguments.
      *
-     * @param args Arguments.
+     * @param batchedArgs Arguments.
      * @return Batch query arguments.
      */
-    public static BatchedArguments of(List<List<Object>> args) {
-        BatchedArguments arguments = new BatchedArguments();
-
-        arguments.addAll(args);
+    public static BatchedArguments from(List<List<Object>> batchedArgs) {
+        Objects.requireNonNull(batchedArgs, "batchedArgs");
 
-        return arguments;
+        return new BatchedArguments(batchedArgs);
     }
 
     /**
@@ -73,8 +72,131 @@ public class BatchedArguments extends 
ArrayList<List<Object>> implements List<Li
      * @return {@code this} for chaining.
      */
     public BatchedArguments add(Object... args) {
-        add(List.of(args));
+        Objects.requireNonNull(args, "args");
+
+        return addArguments(List.of(args));
+    }
+
+    /**
+     * Returns the arguments list at the specified position.
+     *
+     * @param index index of the element to return.
+     * @return Arguments list.
+     */
+    public List<Object> get(int index) {
+        return batchedArgs.get(index);
+    }
+
+    /**
+     * Returns the size of this batch.
+     *
+     * @return Batch size.
+     */
+    public int size() {
+        return batchedArgs.size();
+    }
+
+    /**
+     * Returns {@code true} if this batch is empty.
+     *
+     * @return {@code True} if this batch is empty.
+     */
+    public boolean isEmpty() {
+        return batchedArgs.isEmpty();
+    }
+
+    /**
+     * Returns an iterator over the elements in this batch.
+     *
+     * @return Iterator over the elements in this batch.
+     */
+    @Override
+    public Iterator<List<Object>> iterator() {
+        return new ImmutableIterator<>(batchedArgs.iterator());
+    }
+
+    /** Constructor. */
+    private BatchedArguments() {
+        this.batchedArgs = new ArrayList<>();
+    }
+
+    /** Constructor. */
+    private BatchedArguments(List<List<Object>> batchedArgs) {
+        List<List<Object>> resultList = new ArrayList<>(batchedArgs.size());
+
+        int pos = 0;
+        int requiredLength = 0;
+
+        for (List<Object> arguments : batchedArgs) {
+            Objects.requireNonNull(arguments, "Arguments list cannot be 
null.");
+
+            if (arguments.isEmpty()) {
+                throwEmptyArgumentsException();
+            }
+
+            if (pos == 0) {
+                requiredLength = arguments.size();
+            } else {
+                ensureRowLength(requiredLength, arguments.size());
+            }
+
+            resultList.add(List.copyOf(arguments));
+
+            ++pos;
+        }
+
+        this.batchedArgs = resultList;
+    }
+
+    private BatchedArguments addArguments(List<Object> immutableList) {
+        if (immutableList.isEmpty()) {
+            throwEmptyArgumentsException();
+        }
+
+        if (!batchedArgs.isEmpty()) {
+            ensureRowLength(batchedArgs.get(0).size(), immutableList.size());
+        }
+
+        batchedArgs.add(immutableList);
 
         return this;
     }
+
+    private static void throwEmptyArgumentsException() {
+        throw new IllegalArgumentException("Non empty arguments required.");
+    }
+
+    private static void ensureRowLength(int expected, int actual) {
+        if (expected != actual) {
+            throw new IllegalArgumentException("Argument lists must be the 
same size.");
+        }
+    }
+
+    private static class ImmutableIterator<E> implements Iterator<E> {
+        private final Iterator<E> delegate;
+
+        private ImmutableIterator(Iterator<E> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public E next() {
+            return delegate.next();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void forEachRemaining(Consumer<? super E> action) {
+            delegate.forEachRemaining(action);
+        }
+    }
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java 
b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
index 3e5476eb6a..61b3e882ae 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
@@ -49,6 +49,20 @@ public class SqlBatchException extends SqlException {
         this.updCntrs = updCntrs != null ? updCntrs : LONG_EMPTY_ARRAY;
     }
 
+    /**
+     * Creates an exception with the given trace ID, error code, update counters, and detailed message.
+     *
+     * @param traceId Unique identifier of the exception.
+     * @param code Full error code.
+     * @param updCntrs Array that describes the outcome of a batch execution.
+     * @param message Detailed message.
+     */
+    public SqlBatchException(UUID traceId, int code, long[] updCntrs, String 
message) {
+        super(traceId, code, message, null);
+
+        this.updCntrs = updCntrs != null ? updCntrs : LONG_EMPTY_ARRAY;
+    }
+
     /**
      * Creates an exception with the given trace ID, error code, detailed 
message, and cause.
      *
diff --git 
a/modules/api/src/test/java/org/apache/ignite/sql/BatchedArgumentsTest.java 
b/modules/api/src/test/java/org/apache/ignite/sql/BatchedArgumentsTest.java
new file mode 100644
index 0000000000..b9ae453d0a
--- /dev/null
+++ b/modules/api/src/test/java/org/apache/ignite/sql/BatchedArgumentsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.sql;
+
+import static java.util.Collections.singletonList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+/**
+ * Tests for {@link BatchedArguments}.
+ */
+public class BatchedArgumentsTest {
+    @Test
+    public void nullAndEmptyArgumentsAreForbidden() {
+        assertThrows(NullPointerException.class, "args", () -> 
BatchedArguments.of((Object[]) null));
+        assertThrows(NullPointerException.class, "batchedArgs", () -> 
BatchedArguments.from((List<List<Object>>) null));
+        assertThrows(NullPointerException.class, "Arguments list cannot be 
null.", () -> BatchedArguments.from(singletonList(null)));
+        assertThrows(IllegalArgumentException.class, "Non empty arguments 
required.", () -> BatchedArguments.of(new Object[0]));
+
+        BatchedArguments batch = BatchedArguments.create();
+        assertThrows(NullPointerException.class, "args", () -> 
batch.add((Object[]) null));
+        assertThrows(IllegalArgumentException.class, "Non empty arguments 
required.", () -> batch.add(new Object[0]));
+    }
+
+    @Test
+    public void iteratorIsImmutable() {
+        List<List<Object>> argLists = new ArrayList<>(2);
+        argLists.add(List.of(1, 2));
+        argLists.add(List.of(3, 4));
+
+        BatchedArguments batch = BatchedArguments.from(argLists);
+        assertThat(batch.size(), is(argLists.size()));
+
+        Iterator<List<Object>> itr = batch.iterator();
+        assertThat(itr.next(), equalTo(argLists.get(0)));
+        assertThat(itr.hasNext(), is(true));
+        assertThat(itr.next(), equalTo(argLists.get(1)));
+        assertThat(itr.hasNext(), is(false));
+
+        Assertions.assertThrows(UnsupportedOperationException.class, 
itr::remove);
+
+        assertThat(batch.size(), is(argLists.size()));
+    }
+
+    @Test
+    public void argumentsListsMustBeSameSize() {
+        List<List<Object>> argLists = new ArrayList<>(2);
+        argLists.add(List.of(1));
+        argLists.add(List.of(2, 3));
+
+        assertThrows(IllegalArgumentException.class, "Argument lists must be 
the same size.", () -> BatchedArguments.from(argLists));
+
+        {
+            BatchedArguments batch = BatchedArguments.of(1);
+            assertThrows(IllegalArgumentException.class, "Argument lists must 
be the same size.", () -> batch.add(1, 2));
+        }
+    }
+
+    @Test
+    public void argumentsListsAreImmutable() {
+        List<List<Object>> argLists = new ArrayList<>(2);
+        argLists.add(List.of(1));
+        argLists.add(List.of(2));
+
+        BatchedArguments batch = BatchedArguments.from(argLists);
+        assertThat(batch.size(), is(argLists.size()));
+
+        argLists.add(List.of(3));
+        assertThat(batch.size(), is(argLists.size() - 1));
+
+        argLists.clear();
+        assertThat(batch.size(), is(2));
+
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> 
batch.get(0).add("2"));
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> 
batch.get(0).remove(0));
+    }
+
+    private static <T extends Throwable> void assertThrows(Class<T> 
expectedType, String expMsg, Executable executable) {
+        T ex = Assertions.assertThrows(expectedType, executable);
+
+        assertThat(ex.getMessage(), containsString(expMsg));
+    }
+}
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index d4ce29b45f..e3022adf8d 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
+import org.apache.ignite.sql.BatchedArguments;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -623,6 +624,39 @@ public class ClientMessagePacker implements AutoCloseable {
         packBinaryTuple(builder);
     }
 
+    /**
+     * Packs batched arguments into binary tuples.
+     *
+     * @param batchedArguments Batched arguments.
+     */
+    public void packBatchedArgumentsAsBinaryTupleArray(BatchedArguments 
batchedArguments) {
+        assert !closed : "Packer is closed";
+
+        if (batchedArguments == null || batchedArguments.isEmpty()) {
+            packNil();
+
+            return;
+        }
+
+        int rowLen = batchedArguments.get(0).size();
+
+        packInt(rowLen);
+        packInt(batchedArguments.size());
+        packBoolean(false); // unused now, but we will need it in case of 
arguments load by pages.
+
+        for (List<Object> values : batchedArguments) {
+            // Builder with inline schema.
+            // Every element in values is represented by 3 tuple elements: type, 
scale, value.
+            var builder = new BinaryTupleBuilder(rowLen * 3);
+
+            for (Object value : values) {
+                ClientBinaryTupleUtils.appendObject(builder, value);
+            }
+
+            packBinaryTuple(builder);
+        }
+    }
+
     /**
      * Packs an objects in BinaryTuple format.
      *
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
index 6742191bf2..1a4a2e64e2 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
@@ -730,13 +730,36 @@ public class ClientMessageUnpacker implements 
AutoCloseable {
         return res;
     }
 
+    /**
+     * Reads an array of longs: an int element count followed by that many long values.
+     *
+     * @return Array of longs, or the shared empty array when the count is zero.
+     */
+    public long[] unpackLongArray() {
+        assert refCnt > 0 : "Unpacker is closed";
+
+        int size = unpackInt();
+
+        if (size == 0) {
+            return ArrayUtils.LONG_EMPTY_ARRAY;
+        }
+
+        long[] res = new long[size];
+
+        for (int i = 0; i < size; i++) {
+            res[i] = unpackLong(); // Read each element as a long so values beyond the int range are preserved.
+        }
+
+        return res;
+    }
+
     /**
      * Unpacks batch of arguments from binary tuples.
      *
      * @return BatchedArguments object with the unpacked arguments.
      */
     @SuppressWarnings("unused")
-    public BatchedArguments unpackObjectArrayFromBinaryTupleArray() {
+    public BatchedArguments unpackBatchedArgumentsFromBinaryTupleArray() {
         assert refCnt > 0 : "Unpacker is closed";
 
         if (tryUnpackNil()) {
@@ -745,7 +768,7 @@ public class ClientMessageUnpacker implements AutoCloseable 
{
 
         int rowLen = unpackInt();
         int rows = unpackInt();
-        boolean last = unpackBoolean(); // unused now, but we will need it in 
case of arguments load by pages.
+        unpackBoolean(); // unused now, but we will need it in case of 
arguments load by pages.
 
         BatchedArguments args = BatchedArguments.create();
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 7f96242fb7..6324a101fc 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.sql;
 
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -57,7 +58,7 @@ public class ClientSqlExecuteBatchRequest {
         InternalTransaction tx = readTx(in, out, resources);
         ClientSqlProperties props = new ClientSqlProperties(in);
         String statement = in.unpackString();
-        BatchedArguments arguments = 
in.unpackObjectArrayFromBinaryTupleArray();
+        BatchedArguments arguments = 
in.unpackBatchedArgumentsFromBinaryTupleArray();
 
         if (arguments == null) {
             // SQL engine requires non-null arguments, but we don't want to 
complicate the protocol with this requirement.
@@ -87,7 +88,7 @@ public class ClientSqlExecuteBatchRequest {
                         if (cause instanceof SqlBatchException) {
                             var exBatch = ((SqlBatchException) cause);
 
-                            writeBatchResult(out, exBatch.updateCounters(), 
exBatch.errorCode(), exBatch.getMessage());
+                            writeBatchResult(out, exBatch.updateCounters(), 
exBatch.code(), exBatch.getMessage(), exBatch.traceId());
                             return null;
                         }
 
@@ -102,16 +103,18 @@ public class ClientSqlExecuteBatchRequest {
     private static void writeBatchResult(
             ClientMessagePacker out,
             long[] affectedRows,
-            Short errorCode,
-            String errorMessage) {
+            int errorCode,
+            String errorMessage,
+            UUID traceId) {
         out.packNil(); // resourceId
 
         out.packBoolean(false); // has row set
         out.packBoolean(false); // has more pages
         out.packBoolean(false); // was applied
         out.packLongArray(affectedRows); // affected rows
-        out.packShort(errorCode); // error code
+        out.packInt(errorCode); // error code
         out.packString(errorMessage); // error message
+        out.packUuid(traceId);
     }
 
     private static void writeBatchResult(
@@ -125,5 +128,6 @@ public class ClientSqlExecuteBatchRequest {
         out.packLongArray(affectedRows); // affected rows
         out.packNil(); // error code
         out.packNil(); // error message
+        out.packNil(); // trace id
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 7d126db224..67a6b05174 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -24,6 +24,7 @@ import java.time.ZoneId;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.client.PayloadReader;
 import org.apache.ignite.internal.client.PayloadWriter;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.marshaller.MarshallersProvider;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlBatchException;
 import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
@@ -147,18 +150,17 @@ public class ClientSql implements IgniteSql {
     /** {@inheritDoc} */
     @Override
     public long[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
-        try {
-            return executeBatchAsync(transaction, dmlQuery, batch).join();
-        } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
-        }
+        return executeBatch(transaction, new StatementImpl(dmlQuery), batch);
     }
 
     /** {@inheritDoc} */
     @Override
     public long[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
-        // TODO IGNITE-17059.
-        throw new UnsupportedOperationException("Not implemented yet.");
+        try {
+            return executeBatchAsync(transaction, dmlStatement, batch).join();
+        } catch (CompletionException e) {
+            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+        }
     }
 
     /** {@inheritDoc} */
@@ -244,15 +246,7 @@ public class ClientSql implements IgniteSql {
                 //noinspection resource
                 return ClientLazyTransaction.ensureStarted(transaction, ch, 
null)
                         .thenCompose(tx -> 
tx.channel().serviceAsync(ClientOp.SQL_EXEC, payloadWriter, payloadReader))
-                        .exceptionally(e -> {
-                            Throwable ex = unwrapCause(e);
-                            if (ex instanceof TransactionException) {
-                                var te = (TransactionException) ex;
-                                throw new SqlException(te.traceId(), 
te.code(), te.getMessage(), te);
-                            }
-
-                            throw ExceptionUtils.sneakyThrow(ex);
-                        });
+                        .exceptionally(ClientSql::handleException);
             } catch (TransactionException e) {
                 return CompletableFuture.failedFuture(new 
SqlException(e.traceId(), e.code(), e.getMessage(), e));
             }
@@ -264,15 +258,63 @@ public class ClientSql implements IgniteSql {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
-        // TODO IGNITE-17059.
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return executeBatchAsync(transaction, new StatementImpl(query), batch);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
-        // TODO IGNITE-17059.
-        throw new UnsupportedOperationException("Not implemented yet.");
+        PayloadWriter payloadWriter = w -> {
+            writeTx(transaction, w);
+
+            w.out().packString(statement.defaultSchema());
+            w.out().packInt(statement.pageSize());
+            w.out().packLong(statement.queryTimeout(TimeUnit.MILLISECONDS));
+            w.out().packNil(); // sessionTimeout
+            w.out().packString(statement.timeZoneId().getId());
+
+            packProperties(w, null);
+
+            w.out().packString(statement.query());
+            w.out().packBatchedArgumentsAsBinaryTupleArray(batch);
+            w.out().packLong(ch.observableTimestamp());
+        };
+
+        PayloadReader<BatchResultInternal> payloadReader = r -> {
+            ClientMessageUnpacker unpacker = r.in();
+
+            // skipping currently unused values:
+            // 1. resourceId
+            // 2. row set flag
+            // 3. more pages flag
+            // 4. was applied flag
+            unpacker.skipValues(4);
+
+            long[] updateCounters = unpacker.unpackLongArray();
+
+            if (unpacker.tryUnpackNil()) {
+                // No error - skipping message string and trace id.
+                unpacker.skipValues(2);
+
+                return new BatchResultInternal(updateCounters);
+            }
+
+            int errCode = unpacker.unpackInt();
+            String message = unpacker.tryUnpackNil() ? null : 
unpacker.unpackString();
+            UUID traceId = unpacker.unpackUuid();
+
+            return new BatchResultInternal(new SqlBatchException(traceId, 
errCode, updateCounters, message));
+        };
+
+        return ch.serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter, 
payloadReader)
+                .thenApply((batchRes) -> {
+                    if (batchRes.exception != null) {
+                        throw batchRes.exception;
+                    }
+
+                    return batchRes.updCounters;
+                })
+                .exceptionally(ClientSql::handleException);
     }
 
     /** {@inheritDoc} */
@@ -318,4 +360,29 @@ public class ClientSql implements IgniteSql {
 
         w.out().packBinaryTuple(builder);
     }
+
+    private static <T> T handleException(Throwable e) {
+        Throwable ex = unwrapCause(e);
+        if (ex instanceof TransactionException) {
+            var te = (TransactionException) ex;
+            throw new SqlException(te.traceId(), te.code(), te.getMessage(), 
te);
+        }
+
+        throw ExceptionUtils.sneakyThrow(ex);
+    }
+
+    private static class BatchResultInternal {
+        final long[] updCounters;
+        final SqlBatchException exception;
+
+        BatchResultInternal(long[] updCounters) {
+            this.updCounters = updCounters;
+            this.exception = null;
+        }
+
+        BatchResultInternal(SqlBatchException exception) {
+            this.updCounters = null;
+            this.exception = exception;
+        }
+    }
 }
diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp 
b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
index 7fea78dfeb..8f55c5962d 100644
--- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
+++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
@@ -344,12 +344,14 @@ sql_result data_query::make_request_execute() {
             m_executed = true;
 
             // Check error if this is a batch query
-            if (auto error_code = reader->read_int16_nullable(); error_code) {
+            if (auto error_code = reader->read_int32_nullable(); error_code) {
                 auto error_message = reader->read_string();
                 throw 
odbc_error(error_code_to_sql_state(error::code(error_code.value())), 
error_message);
             } else {
                 reader->skip(); // error message
             }
+
+            reader->skip(); // trace id
         }
     });
 
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 0cfde82266..ad98d06eda 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -23,8 +23,6 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
 
 /**
  * Tests for asynchronous client SQL API.
@@ -51,20 +49,4 @@ public class ItSqlClientAsynchronousApiTest extends 
ItSqlAsynchronousApiTest {
     protected IgniteTransactions igniteTx() {
         return client.transactions();
     }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
-    @Override
-    public void batch() {
-        // TODO Method should be completely removed from this class after 
IGNITE-17059.
-        super.batch();
-    }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
-    @Override
-    public void batchIncomplete() {
-        // TODO Method should be completely removed from this class after 
IGNITE-17059.
-        super.batchIncomplete();
-    }
 }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 8b6375d0ce..ca07766cc4 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -23,8 +23,6 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
 
 /**
  * Tests for synchronous client SQL API.
@@ -51,20 +49,4 @@ public class ItSqlClientSynchronousApiTest extends 
ItSqlSynchronousApiTest {
     protected IgniteTransactions igniteTx() {
         return client.transactions();
     }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
-    @Override
-    public void batch() {
-        // TODO Method should be completely removed from this class after IGNITE-17059.
-        super.batch();
-    }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
-    @Override
-    public void batchIncomplete() {
-        // TODO Method should be completely removed from this class after IGNITE-17059.
-        super.batchIncomplete();
-    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 4fa1510255..e0acc621a4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -28,7 +28,6 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -342,7 +341,9 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
         CompletableFuture<AsyncResultSet<SqlRow>> result;
 
         try {
-            SqlProperties properties = createPropertiesFromStatement(SqlQueryType.SINGLE_STMT_TYPES, statement);
+            SqlProperties properties = toPropertiesBuilder(statement)
+                    .set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.SINGLE_STMT_TYPES)
+                    .build();
 
             result = queryProcessor.queryAsync(
                     properties, observableTimestampTracker, 
(InternalTransaction) transaction, statement.query(), arguments
@@ -392,7 +393,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
         }
 
         try {
-            SqlProperties properties = createPropertiesFromStatement(EnumSet.of(SqlQueryType.DML), statement);
+            SqlProperties properties = toPropertiesBuilder(statement).build();
 
             return executeBatchCore(
                     queryProcessor,
@@ -439,6 +440,11 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
             Runnable leaveBusy,
             Function<AsyncSqlCursor<?>, Integer> registerCursor,
             Consumer<Integer> removeCursor) {
+
+        SqlProperties properties0 = SqlPropertiesHelper.chain(properties, SqlPropertiesHelper.newBuilder()
+                .set(QueryProperty.ALLOWED_QUERY_TYPES, EnumSet.of(SqlQueryType.DML))
+                .build());
+
         var counters = new LongArrayList(batch.size());
         CompletableFuture<?> tail = nullCompletedFuture();
         ArrayList<CompletableFuture<?>> batchFuts = new 
ArrayList<>(batch.size());
@@ -452,7 +458,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
                 }
 
                 try {
-                    return queryProcessor.queryAsync(properties, observableTimestampTracker, transaction, query, args)
+                    return queryProcessor.queryAsync(properties0, observableTimestampTracker, transaction, query, args)
                             .thenCompose(cursor -> {
                                 if (!enterBusy.get()) {
                                     cursor.closeAsync();
@@ -592,12 +598,10 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
         }
     }
 
-    private static SqlProperties createPropertiesFromStatement(Set<SqlQueryType> queryType, Statement statement) {
+    private static SqlProperties.Builder toPropertiesBuilder(Statement statement) {
         return SqlPropertiesHelper.newBuilder()
-                .set(QueryProperty.ALLOWED_QUERY_TYPES, queryType)
                 .set(QueryProperty.TIME_ZONE_ID, statement.timeZoneId())
-                .set(QueryProperty.DEFAULT_SCHEMA, statement.defaultSchema())
-                .build();
+                .set(QueryProperty.DEFAULT_SCHEMA, statement.defaultSchema());
     }
 
     private int registerCursor(AsyncSqlCursor<?> cursor) {


Reply via email to