This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d409fc39 feat(java/driver/jni): wire up ingest into 
temporary/namespace (#4250)
5d409fc39 is described below

commit 5d409fc39c6d56fbedae31b9b32a52d44bcea926
Author: David Li <[email protected]>
AuthorDate: Mon Apr 27 10:38:56 2026 +0900

    feat(java/driver/jni): wire up ingest into temporary/namespace (#4250)
    
    Closes #4240.
---
 .../org/apache/arrow/adbc/core/AdbcConnection.java | 12 +++
 .../org/apache/arrow/adbc/core/IngestOption.java   | 93 ++++++++++++++++++++++
 .../adbc/driver/jni/PostgresIntegrationTest.java   | 76 ++++++++++++++++++
 .../adbc/driver/jni/SqlServerIntegrationTest.java  | 76 ++++++++++++++++++
 .../arrow/adbc/driver/jni/JniConnection.java       | 30 +++++++
 5 files changed, 287 insertions(+)

diff --git 
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java 
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
index 0b39a3d6a..ab650ec40 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
@@ -63,6 +63,18 @@ public interface AdbcConnection extends AutoCloseable, 
AdbcOptions {
         "Connection does not support bulkIngest(String, BulkIngestMode)");
   }
 
+  /**
+   * Create a new statement to bulk insert a {@link VectorSchemaRoot} into a 
table.
+   *
+   * <p>Bind data to the statement, then call {@link 
AdbcStatement#executeUpdate()}. See {@link
+   * BulkIngestMode} for description of behavior around creating tables.
+   */
+  default AdbcStatement bulkIngest(
+      String targetTableName, BulkIngestMode mode, IngestOption... options) 
throws AdbcException {
+    throw AdbcException.notImplemented(
+        "Connection does not support bulkIngest(String, BulkIngestMode, 
IngestOption...)");
+  }
+
   /**
    * Create a result set from a serialized PartitionDescriptor.
    *
diff --git 
a/java/core/src/main/java/org/apache/arrow/adbc/core/IngestOption.java 
b/java/core/src/main/java/org/apache/arrow/adbc/core/IngestOption.java
new file mode 100644
index 000000000..6040f32dd
--- /dev/null
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/IngestOption.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.core;
+
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public interface IngestOption {
+  TemporaryIngestOption TEMPORARY = new TemporaryIngestOption(true);
+  TemporaryIngestOption NOT_TEMPORARY = new TemporaryIngestOption(false);
+
+  static IngestOption targetCatalog(String catalog) {
+    return new TargetNamespaceIngestOption(catalog, null);
+  }
+
+  static IngestOption targetDbSchema(String dbSchema) {
+    return new TargetNamespaceIngestOption(null, dbSchema);
+  }
+
+  static IngestOption targetNamespace(@Nullable String catalog, @Nullable 
String dbSchema) {
+    return new TargetNamespaceIngestOption(catalog, dbSchema);
+  }
+
+  class TemporaryIngestOption implements IngestOption {
+    boolean temporary;
+
+    TemporaryIngestOption(boolean temporary) {
+      this.temporary = temporary;
+    }
+
+    public boolean isTemporary() {
+      return temporary;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (o == null || getClass() != o.getClass()) return false;
+      TemporaryIngestOption that = (TemporaryIngestOption) o;
+      return temporary == that.temporary;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(temporary);
+    }
+  }
+
+  class TargetNamespaceIngestOption implements IngestOption {
+    private final @Nullable String targetCatalog;
+    private final @Nullable String targetDbSchema;
+
+    public TargetNamespaceIngestOption(
+        @Nullable String targetCatalog, @Nullable String targetDbSchema) {
+      this.targetCatalog = targetCatalog;
+      this.targetDbSchema = targetDbSchema;
+    }
+
+    public @Nullable String getTargetCatalog() {
+      return targetCatalog;
+    }
+
+    public @Nullable String getTargetDbSchema() {
+      return targetDbSchema;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (o == null || getClass() != o.getClass()) return false;
+      TargetNamespaceIngestOption that = (TargetNamespaceIngestOption) o;
+      return Objects.equals(targetCatalog, that.targetCatalog)
+          && Objects.equals(targetDbSchema, that.targetDbSchema);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(targetCatalog, targetDbSchema);
+    }
+  }
+}
diff --git 
a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
 
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
index 125644041..bfc715720 100644
--- 
a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
+++ 
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.arrow.adbc.core.AdbcOptions;
 import org.apache.arrow.adbc.core.AdbcStatement;
 import org.apache.arrow.adbc.core.AdbcStatusCode;
 import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.IngestOption;
 import org.apache.arrow.adbc.core.TypedKey;
 import org.apache.arrow.adbc.driver.testsuite.ArrowToJava;
 import org.apache.arrow.memory.BufferAllocator;
@@ -272,6 +273,81 @@ class PostgresIntegrationTest {
     }
   }
 
+  @Test
+  void bulkIngestTarget() throws Exception {
+    runSetup(
+        "DROP TABLE IF EXISTS secondary.foobar",
+        "DROP SCHEMA IF EXISTS secondary",
+        "CREATE SCHEMA secondary");
+
+    final Schema schema =
+        new Schema(
+            List.of(
+                Field.nullable("ndx", Types.MinorType.INT.getType()),
+                Field.nullable("value", Types.MinorType.VARCHAR.getType())));
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) vsr.getVector(0);
+      VarCharVector vv = (VarCharVector) vsr.getVector(1);
+
+      try (AdbcStatement stmt =
+          conn.bulkIngest(
+              "foobar",
+              BulkIngestMode.REPLACE,
+              IngestOption.NOT_TEMPORARY,
+              IngestOption.targetNamespace(null, "secondary"))) {
+        iv.setSafe(0, 1);
+        iv.setSafe(1, 2);
+        vv.setNull(0);
+        vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+        vsr.setRowCount(2);
+
+        stmt.bind(vsr);
+        assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    try (AdbcStatement stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT value FROM secondary.foobar ORDER BY ndx");
+      try (var result = stmt.executeQuery()) {
+        var values = ArrowToJava.toStrings(result.getReader(), "value");
+        assertThat(values).containsExactly(null, "foobar");
+      }
+    }
+  }
+
+  @Test
+  void bulkIngestTemporary() throws Exception {
+    final Schema schema =
+        new Schema(
+            List.of(
+                Field.nullable("ndx", Types.MinorType.INT.getType()),
+                Field.nullable("value", Types.MinorType.VARCHAR.getType())));
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) vsr.getVector(0);
+      VarCharVector vv = (VarCharVector) vsr.getVector(1);
+
+      try (AdbcStatement stmt =
+          conn.bulkIngest("foobar", BulkIngestMode.CREATE, 
IngestOption.TEMPORARY)) {
+        iv.setSafe(0, 1);
+        iv.setSafe(1, 2);
+        vv.setNull(0);
+        vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+        vsr.setRowCount(2);
+
+        stmt.bind(vsr);
+        assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    try (AdbcStatement stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT value FROM foobar ORDER BY ndx");
+      try (var result = stmt.executeQuery()) {
+        var values = ArrowToJava.toStrings(result.getReader(), "value");
+        assertThat(values).containsExactly(null, "foobar");
+      }
+    }
+  }
+
   @Test
   void currentCatalogSchema() throws Exception {
     runSetup(
diff --git 
a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
 
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
index 0b4bfd452..2355484bd 100644
--- 
a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
+++ 
b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.arrow.adbc.core.AdbcOptions;
 import org.apache.arrow.adbc.core.AdbcStatement;
 import org.apache.arrow.adbc.core.AdbcStatusCode;
 import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.IngestOption;
 import org.apache.arrow.adbc.core.TypedKey;
 import org.apache.arrow.adbc.driver.testsuite.ArrowToJava;
 import org.apache.arrow.memory.BufferAllocator;
@@ -274,6 +275,81 @@ class SqlServerIntegrationTest {
     }
   }
 
+  @Test
+  void bulkIngestTarget() throws Exception {
+    runSetup(
+        "DROP TABLE IF EXISTS secondary.foobar",
+        "DROP SCHEMA IF EXISTS secondary",
+        "CREATE SCHEMA secondary");
+
+    final Schema schema =
+        new Schema(
+            List.of(
+                Field.nullable("ndx", Types.MinorType.INT.getType()),
+                Field.nullable("value", Types.MinorType.VARCHAR.getType())));
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) vsr.getVector(0);
+      VarCharVector vv = (VarCharVector) vsr.getVector(1);
+
+      try (AdbcStatement stmt =
+          conn.bulkIngest(
+              "foobar",
+              BulkIngestMode.REPLACE,
+              IngestOption.NOT_TEMPORARY,
+              IngestOption.targetNamespace("master", "secondary"))) {
+        iv.setSafe(0, 1);
+        iv.setSafe(1, 2);
+        vv.setNull(0);
+        vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+        vsr.setRowCount(2);
+
+        stmt.bind(vsr);
+        assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    try (AdbcStatement stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT value FROM secondary.foobar ORDER BY ndx");
+      try (var result = stmt.executeQuery()) {
+        var values = ArrowToJava.toStrings(result.getReader(), "value");
+        assertThat(values).containsExactly(null, "foobar");
+      }
+    }
+  }
+
+  @Test
+  void bulkIngestTemporary() throws Exception {
+    final Schema schema =
+        new Schema(
+            List.of(
+                Field.nullable("ndx", Types.MinorType.INT.getType()),
+                Field.nullable("value", Types.MinorType.VARCHAR.getType())));
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) vsr.getVector(0);
+      VarCharVector vv = (VarCharVector) vsr.getVector(1);
+
+      try (AdbcStatement stmt =
+          conn.bulkIngest("foobar", BulkIngestMode.CREATE, 
IngestOption.TEMPORARY)) {
+        iv.setSafe(0, 1);
+        iv.setSafe(1, 2);
+        vv.setNull(0);
+        vv.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+        vsr.setRowCount(2);
+
+        stmt.bind(vsr);
+        assertThat(stmt.executeUpdate().getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    try (AdbcStatement stmt = conn.createStatement()) {
+      stmt.setSqlQuery("SELECT value FROM #foobar ORDER BY ndx");
+      try (var result = stmt.executeQuery()) {
+        var values = ArrowToJava.toStrings(result.getReader(), "value");
+        assertThat(values).containsExactly(null, "foobar");
+      }
+    }
+  }
+
   @Test
   void currentCatalogSchema() throws Exception {
     runSetup(
diff --git 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
index ebfd75957..5a121ebb9 100644
--- 
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
+++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
@@ -21,6 +21,7 @@ import org.apache.arrow.adbc.core.AdbcConnection;
 import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.adbc.core.AdbcStatement;
 import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.IngestOption;
 import org.apache.arrow.adbc.core.IsolationLevel;
 import org.apache.arrow.adbc.core.TypedKey;
 import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
@@ -53,6 +54,17 @@ public class JniConnection implements AdbcConnection {
   @Override
   public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
       throws AdbcException {
+    return bulkIngestImpl(targetTableName, mode);
+  }
+
+  @Override
+  public AdbcStatement bulkIngest(
+      String targetTableName, BulkIngestMode mode, IngestOption... options) 
throws AdbcException {
+    return bulkIngestImpl(targetTableName, mode, options);
+  }
+
+  AdbcStatement bulkIngestImpl(String targetTableName, BulkIngestMode mode, 
IngestOption... options)
+      throws AdbcException {
     NativeStatementHandle stmtHandle = 
JniLoader.INSTANCE.openStatement(handle);
     try {
       String modeValue;
@@ -77,6 +89,24 @@ public class JniConnection implements AdbcConnection {
           stmtHandle, "adbc.ingest.target_table", targetTableName);
       JniLoader.INSTANCE.statementSetOptionString(stmtHandle, 
"adbc.ingest.mode", modeValue);
 
+      for (var option : options) {
+        if (option instanceof IngestOption.TemporaryIngestOption) {
+          var o = (IngestOption.TemporaryIngestOption) option;
+          JniLoader.INSTANCE.statementSetOptionString(
+              stmtHandle, "adbc.ingest.temporary", 
Boolean.toString(o.isTemporary()));
+        } else if (option instanceof IngestOption.TargetNamespaceIngestOption) 
{
+          var o = (IngestOption.TargetNamespaceIngestOption) option;
+          if (o.getTargetCatalog() != null) {
+            JniLoader.INSTANCE.statementSetOptionString(
+                stmtHandle, "adbc.ingest.target_catalog", 
o.getTargetCatalog());
+          }
+          if (o.getTargetDbSchema() != null) {
+            JniLoader.INSTANCE.statementSetOptionString(
+                stmtHandle, "adbc.ingest.target_db_schema", 
o.getTargetDbSchema());
+          }
+        }
+      }
+
       return new JniStatement(allocator, stmtHandle);
     } catch (Exception e) {
       stmtHandle.close();

Reply via email to