This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 4c5812809 feat(java/driver/jni): implement binding of streams (#4359)
4c5812809 is described below
commit 4c5812809d669e6d74742686e95f2ac4af17bb5d
Author: David Li <[email protected]>
AuthorDate: Sun Jun 7 15:04:38 2026 -0700
feat(java/driver/jni): implement binding of streams (#4359)
Depends on #4357.
Closes #4358.
---
.../org/apache/arrow/adbc/core/AdbcException.java | 5 ++
.../org/apache/arrow/adbc/core/AdbcStatement.java | 15 +++++-
.../adbc/driver/jni/PostgresIntegrationTest.java | 41 ++++++++++++++
.../adbc/driver/jni/SqlServerIntegrationTest.java | 41 ++++++++++++++
.../apache/arrow/adbc/driver/jni/JniStatement.java | 63 +++++++++++++++++-----
.../arrow/adbc/driver/jni/impl/JniLoader.java | 6 +++
.../arrow/adbc/driver/jni/impl/NativeAdbc.java | 1 -
7 files changed, 157 insertions(+), 15 deletions(-)
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
index 104afe578..bf6b66d43 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
@@ -72,6 +72,11 @@ public class AdbcException extends Exception {
return new AdbcException(message, /*cause*/ null, AdbcStatusCode.IO, null, 0);
}
+ /** Create a new exception with code {@link AdbcStatusCode#INTERNAL}. */
+ public static AdbcException internal(String message) {
+ return new AdbcException(message, /*cause*/ null, AdbcStatusCode.INTERNAL, null, 0);
+ }
+
/** Create a new exception with code {@link AdbcStatusCode#INVALID_STATE}. */
public static AdbcException invalidState(String message) {
return new AdbcException(message, /*cause*/ null, AdbcStatusCode.INVALID_STATE, null, 0);
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
index bbbae4470..ba7333684 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
@@ -83,11 +83,24 @@ public interface AdbcStatement extends AutoCloseable, AdbcOptions {
throw AdbcException.notImplemented("Statement does not support Substrait plans");
}
- /** Bind this statement to a VectorSchemaRoot to provide parameter values/bulk data ingestion. */
+ /**
+ * Bind a VectorSchemaRoot to provide parameter values/bulk data ingestion.
+ *
+ * <p>The statement will NOT close the root after use/on close/when other data is bound.
+ */
default void bind(VectorSchemaRoot root) throws AdbcException {
throw AdbcException.notImplemented("Statement does not support bind");
}
+ /**
+ * Bind an ArrowReader to provide parameter values/bulk data ingestion.
+ *
+ * <p>The statement will close the reader after use/on close/when other data is bound.
+ */
+ default void bind(ArrowReader reader) throws AdbcException {
+ throw AdbcException.notImplemented("Statement does not support bindStream");
+ }
+
/**
* Execute a result set-generating query and get the result.
*
diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
index 2cc9cef81..facf99507 100644
--- a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
+++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
@@ -21,6 +21,8 @@ import static org.apache.arrow.adbc.driver.testsuite.ArrowAssertions.assertSchem
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,6 +51,8 @@ import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -630,6 +634,43 @@ class PostgresIntegrationTest {
}
}
+ @Test
+ void bindStream() throws Exception {
+ // Serialize two batches to an in-memory Arrow IPC stream to get an ArrowReader
+ // Note that the field name doesn't really matter
+ Schema schema = new Schema(List.of(Field.nullable("i", Types.MinorType.INT.getType())));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, null, baos)) {
+ writer.start();
+
+ IntVector iv = (IntVector) root.getVector(0);
+
+ iv.setSafe(0, 1);
+ iv.setSafe(1, 42);
+ iv.setNull(2);
+ root.setRowCount(3);
+ writer.writeBatch();
+
+ iv.setSafe(0, 10);
+ iv.setSafe(1, 20);
+ iv.setSafe(2, 30);
+ root.setRowCount(3);
+ writer.writeBatch();
+ }
+
+ ArrowStreamReader reader =
+ new ArrowStreamReader(new ByteArrayInputStream(baos.toByteArray()), allocator);
+ try (var stmt = conn.createStatement()) {
+ stmt.setSqlQuery("SELECT $1 + 1 AS spam");
+ stmt.bind(reader);
+ try (final var result = stmt.executeQuery()) {
+ assertThat(ArrowToJava.toLongs(result.getReader(), "spam"))
+ .containsExactly(2L, 43L, null, 11L, 21L, 31L);
+ }
+ }
+ }
+
void runSetup(String... sql) throws Exception {
try (var stmt = conn.createStatement()) {
for (String s : sql) {
diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
index 57caaa51a..ae2ffc751 100644
--- a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
+++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
@@ -21,6 +21,8 @@ import static org.apache.arrow.adbc.driver.testsuite.ArrowAssertions.assertSchem
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -50,6 +52,8 @@ import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -598,6 +602,43 @@ class SqlServerIntegrationTest {
}
}
+ @Test
+ void bindStream() throws Exception {
+ // Serialize two batches to an in-memory Arrow IPC stream to get an ArrowReader
+ // Note that the field name doesn't really matter
+ Schema schema = new Schema(List.of(Field.nullable("i", Types.MinorType.INT.getType())));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, null, baos)) {
+ writer.start();
+
+ IntVector iv = (IntVector) root.getVector(0);
+
+ iv.setSafe(0, 1);
+ iv.setSafe(1, 42);
+ iv.setNull(2);
+ root.setRowCount(3);
+ writer.writeBatch();
+
+ iv.setSafe(0, 10);
+ iv.setSafe(1, 20);
+ iv.setSafe(2, 30);
+ root.setRowCount(3);
+ writer.writeBatch();
+ }
+
+ ArrowStreamReader reader =
+ new ArrowStreamReader(new ByteArrayInputStream(baos.toByteArray()), allocator);
+ try (var stmt = conn.createStatement()) {
+ stmt.setSqlQuery("SELECT @p1 + 1 AS spam");
+ stmt.bind(reader);
+ try (final var result = stmt.executeQuery()) {
+ assertThat(ArrowToJava.toLongs(result.getReader(), "spam"))
+ .containsExactly(2L, 43L, null, 11L, 21L, 31L);
+ }
+ }
+ }
+
void runSetup(String... sql) throws Exception {
try (var stmt = conn.createStatement()) {
for (String s : sql) {
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
index dd4f581b2..d3bc16ebc 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
@@ -17,6 +17,7 @@
package org.apache.arrow.adbc.driver.jni;
+import java.io.IOException;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.TypedKey;
@@ -25,16 +26,21 @@ import org.apache.arrow.adbc.driver.jni.impl.NativePartitionResult;
import org.apache.arrow.adbc.driver.jni.impl.NativeQueryResult;
import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle;
import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.checkerframework.checker.nullness.qual.Nullable;
public class JniStatement implements AdbcStatement {
private final BufferAllocator allocator;
private final NativeStatementHandle handle;
- private VectorSchemaRoot bindRoot;
+ private @Nullable VectorSchemaRoot bindRoot;
+ private @Nullable ArrowReader bindStream;
public JniStatement(BufferAllocator allocator, NativeStatementHandle handle) {
this.allocator = allocator;
@@ -53,23 +59,50 @@ public class JniStatement implements AdbcStatement {
@Override
public void bind(VectorSchemaRoot root) throws AdbcException {
+ clearBind();
this.bindRoot = root;
}
+ @Override
+ public void bind(ArrowReader reader) throws AdbcException {
+ clearBind();
+ this.bindRoot = null;
+ this.bindStream = reader;
+ }
+
+ private void clearBind() throws AdbcException {
+ if (this.bindStream != null) {
+ try {
+ this.bindStream.close();
+ } catch (IOException e) {
+ throw AdbcException.internal("[jni] failed to close previous bind stream").withCause(e);
+ } finally {
+ this.bindStream = null;
+ }
+ }
+ this.bindRoot = null;
+ }
+
+ // The C Data export takes ownership of the data at bind time and ignores subsequent
// client changes to the bound root. Defer the export until execution so we capture
// the final state of the VectorSchemaRoot.
private void exportBind() throws AdbcException {
- if (bindRoot == null) {
- return;
- }
- try (final ArrowArray batch = ArrowArray.allocateNew(allocator);
- final ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
- // TODO(lidavidm): we may need a way to separately provide a dictionary provider
- Data.exportSchema(allocator, bindRoot.getSchema(), null, schema);
- Data.exportVectorSchemaRoot(allocator, bindRoot, null, batch);
-
- JniLoader.INSTANCE.statementBind(handle, batch, schema);
+ if (bindRoot != null) {
+ try (final ArrowArray batch = ArrowArray.allocateNew(allocator);
+ final ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
+ // TODO(lidavidm): we may need a way to separately provide a dictionary provider
+ Data.exportSchema(allocator, bindRoot.getSchema(), null, schema);
+ Data.exportVectorSchemaRoot(allocator, bindRoot, null, batch);
+
+ JniLoader.INSTANCE.statementBind(handle, batch, schema);
+ }
+ } else if (bindStream != null) {
+ try (final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
+ Data.exportArrayStream(allocator, bindStream, stream);
+ JniLoader.INSTANCE.statementBindStream(handle, stream);
+ // now owned by the native handle
+ bindStream = null;
+ }
}
}
@@ -111,8 +144,12 @@ public class JniStatement implements AdbcStatement {
}
@Override
- public void close() {
- handle.close();
+ public void close() throws AdbcException {
+ try {
+ AutoCloseables.close(handle, bindStream);
+ } catch (Exception e) {
+ throw AdbcException.internal("[jni] failed to close statement").withCause(e);
+ }
}
@Override
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
index 9b8fbf916..500429a88 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
@@ -27,6 +27,7 @@ import java.nio.file.StandardCopyOption;
import java.util.Map;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.ArrowSchema;
/** Singleton wrapper protecting access to JNI functions. */
@@ -107,6 +108,11 @@ public enum JniLoader {
statement.getStatementHandle(), batch.memoryAddress(), schema.memoryAddress());
}
+ public void statementBindStream(NativeStatementHandle statement, ArrowArrayStream stream)
+ throws AdbcException {
+ NativeAdbc.statementBindStream(statement.getStatementHandle(), stream.memoryAddress());
+ }
+
public long statementExecuteUpdate(NativeStatementHandle statement) throws AdbcException {
return NativeAdbc.statementExecuteUpdate(statement.getStatementHandle());
}
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
index dff4cfde1..a7f2b4fbf 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
@@ -40,7 +40,6 @@ class NativeAdbc {
// TODO(lidavidm): we need a way to bind an ArrowReader (or some other suitable interface that
// doesn't exist in arrow-java; see the discussion around the Avro reader about how ArrowReader
// isn't a very general interface)
- @SuppressWarnings("unused")
static native void statementBindStream(long handle, long stream) throws AdbcException;
static native void statementCancel(long handle) throws AdbcException;