This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 563b9488c feat(java/driver/jni): implement executePartitioned (#4263)
563b9488c is described below
commit 563b9488cb9c5146e820b3e0bdcb5667a88340f0
Author: David Li <[email protected]>
AuthorDate: Mon Apr 27 11:36:58 2026 +0900
feat(java/driver/jni): implement executePartitioned (#4263)
Closes #4262.
---
.github/workflows/java.yml | 2 +-
go/adbc/driver/flightsql/flightsql_connection.go | 4 +-
go/adbc/driver/flightsql/flightsql_statement.go | 2 +-
.../org/apache/arrow/adbc/core/AdbcStatement.java | 9 +-
.../adbc/driver/jni/FlightSqlIntegrationTest.java | 98 ++++++++++++
java/driver/jni/src/main/cpp/jni_wrapper.cc | 173 ++++++++++++++++++++-
.../arrow/adbc/driver/jni/JniConnection.java | 6 +
.../apache/arrow/adbc/driver/jni/JniStatement.java | 8 +
.../arrow/adbc/driver/jni/impl/JniLoader.java | 11 ++
.../arrow/adbc/driver/jni/impl/NativeAdbc.java | 6 +
.../driver/jni/impl/NativePartitionResult.java | 61 ++++++++
11 files changed, 368 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index 541a6eb82..60e41f42b 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -233,7 +233,7 @@ jobs:
- name: Start Dependencies
if: matrix.os == 'Linux' && matrix.arch == 'amd64'
run: |
- docker compose up --detach --wait mssql-test postgres-test
+ docker compose up --detach --wait flightsql-sqlite-test mssql-test
postgres-test
cat .env | grep -v -e '^#' | grep -e '^ADBC_' | awk NF | sed
's/"//g' | tee -a $GITHUB_ENV
- name: Download thirdparty driver
diff --git a/go/adbc/driver/flightsql/flightsql_connection.go
b/go/adbc/driver/flightsql/flightsql_connection.go
index 473d38770..4f672d298 100644
--- a/go/adbc/driver/flightsql/flightsql_connection.go
+++ b/go/adbc/driver/flightsql/flightsql_connection.go
@@ -1145,7 +1145,7 @@ func (c *connectionImpl) ReadPartition(ctx
context.Context, serializedPartition
var info flight.FlightInfo
if err := proto.Unmarshal(serializedPartition, &info); err != nil {
return nil, adbc.Error{
- Msg: err.Error(),
+ Msg: fmt.Sprintf("[flightsql] could not unmarshal
partition as FlightInfo: %v", err),
Code: adbc.StatusInvalidArgument,
}
}
@@ -1153,7 +1153,7 @@ func (c *connectionImpl) ReadPartition(ctx
context.Context, serializedPartition
// The driver only ever returns one endpoint.
if len(info.Endpoint) != 1 {
return nil, adbc.Error{
- Msg: fmt.Sprintf("Invalid partition: expected 1
endpoint, got %d", len(info.Endpoint)),
+ Msg: fmt.Sprintf("[flightsql] invalid partition:
expected 1 endpoint, got %d", len(info.Endpoint)),
Code: adbc.StatusInvalidArgument,
}
}
diff --git a/go/adbc/driver/flightsql/flightsql_statement.go
b/go/adbc/driver/flightsql/flightsql_statement.go
index 85f750a0c..eb034ec66 100644
--- a/go/adbc/driver/flightsql/flightsql_statement.go
+++ b/go/adbc/driver/flightsql/flightsql_statement.go
@@ -811,7 +811,7 @@ func (s *statement) ExecutePartitions(ctx context.Context)
(*arrow.Schema, adbc.
data, err := proto.Marshal(partition)
if err != nil {
return sc, out, -1, adbc.Error{
- Msg: err.Error(),
+ Msg: fmt.Sprintf("[flightsql] could not
marshal partition as FlightInfo: %v", err),
Code: adbc.StatusInternal,
}
}
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
index a1f9e0f3b..bbbae4470 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* A container for all state needed to execute a database query, such as the
query itself,
@@ -231,19 +232,21 @@ public interface AdbcStatement extends AutoCloseable,
AdbcOptions {
/** The partitions of a result set. */
class PartitionResult {
- private final Schema schema;
+ private final @Nullable Schema schema;
private final long affectedRows;
private final List<PartitionDescriptor> partitionDescriptors;
public PartitionResult(
- Schema schema, long affectedRows, List<PartitionDescriptor>
partitionDescriptors) {
+ @Nullable Schema schema,
+ long affectedRows,
+ List<PartitionDescriptor> partitionDescriptors) {
this.schema = schema;
this.affectedRows = affectedRows;
this.partitionDescriptors = partitionDescriptors;
}
/** Get the schema of the eventual result set. */
- public Schema getSchema() {
+ public @Nullable Schema getSchema() {
return schema;
}
diff --git
a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java
new file mode 100644
index 000000000..b5ab1c7ee
--- /dev/null
+++
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.jni;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests that drive the JNI driver against a live Flight SQL server.
+ *
+ * <p>The server URI is read from the environment variable named by {@link #URI_ENV}. When it is
+ * unset or empty the whole class is skipped through a JUnit assumption.
+ */
+public class FlightSqlIntegrationTest {
+  public static final String URI_ENV = "ADBC_SQLITE_FLIGHTSQL_URI";
+  // Read once at class load; never reassigned, so keep it final.
+  static final String URI = System.getenv(URI_ENV);
+
+  BufferAllocator allocator;
+  JniDriver driver;
+  AdbcDatabase db;
+  AdbcConnection conn;
+
+  /** Skips every test in this class unless the Flight SQL URI is configured. */
+  @BeforeAll
+  static void beforeAll() {
+    Assumptions.assumeFalse(
+        URI == null || URI.isEmpty(),
+        String.format("Must set %s to run Flight SQL integration tests", URI_ENV));
+  }
+
+  /** Opens a fresh allocator, database and connection for each test. */
+  @BeforeEach
+  void beforeEach() throws Exception {
+    allocator = new RootAllocator();
+    driver = new JniDriver(allocator);
+    Map<String, Object> parameters = new HashMap<>();
+    JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_flightsql");
+    AdbcDriver.PARAM_URI.set(parameters, URI);
+    db = driver.open(parameters);
+    conn = db.connect();
+  }
+
+  /** Closes whatever {@link #beforeEach()} managed to open, in reverse order of creation. */
+  @AfterEach
+  void afterEach() throws Exception {
+    // beforeEach may have failed part-way (e.g. the server is unreachable); guard each field so
+    // teardown does not add a NullPointerException on top of the real failure.
+    if (conn != null) {
+      conn.close();
+    }
+    if (db != null) {
+      db.close();
+    }
+    if (allocator != null) {
+      allocator.close();
+    }
+  }
+
+  /** A plain query returns a single batch containing the computed value. */
+  @Test
+  void simple() throws Exception {
+    try (var stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT 1 + 1 AS sum");
+      try (var reader = stmt.executeQuery()) {
+        assertThat(reader.getReader().loadNextBatch()).isTrue();
+        assertThat(reader.getReader().getVectorSchemaRoot().getVector("sum").getObject(0))
+            .isEqualTo(2L);
+      }
+    }
+  }
+
+  /** A partitioned execution yields one descriptor that can be read back via the connection. */
+  @Test
+  void partitioned() throws Exception {
+    try (var stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT 1 + 1 AS sum");
+      var partitions = stmt.executePartitioned();
+      assertThat(partitions.getPartitionDescriptors().size()).isEqualTo(1);
+      assertThat(partitions.getAffectedRows()).isEqualTo(-1);
+      // The test server doesn't give a schema.
+      assertThat(partitions.getSchema()).isNull();
+
+      try (var reader =
+          conn.readPartition(partitions.getPartitionDescriptors().get(0).getDescriptor())) {
+        assertThat(reader.loadNextBatch()).isTrue();
+        assertThat(reader.getVectorSchemaRoot().getVector("sum").getObject(0)).isEqualTo(2L);
+      }
+    }
+  }
+}
diff --git a/java/driver/jni/src/main/cpp/jni_wrapper.cc
b/java/driver/jni/src/main/cpp/jni_wrapper.cc
index 562ddf3fb..ef650fc08 100644
--- a/java/driver/jni/src/main/cpp/jni_wrapper.cc
+++ b/java/driver/jni/src/main/cpp/jni_wrapper.cc
@@ -34,6 +34,19 @@
namespace {
+/// Raise a Java exception of class `klass` carrying `message`.
+///
+/// `klass` is a JNI-style class name such as
+/// "java/lang/IllegalArgumentException". The exception becomes pending in
+/// `env`; the caller must still return to Java for it to propagate.
+///
+/// If the class cannot be resolved, FindClass has already left its own
+/// exception pending and that one is propagated instead.
+void ThrowJavaException(JNIEnv* env, const std::string& klass,
+                        const std::string& message) {
+  jclass exception_klass = env->FindClass(klass.c_str());
+  if (exception_klass == nullptr) return;
+  // ThrowNew builds the exception through its (String) constructor and
+  // throws it, so no hand-written "(Ljava/lang/String;)V" descriptor lookup
+  // is needed.
+  env->ThrowNew(exception_klass, message.c_str());
+}
+
/// Internal exception. Meant to be used with RaiseAdbcException and
/// CHECK_ADBC_ERROR.
struct AdbcException {
@@ -112,19 +125,26 @@ void RaiseAdbcException(AdbcStatusCode code, const
AdbcError& error) {
} while (0)
/// Require that a Java class exists or error.
-jclass RequireImplClass(JNIEnv* env, std::string_view name) {
- static std::string kPrefix = "org/apache/arrow/adbc/driver/jni/impl/";
- std::string full_name = kPrefix + std::string(name);
- jclass klass = env->FindClass(full_name.c_str());
+jclass RequireClass(JNIEnv* env, const std::string& name) {
+ jclass klass = env->FindClass(name.c_str());
if (klass == nullptr) {
+ std::string message = "[JNI] Could not find class ";
+ message += name;
throw AdbcException{
.code = ADBC_STATUS_INTERNAL,
- .message = "[JNI] Could not find class " + full_name,
+ .message = std::move(message),
};
}
return klass;
}
+/// Require that a Java class exists or error.
+jclass RequireImplClass(JNIEnv* env, std::string_view name) {
+ static std::string kPrefix = "org/apache/arrow/adbc/driver/jni/impl/";
+ std::string full_name = kPrefix + std::string(name);
+ return RequireClass(env, full_name);
+}
+
/// Require that a Java method exists or error.
jmethodID RequireMethod(JNIEnv* env, jclass klass, std::string_view name,
std::string_view signature) {
@@ -381,6 +401,60 @@
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementGetParameterSchem
return nullptr;
}
+/// JNI entry point for NativeAdbc.statementExecutePartitions(long).
+///
+/// Calls AdbcStatementExecutePartitions on the statement behind `handle` and
+/// returns a new NativePartitionResult holding the affected-row count, the
+/// result schema and a copy of every partition descriptor. On failure a Java
+/// exception is left pending and nullptr is returned.
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecutePartitions(
+    JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
+  struct AdbcError error = ADBC_ERROR_INIT;
+  auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle));
+  struct ArrowSchema schema = {};
+  struct AdbcPartitions partitions = {};
+  int64_t rows_affected = 0;
+  jobject result = nullptr;
+
+  try {
+    jclass native_result_class = RequireImplClass(env, "NativePartitionResult");
+    jmethodID native_result_ctor =
+        RequireMethod(env, native_result_class, "<init>", "(JJ)V");
+    jmethodID native_result_add_partition =
+        RequireMethod(env, native_result_class, "addPartition", "([B)V");
+
+    CHECK_ADBC_ERROR(
+        AdbcStatementExecutePartitions(ptr, &schema, &partitions, &rows_affected, &error),
+        error);
+
+    // The Java constructor snapshots the struct at this address, so handing
+    // it a pointer to a stack variable is fine.
+    result = env->NewObject(native_result_class, native_result_ctor,
+                            static_cast<jlong>(rows_affected),
+                            static_cast<jlong>(reinterpret_cast<uintptr_t>(&schema)));
+    if (env->ExceptionCheck()) goto cleanupall;
+
+    for (size_t i = 0; i < partitions.num_partitions; i++) {
+      const auto length = static_cast<jsize>(partitions.partition_lengths[i]);
+      jbyteArray partition = env->NewByteArray(length);
+      // On allocation failure NewByteArray returns nullptr with an
+      // OutOfMemoryError pending; do not touch the array in that case.
+      if (partition == nullptr) goto cleanupall;
+      env->SetByteArrayRegion(partition, 0, length,
+                              reinterpret_cast<const jbyte*>(partitions.partitions[i]));
+      if (env->ExceptionCheck()) goto cleanupall;
+      // addPartition is declared "([B)V": a void method must be invoked
+      // through CallVoidMethod.
+      env->CallVoidMethod(result, native_result_add_partition, partition);
+      if (env->ExceptionCheck()) goto cleanupall;
+      // Release the local reference right away so they do not pile up (one
+      // per partition) until this native method returns.
+      env->DeleteLocalRef(partition);
+    }
+  } catch (const AdbcException& e) {
+    e.ThrowJavaException(env);
+  }
+
+  // We can't release schema (on success the Java object holds a snapshot of
+  // it), but we copied out the partitions
+  if (partitions.release != nullptr) {
+    partitions.release(&partitions);
+  }
+  return result;
+
+cleanupall:
+  // A Java exception is pending and the result object is being abandoned, so
+  // nothing on the Java side will release the schema: do it here.
+  if (schema.release != nullptr) {
+    schema.release(&schema);
+  }
+  if (partitions.release != nullptr) {
+    partitions.release(&partitions);
+  }
+  return nullptr;
+}
+
JNIEXPORT jobject JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecuteQuery(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
@@ -978,6 +1052,95 @@
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionRollback(
}
}
+/// JNI entry point for NativeAdbc.connectionReadPartition(long, ByteBuffer).
+///
+/// Passes the bytes between the buffer's position and limit to
+/// AdbcConnectionReadPartition on the connection behind `handle` and wraps
+/// the resulting stream via MakeNativeQueryResult. The caller's buffer
+/// position is not modified. On failure a Java exception is left pending and
+/// nullptr is returned.
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionReadPartition(
+    JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jobject partition) {
+  // Stand-in address for zero-length partitions (see below).
+  static const uint8_t kEmptyPartition = 0;
+
+  struct AdbcError error = ADBC_ERROR_INIT;
+  auto* conn = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle));
+  struct ArrowArrayStream out = {};
+  size_t serialized_length = 0;
+  const uint8_t* serialized_partition = nullptr;
+  std::vector<uint8_t> allocated_partition;
+  // Tracked separately from the pointer: an empty std::vector may report a
+  // null data(), which must not be mistaken for "no path matched yet".
+  bool resolved = false;
+
+  try {
+    jclass bb_class = RequireClass(env, "java/nio/ByteBuffer");
+    jmethodID bb_remaining = RequireMethod(env, bb_class, "remaining", "()I");
+    jmethodID bb_position = RequireMethod(env, bb_class, "position", "()I");
+
+    // IsInstanceOf() reports true for a null reference, so reject null
+    // explicitly before invoking any method on it.
+    if (partition == nullptr || !env->IsInstanceOf(partition, bb_class)) {
+      ThrowJavaException(env, "java/lang/IllegalArgumentException",
+                         "Partition must be a ByteBuffer");
+      return nullptr;
+    }
+    jint remaining = env->CallIntMethod(partition, bb_remaining);
+    if (env->ExceptionCheck()) return nullptr;
+    if (remaining < 0) {
+      ThrowJavaException(env, "java/lang/IllegalArgumentException",
+                         "ByteBuffer remaining() must be non-negative");
+      return nullptr;
+    }
+    serialized_length = static_cast<size_t>(remaining);
+
+    jint position = env->CallIntMethod(partition, bb_position);
+    if (env->ExceptionCheck()) return nullptr;
+
+    // fast path (if direct buffer)
+    void* buf = env->GetDirectBufferAddress(partition);
+    if (buf) {
+      // The returned address is that of buffer index 0; the readable bytes
+      // start at the current position.
+      serialized_partition = static_cast<const uint8_t*>(buf) + position;
+      resolved = true;
+    }
+
+    // middle path (backing array)
+    if (!resolved) {
+      jmethodID bb_has_array = RequireMethod(env, bb_class, "hasArray", "()Z");
+      jmethodID bb_array = RequireMethod(env, bb_class, "array", "()[B");
+      jmethodID bb_array_offset = RequireMethod(env, bb_class, "arrayOffset", "()I");
+      jboolean has_array = env->CallBooleanMethod(partition, bb_has_array);
+      if (env->ExceptionCheck()) return nullptr;
+      if (has_array) {
+        jint array_offset = env->CallIntMethod(partition, bb_array_offset);
+        if (env->ExceptionCheck()) return nullptr;
+
+        auto array =
+            reinterpret_cast<jbyteArray>(env->CallObjectMethod(partition, bb_array));
+        if (env->ExceptionCheck()) return nullptr;
+
+        // Buffer index p lives at array index arrayOffset() + p, so the first
+        // readable byte is at arrayOffset() + position().
+        const jsize start = array_offset + position;
+        allocated_partition.resize(serialized_length);
+        env->GetByteArrayRegion(array, start, static_cast<jsize>(serialized_length),
+                                reinterpret_cast<jbyte*>(allocated_partition.data()));
+        // Raises ArrayIndexOutOfBoundsException if the region is invalid.
+        if (env->ExceptionCheck()) return nullptr;
+        serialized_partition = allocated_partition.data();
+        resolved = true;
+      }
+    }
+
+    // slow path (copy)
+    if (!resolved) {
+      jmethodID bb_duplicate =
+          RequireMethod(env, bb_class, "duplicate", "()Ljava/nio/ByteBuffer;");
+      jmethodID bb_get = RequireMethod(env, bb_class, "get", "([B)Ljava/nio/ByteBuffer;");
+
+      // The relative bulk get() advances the position of the buffer it is
+      // called on; read through a duplicate so the caller's buffer is left
+      // untouched.
+      jobject duplicate = env->CallObjectMethod(partition, bb_duplicate);
+      if (env->ExceptionCheck()) return nullptr;
+
+      jbyteArray temp = env->NewByteArray(static_cast<jsize>(serialized_length));
+      if (!temp) {
+        // NewByteArray normally leaves OutOfMemoryError pending already;
+        // only raise our own if it somehow did not.
+        if (!env->ExceptionCheck()) {
+          ThrowJavaException(env, "java/lang/OutOfMemoryError",
+                             "Failed to allocate byte array for partition");
+        }
+        return nullptr;
+      }
+
+      // get(byte[]) returns the buffer, so it is an object-returning call.
+      env->CallObjectMethod(duplicate, bb_get, temp);
+      if (env->ExceptionCheck()) return nullptr;
+
+      allocated_partition.resize(serialized_length);
+      env->GetByteArrayRegion(temp, 0, static_cast<jsize>(serialized_length),
+                              reinterpret_cast<jbyte*>(allocated_partition.data()));
+      if (env->ExceptionCheck()) return nullptr;
+      serialized_partition = allocated_partition.data();
+      resolved = true;
+    }
+
+    // Only reachable with a null pointer for a zero-length partition copied
+    // into an empty vector; hand the driver a valid (unused) address instead.
+    if (serialized_partition == nullptr) {
+      serialized_partition = &kEmptyPartition;
+    }
+
+    CHECK_ADBC_ERROR(AdbcConnectionReadPartition(conn, serialized_partition,
+                                                 serialized_length, &out, &error),
+                     error);
+
+    return MakeNativeQueryResult(env, -1, &out);
+  } catch (const AdbcException& e) {
+    e.ThrowJavaException(env);
+  }
+  return nullptr;
+}
+
JNIEXPORT jbyteArray JNICALL
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_databaseGetOptionBytes(
JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jstring key) {
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
index 5a121ebb9..46203c874 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
@@ -17,6 +17,7 @@
package org.apache.arrow.adbc.driver.jni;
+import java.nio.ByteBuffer;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
@@ -254,6 +255,11 @@ public class JniConnection implements AdbcConnection {
setOption(JniDriver.CURRENT_DB_SCHEMA, dbSchema);
}
+  /**
+   * Reads one partition of a partitioned result set through the native driver.
+   *
+   * @param descriptor the serialized partition descriptor
+   * @return a reader over the partition, imported with this connection's allocator
+   * @throws AdbcException if the native call fails
+   */
+  @Override
+  public ArrowReader readPartition(ByteBuffer descriptor) throws AdbcException {
+    return JniLoader.INSTANCE
+        .connectionReadPartition(handle, descriptor)
+        .importStream(allocator);
+  }
+
@Override
public void close() {
handle.close();
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
index ec8fd46b2..dd4f581b2 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
@@ -21,6 +21,7 @@ import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
+import org.apache.arrow.adbc.driver.jni.impl.NativePartitionResult;
import org.apache.arrow.adbc.driver.jni.impl.NativeQueryResult;
import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle;
import org.apache.arrow.c.ArrowArray;
@@ -72,6 +73,13 @@ public class JniStatement implements AdbcStatement {
}
}
+  /**
+   * Executes the statement as a partitioned query through the native driver.
+   *
+   * <p>Pending bind parameters are exported first, then the native result is converted using this
+   * statement's allocator.
+   *
+   * @throws AdbcException if the native call fails
+   */
+  @Override
+  public PartitionResult executePartitioned() throws AdbcException {
+    exportBind();
+    final NativePartitionResult nativeResult =
+        JniLoader.INSTANCE.statementExecutePartitions(handle);
+    return nativeResult.importResult(allocator);
+  }
+
@Override
public QueryResult executeQuery() throws AdbcException {
exportBind();
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
index ebfc3c51c..b081b4b0f 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;
@@ -85,6 +86,11 @@ public enum JniLoader {
NativeAdbc.statementCancel(statement.getStatementHandle());
}
+  /**
+   * Forwards to the native {@code statementExecutePartitions} call for the given statement.
+   *
+   * @param statement wrapper around the native statement handle
+   * @throws AdbcException if the native call fails
+   */
+  public NativePartitionResult statementExecutePartitions(NativeStatementHandle statement)
+      throws AdbcException {
+    final long rawHandle = statement.getStatementHandle();
+    return NativeAdbc.statementExecutePartitions(rawHandle);
+  }
+
public NativeQueryResult statementExecuteQuery(NativeStatementHandle
statement)
throws AdbcException {
return NativeAdbc.statementExecuteQuery(statement.getStatementHandle());
@@ -207,6 +213,11 @@ public enum JniLoader {
NativeAdbc.connectionRollback(connection.getConnectionHandle());
}
+  /**
+   * Forwards to the native {@code connectionReadPartition} call for the given connection.
+   *
+   * @param connection wrapper around the native connection handle
+   * @param partition the serialized partition descriptor
+   * @throws AdbcException if the native call fails
+   */
+  public NativeQueryResult connectionReadPartition(
+      NativeConnectionHandle connection, ByteBuffer partition) throws AdbcException {
+    final long rawHandle = connection.getConnectionHandle();
+    return NativeAdbc.connectionReadPartition(rawHandle, partition);
+  }
+
public byte[] connectionGetOptionBytes(NativeConnectionHandle handle, String
key)
throws AdbcException {
return NativeAdbc.connectionGetOptionBytes(handle.getConnectionHandle(),
key);
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
index fb93dacc8..68ad348a7 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
@@ -17,6 +17,7 @@
package org.apache.arrow.adbc.driver.jni.impl;
+import java.nio.ByteBuffer;
import org.apache.arrow.adbc.core.AdbcException;
/** All the JNI methods. Don't use this directly, prefer {@link JniLoader}. */
@@ -48,6 +49,8 @@ class NativeAdbc {
static native void statementPrepare(long handle) throws AdbcException;
+ /**
+  * Executes the statement behind {@code handle} as a partitioned query. Implemented in
+  * jni_wrapper.cc, which builds the result via the {@code (long, long)} constructor and {@code
+  * addPartition(byte[])}.
+  */
+ static native NativePartitionResult statementExecutePartitions(long handle)
throws AdbcException;
+
static native NativeQueryResult statementExecuteQuery(long handle) throws
AdbcException;
static native NativeSchemaResult statementExecuteSchema(long handle) throws
AdbcException;
@@ -101,6 +104,9 @@ class NativeAdbc {
static native void connectionRollback(long handle) throws AdbcException;
+ /**
+  * Reads the partition described by {@code partition} on the connection behind {@code handle}.
+  * Implemented in jni_wrapper.cc, which accepts direct, array-backed and other ByteBuffers.
+  */
+ static native NativeQueryResult connectionReadPartition(long handle,
ByteBuffer partition)
+ throws AdbcException;
+
static native byte[] connectionGetOptionBytes(long handle, String key)
throws AdbcException;
static native double connectionGetOptionDouble(long handle, String key)
throws AdbcException;
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java
new file mode 100644
index 000000000..9b5526f6a
--- /dev/null
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.jni.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.PartitionDescriptor;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Result of a native {@code statementExecutePartitions} call, populated by JNI code.
+ *
+ * <p>Holds the affected-row count, a snapshot of the C Data Interface schema and the partition
+ * descriptors until {@link #importResult(BufferAllocator)} converts them to the public API type.
+ */
+public class NativePartitionResult {
+  private final List<PartitionDescriptor> partitions;
+  private final long rowsAffected;
+  private final ArrowSchema.Snapshot schemaSnapshot;
+
+  /**
+   * Creates a result with no partitions yet; they are appended via {@link #addPartition(byte[])}.
+   *
+   * @param rowsAffected the affected-row count reported by the driver
+   * @param cDataSchema address of a C {@code struct ArrowSchema}; its contents are copied before
+   *     this constructor returns
+   */
+  public NativePartitionResult(long rowsAffected, long cDataSchema) {
+    this.partitions = new ArrayList<>();
+    this.rowsAffected = rowsAffected;
+    // Immediately snapshot the schema to take ownership of the contents.
+    // The address may point to a stack-allocated struct that becomes invalid
+    // after the JNI call returns.
+    this.schemaSnapshot = ArrowSchema.wrap(cDataSchema).snapshot();
+  }
+
+  /** For use by JNI code only. */
+  public void addPartition(byte[] partition) {
+    this.partitions.add(new PartitionDescriptor(ByteBuffer.wrap(partition)));
+  }
+
+  /**
+   * Converts this native result into the public {@link AdbcStatement.PartitionResult}.
+   *
+   * @param allocator allocator used to import the schema, when there is one
+   * @return the partition result; its schema is {@code null} when the driver did not provide one
+   */
+  public AdbcStatement.PartitionResult importResult(BufferAllocator allocator) {
+    // It's possible the driver doesn't give us a schema (the snapshot then has no release
+    // callback). There is nothing to import in that case, so don't allocate a native struct.
+    if (schemaSnapshot.release == 0) {
+      return new AdbcStatement.PartitionResult(null, rowsAffected, partitions);
+    }
+    try (final ArrowSchema schemaHandle = ArrowSchema.allocateNew(allocator)) {
+      schemaHandle.save(schemaSnapshot);
+      // TODO(lidavidm): work out dictionaries
+      final Schema schema = Data.importSchema(allocator, schemaHandle, null);
+      return new AdbcStatement.PartitionResult(schema, rowsAffected, partitions);
+    }
+  }
+}