This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c5812809 feat(java/driver/jni): implement binding of streams (#4359)
4c5812809 is described below

commit 4c5812809d669e6d74742686e95f2ac4af17bb5d
Author: David Li <[email protected]>
AuthorDate: Sun Jun 7 15:04:38 2026 -0700

    feat(java/driver/jni): implement binding of streams (#4359)
    
    Depends on #4357.
    Closes #4358.
---
 .../org/apache/arrow/adbc/core/AdbcException.java  |  5 ++
 .../org/apache/arrow/adbc/core/AdbcStatement.java  | 15 +++++-
 .../adbc/driver/jni/PostgresIntegrationTest.java   | 41 ++++++++++++++
 .../adbc/driver/jni/SqlServerIntegrationTest.java  | 41 ++++++++++++++
 .../apache/arrow/adbc/driver/jni/JniStatement.java | 63 +++++++++++++++++-----
 .../arrow/adbc/driver/jni/impl/JniLoader.java      |  6 +++
 .../arrow/adbc/driver/jni/impl/NativeAdbc.java     |  1 -
 7 files changed, 157 insertions(+), 15 deletions(-)

diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
index 104afe578..bf6b66d43 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
@@ -72,6 +72,11 @@ public class AdbcException extends Exception {
     return new AdbcException(message, /*cause*/ null, AdbcStatusCode.IO, null, 0);
   }
 
+  /** Create a new exception with code {@link AdbcStatusCode#INTERNAL}. */
+  public static AdbcException internal(String message) {
+    return new AdbcException(message, /*cause*/ null, AdbcStatusCode.INTERNAL, null, 0);
+  }
+
   /** Create a new exception with code {@link AdbcStatusCode#INVALID_STATE}. */
   public static AdbcException invalidState(String message) {
     return new AdbcException(message, /*cause*/ null, AdbcStatusCode.INVALID_STATE, null, 0);
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
index bbbae4470..ba7333684 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
@@ -83,11 +83,24 @@ public interface AdbcStatement extends AutoCloseable, AdbcOptions {
     throw AdbcException.notImplemented("Statement does not support Substrait plans");
   }
 
-  /** Bind this statement to a VectorSchemaRoot to provide parameter values/bulk data ingestion. */
+  /**
+   * Bind a VectorSchemaRoot to provide parameter values/bulk data ingestion.
+   *
+   * <p>The statement will NOT close the root after use/on close/when other data is bound.
+   */
   default void bind(VectorSchemaRoot root) throws AdbcException {
     throw AdbcException.notImplemented("Statement does not support bind");
   }
 
+  /**
+   * Bind an ArrowReader to provide parameter values/bulk data ingestion.
+   *
+   * <p>The statement will close the reader after use/on close/when other data is bound.
+   */
+  default void bind(ArrowReader reader) throws AdbcException {
+    throw AdbcException.notImplemented("Statement does not support bindStream");
+  }
+
   /**
    * Execute a result set-generating query and get the result.
    *
diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
index 2cc9cef81..facf99507 100644
--- a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
+++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
@@ -21,6 +21,8 @@ import static org.apache.arrow.adbc.driver.testsuite.ArrowAssertions.assertSchem
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,6 +51,8 @@ import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -630,6 +634,43 @@ class PostgresIntegrationTest {
     }
   }
 
+  @Test
+  void bindStream() throws Exception {
+    // Create temp Arrow file to get an ArrowReader
+    // Note that the field name doesn't really matter
+    Schema schema = new Schema(List.of(Field.nullable("i", Types.MinorType.INT.getType())));
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+        ArrowStreamWriter writer = new ArrowStreamWriter(root, null, baos)) {
+      writer.start();
+
+      IntVector iv = (IntVector) root.getVector(0);
+
+      iv.setSafe(0, 1);
+      iv.setSafe(1, 42);
+      iv.setNull(2);
+      root.setRowCount(3);
+      writer.writeBatch();
+
+      iv.setSafe(0, 10);
+      iv.setSafe(1, 20);
+      iv.setSafe(2, 30);
+      root.setRowCount(3);
+      writer.writeBatch();
+    }
+
+    ArrowStreamReader reader =
+        new ArrowStreamReader(new ByteArrayInputStream(baos.toByteArray()), allocator);
+    try (var stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT $1 + 1 AS spam");
+      stmt.bind(reader);
+      try (final var result = stmt.executeQuery()) {
+        assertThat(ArrowToJava.toLongs(result.getReader(), "spam"))
+            .containsExactly(2L, 43L, null, 11L, 21L, 31L);
+      }
+    }
+  }
+
   void runSetup(String... sql) throws Exception {
     try (var stmt = conn.createStatement()) {
       for (String s : sql) {
diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
index 57caaa51a..ae2ffc751 100644
--- a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
+++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
@@ -21,6 +21,8 @@ import static org.apache.arrow.adbc.driver.testsuite.ArrowAssertions.assertSchem
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -50,6 +52,8 @@ import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -598,6 +602,43 @@ class SqlServerIntegrationTest {
     }
   }
 
+  @Test
+  void bindStream() throws Exception {
+    // Create temp Arrow file to get an ArrowReader
+    // Note that the field name doesn't really matter
+    Schema schema = new Schema(List.of(Field.nullable("i", Types.MinorType.INT.getType())));
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+        ArrowStreamWriter writer = new ArrowStreamWriter(root, null, baos)) {
+      writer.start();
+
+      IntVector iv = (IntVector) root.getVector(0);
+
+      iv.setSafe(0, 1);
+      iv.setSafe(1, 42);
+      iv.setNull(2);
+      root.setRowCount(3);
+      writer.writeBatch();
+
+      iv.setSafe(0, 10);
+      iv.setSafe(1, 20);
+      iv.setSafe(2, 30);
+      root.setRowCount(3);
+      writer.writeBatch();
+    }
+
+    ArrowStreamReader reader =
+        new ArrowStreamReader(new ByteArrayInputStream(baos.toByteArray()), allocator);
+    try (var stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT @p1 + 1 AS spam");
+      stmt.bind(reader);
+      try (final var result = stmt.executeQuery()) {
+        assertThat(ArrowToJava.toLongs(result.getReader(), "spam"))
+            .containsExactly(2L, 43L, null, 11L, 21L, 31L);
+      }
+    }
+  }
+
   void runSetup(String... sql) throws Exception {
     try (var stmt = conn.createStatement()) {
       for (String s : sql) {
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
index dd4f581b2..d3bc16ebc 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
@@ -17,6 +17,7 @@
 
 package org.apache.arrow.adbc.driver.jni;
 
+import java.io.IOException;
 import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.adbc.core.AdbcStatement;
 import org.apache.arrow.adbc.core.TypedKey;
@@ -25,16 +26,21 @@ import org.apache.arrow.adbc.driver.jni.impl.NativePartitionResult;
 import org.apache.arrow.adbc.driver.jni.impl.NativeQueryResult;
 import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle;
 import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowArrayStream;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class JniStatement implements AdbcStatement {
   private final BufferAllocator allocator;
   private final NativeStatementHandle handle;
-  private VectorSchemaRoot bindRoot;
+  private @Nullable VectorSchemaRoot bindRoot;
+  private @Nullable ArrowReader bindStream;
 
   public JniStatement(BufferAllocator allocator, NativeStatementHandle handle) {
     this.allocator = allocator;
@@ -53,23 +59,50 @@ public class JniStatement implements AdbcStatement {
 
   @Override
   public void bind(VectorSchemaRoot root) throws AdbcException {
+    clearBind();
     this.bindRoot = root;
   }
 
+  @Override
+  public void bind(ArrowReader reader) throws AdbcException {
+    clearBind();
+    this.bindRoot = null;
+    this.bindStream = reader;
+  }
+
+  private void clearBind() throws AdbcException {
+    if (this.bindStream != null) {
+      try {
+        this.bindStream.close();
+      } catch (IOException e) {
+        throw AdbcException.internal("[jni] failed to close previous bind stream").withCause(e);
+      } finally {
+        this.bindStream = null;
+      }
+    }
+    this.bindRoot = null;
+  }
+
   // The C Data export takes ownership of the data at bind time and ignores subsequent
   // client changes to the bound root. Defer the export until execution so we capture
   // the final state of the VectorSchemaRoot.
   private void exportBind() throws AdbcException {
-    if (bindRoot == null) {
-      return;
-    }
-    try (final ArrowArray batch = ArrowArray.allocateNew(allocator);
-        final ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
-      // TODO(lidavidm): we may need a way to separately provide a dictionary provider
-      Data.exportSchema(allocator, bindRoot.getSchema(), null, schema);
-      Data.exportVectorSchemaRoot(allocator, bindRoot, null, batch);
-
-      JniLoader.INSTANCE.statementBind(handle, batch, schema);
+    if (bindRoot != null) {
+      try (final ArrowArray batch = ArrowArray.allocateNew(allocator);
+          final ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
+        // TODO(lidavidm): we may need a way to separately provide a dictionary provider
+        Data.exportSchema(allocator, bindRoot.getSchema(), null, schema);
+        Data.exportVectorSchemaRoot(allocator, bindRoot, null, batch);
+
+        JniLoader.INSTANCE.statementBind(handle, batch, schema);
+      }
+    } else if (bindStream != null) {
+      try (final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
+        Data.exportArrayStream(allocator, bindStream, stream);
+        JniLoader.INSTANCE.statementBindStream(handle, stream);
+        // now owned by the native handle
+        bindStream = null;
+      }
     }
   }
 
@@ -111,8 +144,12 @@ public class JniStatement implements AdbcStatement {
   }
 
   @Override
-  public void close() {
-    handle.close();
+  public void close() throws AdbcException {
+    try {
+      AutoCloseables.close(handle, bindStream);
+    } catch (Exception e) {
+      throw AdbcException.internal("[jni] failed to close statement").withCause(e);
+    }
   }
 
   @Override
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
index 9b8fbf916..500429a88 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
@@ -27,6 +27,7 @@ import java.nio.file.StandardCopyOption;
 import java.util.Map;
 import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowArrayStream;
 import org.apache.arrow.c.ArrowSchema;
 
 /** Singleton wrapper protecting access to JNI functions. */
@@ -107,6 +108,11 @@ public enum JniLoader {
         statement.getStatementHandle(), batch.memoryAddress(), schema.memoryAddress());
   }
 
+  public void statementBindStream(NativeStatementHandle statement, ArrowArrayStream stream)
+      throws AdbcException {
+    NativeAdbc.statementBindStream(statement.getStatementHandle(), stream.memoryAddress());
+  }
+
   public long statementExecuteUpdate(NativeStatementHandle statement) throws AdbcException {
     return NativeAdbc.statementExecuteUpdate(statement.getStatementHandle());
   }
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
index dff4cfde1..a7f2b4fbf 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
@@ -40,7 +40,6 @@ class NativeAdbc {
   // TODO(lidavidm): we need a way to bind an ArrowReader (or some other suitable interface that
   // doesn't exist in arrow-java; see the discussion around the Avro reader about how ArrowReader
   // isn't a very general interface)
-  @SuppressWarnings("unused")
   static native void statementBindStream(long handle, long stream) throws AdbcException;
 
   static native void statementCancel(long handle) throws AdbcException;

Reply via email to