This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 563b9488c feat(java/driver/jni): implement executePartitioned (#4263)
563b9488c is described below

commit 563b9488cb9c5146e820b3e0bdcb5667a88340f0
Author: David Li <[email protected]>
AuthorDate: Mon Apr 27 11:36:58 2026 +0900

    feat(java/driver/jni): implement executePartitioned (#4263)
    
    Closes #4262.
---
 .github/workflows/java.yml                         |   2 +-
 go/adbc/driver/flightsql/flightsql_connection.go   |   4 +-
 go/adbc/driver/flightsql/flightsql_statement.go    |   2 +-
 .../org/apache/arrow/adbc/core/AdbcStatement.java  |   9 +-
 .../adbc/driver/jni/FlightSqlIntegrationTest.java  |  98 ++++++++++++
 java/driver/jni/src/main/cpp/jni_wrapper.cc        | 173 ++++++++++++++++++++-
 .../arrow/adbc/driver/jni/JniConnection.java       |   6 +
 .../apache/arrow/adbc/driver/jni/JniStatement.java |   8 +
 .../arrow/adbc/driver/jni/impl/JniLoader.java      |  11 ++
 .../arrow/adbc/driver/jni/impl/NativeAdbc.java     |   6 +
 .../driver/jni/impl/NativePartitionResult.java     |  61 ++++++++
 11 files changed, 368 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index 541a6eb82..60e41f42b 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -233,7 +233,7 @@ jobs:
       - name: Start Dependencies
         if: matrix.os == 'Linux' && matrix.arch == 'amd64'
         run: |
-          docker compose up --detach --wait mssql-test postgres-test
+          docker compose up --detach --wait flightsql-sqlite-test mssql-test 
postgres-test
           cat .env | grep -v -e '^#' | grep -e '^ADBC_' | awk NF | sed 
's/"//g' | tee -a $GITHUB_ENV
 
       - name: Download thirdparty driver
diff --git a/go/adbc/driver/flightsql/flightsql_connection.go 
b/go/adbc/driver/flightsql/flightsql_connection.go
index 473d38770..4f672d298 100644
--- a/go/adbc/driver/flightsql/flightsql_connection.go
+++ b/go/adbc/driver/flightsql/flightsql_connection.go
@@ -1145,7 +1145,7 @@ func (c *connectionImpl) ReadPartition(ctx 
context.Context, serializedPartition
        var info flight.FlightInfo
        if err := proto.Unmarshal(serializedPartition, &info); err != nil {
                return nil, adbc.Error{
-                       Msg:  err.Error(),
+                       Msg:  fmt.Sprintf("[flightsql] could not unmarshal 
partition as FlightInfo: %v", err),
                        Code: adbc.StatusInvalidArgument,
                }
        }
@@ -1153,7 +1153,7 @@ func (c *connectionImpl) ReadPartition(ctx 
context.Context, serializedPartition
        // The driver only ever returns one endpoint.
        if len(info.Endpoint) != 1 {
                return nil, adbc.Error{
-                       Msg:  fmt.Sprintf("Invalid partition: expected 1 
endpoint, got %d", len(info.Endpoint)),
+                       Msg:  fmt.Sprintf("[flightsql] invalid partition: 
expected 1 endpoint, got %d", len(info.Endpoint)),
                        Code: adbc.StatusInvalidArgument,
                }
        }
diff --git a/go/adbc/driver/flightsql/flightsql_statement.go 
b/go/adbc/driver/flightsql/flightsql_statement.go
index 85f750a0c..eb034ec66 100644
--- a/go/adbc/driver/flightsql/flightsql_statement.go
+++ b/go/adbc/driver/flightsql/flightsql_statement.go
@@ -811,7 +811,7 @@ func (s *statement) ExecutePartitions(ctx context.Context) 
(*arrow.Schema, adbc.
                data, err := proto.Marshal(partition)
                if err != nil {
                        return sc, out, -1, adbc.Error{
-                               Msg:  err.Error(),
+                               Msg:  fmt.Sprintf("[flightsql] could not 
marshal partition as FlightInfo: %v", err),
                                Code: adbc.StatusInternal,
                        }
                }
diff --git 
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java 
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
index a1f9e0f3b..bbbae4470 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * A container for all state needed to execute a database query, such as the 
query itself,
@@ -231,19 +232,21 @@ public interface AdbcStatement extends AutoCloseable, 
AdbcOptions {
 
   /** The partitions of a result set. */
   class PartitionResult {
-    private final Schema schema;
+    private final @Nullable Schema schema;
     private final long affectedRows;
     private final List<PartitionDescriptor> partitionDescriptors;
 
     public PartitionResult(
-        Schema schema, long affectedRows, List<PartitionDescriptor> 
partitionDescriptors) {
+        @Nullable Schema schema,
+        long affectedRows,
+        List<PartitionDescriptor> partitionDescriptors) {
       this.schema = schema;
       this.affectedRows = affectedRows;
       this.partitionDescriptors = partitionDescriptors;
     }
 
     /** Get the schema of the eventual result set. */
-    public Schema getSchema() {
+    public @Nullable Schema getSchema() {
       return schema;
     }
 
diff --git 
a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java
 
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java
new file mode 100644
index 000000000..b5ab1c7ee
--- /dev/null
+++ 
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.jni;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests that run the JNI driver against a Flight SQL server.
+ *
+ * <p>The server location is read from the environment variable named by {@link #URI_ENV}; when it
+ * is unset or empty the tests are skipped via a JUnit assumption.
+ */
+public class FlightSqlIntegrationTest {
+  /** Name of the environment variable holding the Flight SQL server URI. */
+  public static final String URI_ENV = "ADBC_SQLITE_FLIGHTSQL_URI";
+  // Read once when the class is initialized; null if the variable is not set.
+  static String URI = System.getenv(URI_ENV);
+
+  // Per-test resources, created in beforeEach() and closed in afterEach().
+  BufferAllocator allocator;
+  JniDriver driver;
+  AdbcDatabase db;
+  AdbcConnection conn;
+
+  /** Skips every test in this class unless the server URI is configured. */
+  @BeforeAll
+  static void beforeAll() {
+    Assumptions.assumeFalse(
+        URI == null || URI.isEmpty(),
+        String.format("Must set %s to run Flight SQL integration tests", 
URI_ENV));
+  }
+
+  /** Opens a fresh allocator, driver, database and connection for each test. */
+  @BeforeEach
+  void beforeEach() throws Exception {
+    allocator = new RootAllocator();
+    driver = new JniDriver(allocator);
+    Map<String, Object> parameters = new HashMap<>();
+    // Select the native driver to load and point it at the configured server.
+    JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_flightsql");
+    AdbcDriver.PARAM_URI.set(parameters, URI);
+    db = driver.open(parameters);
+    conn = db.connect();
+  }
+
+  /** Closes the connection, then the database, then the allocator. */
+  @AfterEach
+  void afterEach() throws Exception {
+    conn.close();
+    db.close();
+    allocator.close();
+  }
+
+  /** Runs a plain query and checks the single value in the first batch. */
+  @Test
+  void simple() throws Exception {
+    try (var stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT 1 + 1 AS sum");
+      try (var reader = stmt.executeQuery()) {
+        assertThat(reader.getReader().loadNextBatch()).isTrue();
+        
assertThat(reader.getReader().getVectorSchemaRoot().getVector("sum").getObject(0))
+            .isEqualTo(2L);
+      }
+    }
+  }
+
+  /**
+   * Runs the same query through executePartitioned(), then reads the single returned partition
+   * back through the connection and checks its value.
+   */
+  @Test
+  void partitioned() throws Exception {
+    try (var stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT 1 + 1 AS sum");
+      var partitions = stmt.executePartitioned();
+      assertThat(partitions.getPartitionDescriptors().size()).isEqualTo(1);
+      assertThat(partitions.getAffectedRows()).isEqualTo(-1);
+      // The test server doesn't give a schema.
+      assertThat(partitions.getSchema()).isNull();
+
+      try (var reader =
+          
conn.readPartition(partitions.getPartitionDescriptors().get(0).getDescriptor()))
 {
+        assertThat(reader.loadNextBatch()).isTrue();
+        
assertThat(reader.getVectorSchemaRoot().getVector("sum").getObject(0)).isEqualTo(2L);
+      }
+    }
+  }
+}
diff --git a/java/driver/jni/src/main/cpp/jni_wrapper.cc 
b/java/driver/jni/src/main/cpp/jni_wrapper.cc
index 562ddf3fb..ef650fc08 100644
--- a/java/driver/jni/src/main/cpp/jni_wrapper.cc
+++ b/java/driver/jni/src/main/cpp/jni_wrapper.cc
@@ -34,6 +34,19 @@
 
 namespace {
 
+/// Throw a new Java exception of the given class with the given message.
+///
+/// \param env the JNI environment
+/// \param klass the exception class as a JNI internal name, e.g.
+///   "java/lang/IllegalArgumentException".  It must have a constructor
+///   taking a single java.lang.String.
+/// \param message the exception message (modified UTF-8)
+///
+/// On return a Java exception is pending: either the requested one, or the
+/// error raised by JNI itself if the class could not be loaded.
+void ThrowJavaException(JNIEnv* env, const std::string& klass,
+                        const std::string& message) {
+  jclass exception_klass = env->FindClass(klass.c_str());
+  if (exception_klass == nullptr) {
+    // FindClass has already raised an error (e.g. NoClassDefFoundError);
+    // leave it pending rather than dereferencing a null class.
+    return;
+  }
+  // ThrowNew resolves the (String) constructor itself.  The previous
+  // hand-written descriptor "(Ljava/lang/String)V" lacked the ';' that
+  // terminates a class name, so GetMethodID could never find the constructor.
+  env->ThrowNew(exception_klass, message.c_str());
+}
+
 /// Internal exception.  Meant to be used with RaiseAdbcException and
 ///   CHECK_ADBC_ERROR.
 struct AdbcException {
@@ -112,19 +125,26 @@ void RaiseAdbcException(AdbcStatusCode code, const 
AdbcError& error) {
   } while (0)
 
 /// Require that a Java class exists or error.
-jclass RequireImplClass(JNIEnv* env, std::string_view name) {
-  static std::string kPrefix = "org/apache/arrow/adbc/driver/jni/impl/";
-  std::string full_name = kPrefix + std::string(name);
-  jclass klass = env->FindClass(full_name.c_str());
+jclass RequireClass(JNIEnv* env, const std::string& name) {
+  jclass klass = env->FindClass(name.c_str());
   if (klass == nullptr) {
+    std::string message = "[JNI] Could not find class ";
+    message += name;
     throw AdbcException{
         .code = ADBC_STATUS_INTERNAL,
-        .message = "[JNI] Could not find class " + full_name,
+        .message = std::move(message),
     };
   }
   return klass;
 }
 
+/// Require that a Java class exists or error.
+jclass RequireImplClass(JNIEnv* env, std::string_view name) {
+  static std::string kPrefix = "org/apache/arrow/adbc/driver/jni/impl/";
+  std::string full_name = kPrefix + std::string(name);
+  return RequireClass(env, full_name);
+}
+
 /// Require that a Java method exists or error.
 jmethodID RequireMethod(JNIEnv* env, jclass klass, std::string_view name,
                         std::string_view signature) {
@@ -381,6 +401,60 @@ 
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementGetParameterSchem
   return nullptr;
 }
 
+/// Execute a statement with AdbcStatementExecutePartitions and package the
+/// outcome as a NativePartitionResult.
+///
+/// \param env the JNI environment
+/// \param self unused (static native method)
+/// \param handle address of the AdbcStatement
+/// \return a NativePartitionResult holding the affected row count, the
+///   result schema and a copy of every serialized partition; nullptr with a
+///   Java exception pending on failure
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecutePartitions(
+    JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
+  struct AdbcError error = ADBC_ERROR_INIT;
+  auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle));
+  struct ArrowSchema schema = {};
+  struct AdbcPartitions partitions = {};
+  int64_t rows_affected = 0;
+  jobject result = nullptr;
+
+  try {
+    jclass native_result_class = RequireImplClass(env, "NativePartitionResult");
+    jmethodID native_result_ctor =
+        RequireMethod(env, native_result_class, "<init>", "(JJ)V");
+    jmethodID native_result_add_partition =
+        RequireMethod(env, native_result_class, "addPartition", "([B)V");
+
+    CHECK_ADBC_ERROR(
+        AdbcStatementExecutePartitions(ptr, &schema, &partitions, &rows_affected, &error),
+        error);
+
+    // The constructor receives the address of the stack-allocated schema; it
+    // has to copy what it needs before this function returns.
+    result = env->NewObject(native_result_class, native_result_ctor,
+                            static_cast<jlong>(rows_affected),
+                            static_cast<jlong>(reinterpret_cast<uintptr_t>(&schema)));
+    if (env->ExceptionCheck()) goto cleanupall;
+
+    for (size_t i = 0; i < partitions.num_partitions; i++) {
+      size_t length = partitions.partition_lengths[i];
+      jbyteArray partition = env->NewByteArray(static_cast<jsize>(length));
+      // NewByteArray returns nullptr with OutOfMemoryError pending on failure.
+      if (partition == nullptr) goto cleanupall;
+      env->SetByteArrayRegion(partition, 0, static_cast<jsize>(length),
+                              reinterpret_cast<const jbyte*>(partitions.partitions[i]));
+      if (env->ExceptionCheck()) goto cleanupall;
+      // addPartition is declared "([B)V", so it has to be invoked through
+      // CallVoidMethod; the Call<Type>Method variant must match the return type.
+      env->CallVoidMethod(result, native_result_add_partition, partition);
+      if (env->ExceptionCheck()) goto cleanupall;
+      // The Java list now references the array; drop our local reference so a
+      // large number of partitions cannot exhaust the local reference table.
+      env->DeleteLocalRef(partition);
+    }
+  } catch (const AdbcException& e) {
+    e.ThrowJavaException(env);
+  }
+
+  // We can't release schema (its address was handed to the Java result
+  // object), but we copied out the partitions
+  if (partitions.release != nullptr) {
+    partitions.release(&partitions);
+  }
+  return result;
+
+cleanupall:
+  // A Java exception is pending and the result object is not returned, so
+  // everything the driver produced is released here.
+  if (schema.release != nullptr) {
+    schema.release(&schema);
+  }
+  if (partitions.release != nullptr) {
+    partitions.release(&partitions);
+  }
+  return nullptr;
+}
+
 JNIEXPORT jobject JNICALL
 Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecuteQuery(
     JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
@@ -978,6 +1052,95 @@ 
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionRollback(
   }
 }
 
+/// Read one partition of a result set with AdbcConnectionReadPartition.
+///
+/// The serialized partition is the `remaining()` bytes of the given
+/// java.nio.ByteBuffer starting at its current `position()`.  The caller's
+/// buffer position is left unchanged.
+///
+/// \param env the JNI environment
+/// \param self unused (static native method)
+/// \param handle address of the AdbcConnection
+/// \param partition a java.nio.ByteBuffer holding the serialized partition
+/// \return the object built by MakeNativeQueryResult for the result stream,
+///   or nullptr with a Java exception pending on failure
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionReadPartition(
+    JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jobject partition) {
+  struct AdbcError error = ADBC_ERROR_INIT;
+  auto* conn = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle));
+  struct ArrowArrayStream out = {};
+  size_t serialized_length = 0;
+  const uint8_t* serialized_partition = nullptr;
+  std::vector<uint8_t> allocated_partition;
+  // Tracked separately from the pointer: an empty std::vector may report a
+  // null data() even though the (empty) partition has been resolved.
+  bool have_partition = false;
+
+  try {
+    jclass bb_class = RequireClass(env, "java/nio/ByteBuffer");
+    jmethodID bb_remaining = RequireMethod(env, bb_class, "remaining", "()I");
+    jmethodID bb_position = RequireMethod(env, bb_class, "position", "()I");
+
+    // IsInstanceOf reports true for a null reference, so reject null first.
+    if (partition == nullptr || !env->IsInstanceOf(partition, bb_class)) {
+      ThrowJavaException(env, "java/lang/IllegalArgumentException",
+                         "Partition must be a ByteBuffer");
+      return nullptr;
+    }
+    jint remaining = env->CallIntMethod(partition, bb_remaining);
+    if (env->ExceptionCheck()) return nullptr;
+    if (remaining < 0) {
+      ThrowJavaException(env, "java/lang/IllegalArgumentException",
+                         "ByteBuffer remaining() must be non-negative");
+      return nullptr;
+    }
+    jint position = env->CallIntMethod(partition, bb_position);
+    if (env->ExceptionCheck()) return nullptr;
+    serialized_length = static_cast<size_t>(remaining);
+
+    // fast path (if direct buffer)
+    void* buf = env->GetDirectBufferAddress(partition);
+    if (buf) {
+      // The address is the start of the buffer's memory; the readable bytes
+      // begin at the current position.
+      serialized_partition = static_cast<const uint8_t*>(buf) + position;
+      have_partition = true;
+    }
+
+    // middle path (backing array)
+    if (!have_partition) {
+      jmethodID bb_has_array = RequireMethod(env, bb_class, "hasArray", "()Z");
+      jmethodID bb_array = RequireMethod(env, bb_class, "array", "()[B");
+      jmethodID bb_array_offset = RequireMethod(env, bb_class, "arrayOffset", "()I");
+      jboolean has_array = env->CallBooleanMethod(partition, bb_has_array);
+      if (env->ExceptionCheck()) return nullptr;
+      if (has_array) {
+        jint array_offset = env->CallIntMethod(partition, bb_array_offset);
+        if (env->ExceptionCheck()) return nullptr;
+
+        auto array =
+            reinterpret_cast<jbyteArray>(env->CallObjectMethod(partition, bb_array));
+        if (env->ExceptionCheck()) return nullptr;
+
+        allocated_partition.resize(serialized_length);
+        // Buffer position p lives at array index arrayOffset() + p.  An
+        // out-of-range region raises ArrayIndexOutOfBoundsException, which is
+        // checked below instead of relying on an assert.
+        env->GetByteArrayRegion(array, array_offset + position,
+                                static_cast<jsize>(serialized_length),
+                                reinterpret_cast<jbyte*>(allocated_partition.data()));
+        if (env->ExceptionCheck()) return nullptr;
+        serialized_partition = allocated_partition.data();
+        have_partition = true;
+      }
+    }
+
+    // slow path (copy)
+    if (!have_partition) {
+      jmethodID bb_duplicate =
+          RequireMethod(env, bb_class, "duplicate", "()Ljava/nio/ByteBuffer;");
+      jmethodID bb_get = RequireMethod(env, bb_class, "get", "([B)Ljava/nio/ByteBuffer;");
+      // Read through a duplicate so the caller's position is not advanced,
+      // matching the two paths above.
+      jobject view = env->CallObjectMethod(partition, bb_duplicate);
+      if (env->ExceptionCheck()) return nullptr;
+
+      jbyteArray temp = env->NewByteArray(static_cast<jsize>(serialized_length));
+      if (!temp) {
+        // NewByteArray has already raised OutOfMemoryError; no further JNI
+        // lookups should be made while it is pending.
+        return nullptr;
+      }
+
+      // get(byte[]) returns the buffer, so use the Object call variant.
+      env->CallObjectMethod(view, bb_get, temp);
+      if (env->ExceptionCheck()) return nullptr;
+
+      allocated_partition.resize(serialized_length);
+      env->GetByteArrayRegion(temp, 0, static_cast<jsize>(serialized_length),
+                              reinterpret_cast<jbyte*>(allocated_partition.data()));
+      if (env->ExceptionCheck()) return nullptr;
+      serialized_partition = allocated_partition.data();
+      have_partition = true;
+    }
+
+    assert(have_partition);
+
+    CHECK_ADBC_ERROR(AdbcConnectionReadPartition(conn, serialized_partition,
+                                                 serialized_length, &out, &error),
+                     error);
+
+    return MakeNativeQueryResult(env, -1, &out);
+  } catch (const AdbcException& e) {
+    e.ThrowJavaException(env);
+  }
+  return nullptr;
+}
+
 JNIEXPORT jbyteArray JNICALL
 Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_databaseGetOptionBytes(
     JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jstring key) {
diff --git 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
index 5a121ebb9..46203c874 100644
--- 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
+++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
@@ -17,6 +17,7 @@
 
 package org.apache.arrow.adbc.driver.jni;
 
+import java.nio.ByteBuffer;
 import org.apache.arrow.adbc.core.AdbcConnection;
 import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.adbc.core.AdbcStatement;
@@ -254,6 +255,11 @@ public class JniConnection implements AdbcConnection {
     setOption(JniDriver.CURRENT_DB_SCHEMA, dbSchema);
   }
 
+  /**
+   * Reads one partition of a result set through the native driver.
+   *
+   * <p>Delegates to {@code JniLoader.INSTANCE.connectionReadPartition} with this connection's
+   * handle, then imports the returned stream using {@code allocator}.
+   *
+   * @param descriptor the serialized partition descriptor
+   * @return a reader over the partition's data
+   * @throws AdbcException declared by the native call
+   */
+  @Override
+  public ArrowReader readPartition(ByteBuffer descriptor) throws AdbcException 
{
+    return JniLoader.INSTANCE.connectionReadPartition(handle, 
descriptor).importStream(allocator);
+  }
+
   @Override
   public void close() {
     handle.close();
diff --git 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
index ec8fd46b2..dd4f581b2 100644
--- 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
+++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
@@ -21,6 +21,7 @@ import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.adbc.core.AdbcStatement;
 import org.apache.arrow.adbc.core.TypedKey;
 import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
+import org.apache.arrow.adbc.driver.jni.impl.NativePartitionResult;
 import org.apache.arrow.adbc.driver.jni.impl.NativeQueryResult;
 import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle;
 import org.apache.arrow.c.ArrowArray;
@@ -72,6 +73,13 @@ public class JniStatement implements AdbcStatement {
     }
   }
 
+  /**
+   * Executes the statement as a partitioned query through the native driver.
+   *
+   * <p>Calls {@code exportBind()} first, as {@code executeQuery()} does, then converts the native
+   * result into a {@link PartitionResult} using {@code allocator}.
+   *
+   * @return the partition descriptors, affected row count and (possibly absent) schema
+   * @throws AdbcException declared by the native call
+   */
+  @Override
+  public PartitionResult executePartitioned() throws AdbcException {
+    exportBind();
+    NativePartitionResult result = 
JniLoader.INSTANCE.statementExecutePartitions(handle);
+    return result.importResult(allocator);
+  }
+
   @Override
   public QueryResult executeQuery() throws AdbcException {
     exportBind();
diff --git 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
index ebfc3c51c..b081b4b0f 100644
--- 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
+++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.Map;
@@ -85,6 +86,11 @@ public enum JniLoader {
     NativeAdbc.statementCancel(statement.getStatementHandle());
   }
 
+  /**
+   * Executes a statement as a partitioned query.
+   *
+   * <p>Unwraps the raw statement handle and forwards to {@link
+   * NativeAdbc#statementExecutePartitions(long)}.
+   */
+  public NativePartitionResult 
statementExecutePartitions(NativeStatementHandle statement)
+      throws AdbcException {
+    return 
NativeAdbc.statementExecutePartitions(statement.getStatementHandle());
+  }
+
   public NativeQueryResult statementExecuteQuery(NativeStatementHandle 
statement)
       throws AdbcException {
     return NativeAdbc.statementExecuteQuery(statement.getStatementHandle());
@@ -207,6 +213,11 @@ public enum JniLoader {
     NativeAdbc.connectionRollback(connection.getConnectionHandle());
   }
 
+  /**
+   * Reads a single partition of a result set.
+   *
+   * <p>Unwraps the raw connection handle and forwards it, together with the serialized partition,
+   * to {@code NativeAdbc.connectionReadPartition}.
+   */
+  public NativeQueryResult connectionReadPartition(
+      NativeConnectionHandle connection, ByteBuffer partition) throws 
AdbcException {
+    return 
NativeAdbc.connectionReadPartition(connection.getConnectionHandle(), partition);
+  }
+
   public byte[] connectionGetOptionBytes(NativeConnectionHandle handle, String 
key)
       throws AdbcException {
     return NativeAdbc.connectionGetOptionBytes(handle.getConnectionHandle(), 
key);
diff --git 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
index fb93dacc8..68ad348a7 100644
--- 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
+++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
@@ -17,6 +17,7 @@
 
 package org.apache.arrow.adbc.driver.jni.impl;
 
+import java.nio.ByteBuffer;
 import org.apache.arrow.adbc.core.AdbcException;
 
 /** All the JNI methods. Don't use this directly, prefer {@link JniLoader}. */
@@ -48,6 +49,8 @@ class NativeAdbc {
 
   static native void statementPrepare(long handle) throws AdbcException;
 
+  // Implemented in jni_wrapper.cc; handle is the raw statement handle.
+  static native NativePartitionResult statementExecutePartitions(long handle) 
throws AdbcException;
+
   static native NativeQueryResult statementExecuteQuery(long handle) throws 
AdbcException;
 
   static native NativeSchemaResult statementExecuteSchema(long handle) throws 
AdbcException;
@@ -101,6 +104,9 @@ class NativeAdbc {
 
   static native void connectionRollback(long handle) throws AdbcException;
 
+  // Implemented in jni_wrapper.cc; handle is the raw connection handle and
+  // partition carries the serialized partition bytes.
+  static native NativeQueryResult connectionReadPartition(long handle, 
ByteBuffer partition)
+      throws AdbcException;
+
   static native byte[] connectionGetOptionBytes(long handle, String key) 
throws AdbcException;
 
   static native double connectionGetOptionDouble(long handle, String key) 
throws AdbcException;
diff --git 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java
 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java
new file mode 100644
index 000000000..9b5526f6a
--- /dev/null
+++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.jni.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.PartitionDescriptor;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Holder for the result of a partitioned execution: the affected row count, a snapshot of the C
+ * Data Interface schema, and the partition descriptors appended through {@link
+ * #addPartition(byte[])}.
+ */
+public class NativePartitionResult {
+  private final List<PartitionDescriptor> partitions;
+  private final long rowsAffected;
+  private final ArrowSchema.Snapshot schemaSnapshot;
+
+  /**
+   * Creates a result with no partitions yet.
+   *
+   * @param rowsAffected the affected row count to report
+   * @param cDataSchema address of a C Data Interface schema struct; it is snapshotted here
+   */
+  public NativePartitionResult(long rowsAffected, long cDataSchema) {
+    this.partitions = new ArrayList<>();
+    this.rowsAffected = rowsAffected;
+    // Immediately snapshot the schema to take ownership of the contents.
+    // The address may point to a stack-allocated struct that becomes invalid
+    // after the JNI call returns.
+    this.schemaSnapshot = ArrowSchema.wrap(cDataSchema).snapshot();
+  }
+
+  /** For use by JNI code only. */
+  public void addPartition(byte[] partition) {
+    // Wraps the array without copying it.
+    this.partitions.add(new PartitionDescriptor(ByteBuffer.wrap(partition)));
+  }
+
+  /**
+   * Converts this native result into an {@link AdbcStatement.PartitionResult}.
+   *
+   * @param allocator allocator used for the temporary schema handle and the schema import
+   * @return the result; its schema is {@code null} when the snapshot has no release callback
+   */
+  public AdbcStatement.PartitionResult importResult(BufferAllocator allocator) 
{
+    try (final ArrowSchema schemaHandle = ArrowSchema.allocateNew(allocator)) {
+      // It's possible the driver doesn't give us a schema.
+      schemaHandle.save(schemaSnapshot);
+      // TODO(lidavidm): work out dictionaries
+      Schema schema = null;
+      // A zero release callback means there is no schema to import.
+      if (schemaSnapshot.release != 0) {
+        schema = Data.importSchema(allocator, schemaHandle, null);
+      }
+      return new AdbcStatement.PartitionResult(schema, rowsAffected, 
partitions);
+    }
+  }
+}

Reply via email to