This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 5d409fc39 feat(java/driver/jni): wire up ingest into temporary/namespace (#4250)
5d409fc39 is described below
commit 5d409fc39c6d56fbedae31b9b32a52d44bcea926
Author: David Li <[email protected]>
AuthorDate: Mon Apr 27 10:38:56 2026 +0900
feat(java/driver/jni): wire up ingest into temporary/namespace (#4250)
Closes #4240.
---
.../org/apache/arrow/adbc/core/AdbcConnection.java | 12 +++
.../org/apache/arrow/adbc/core/IngestOption.java | 93 ++++++++++++++++++++++
.../adbc/driver/jni/PostgresIntegrationTest.java | 76 ++++++++++++++++++
.../adbc/driver/jni/SqlServerIntegrationTest.java | 76 ++++++++++++++++++
.../arrow/adbc/driver/jni/JniConnection.java | 30 +++++++
5 files changed, 287 insertions(+)
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
index 0b39a3d6a..ab650ec40 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
@@ -63,6 +63,18 @@ public interface AdbcConnection extends AutoCloseable, AdbcOptions {
"Connection does not support bulkIngest(String, BulkIngestMode)");
}
+  /**
+   * Create a new statement to bulk insert a {@link VectorSchemaRoot} into a table, customizing the
+   * ingestion with additional options.
+   *
+   * <p>Bind data to the statement, then call {@link AdbcStatement#executeUpdate()}. See {@link
+   * BulkIngestMode} for description of behavior around creating tables.
+   *
+   * <p>This default implementation always throws; connections that support ingest options
+   * override it.
+   *
+   * @param targetTableName the name of the table to ingest into.
+   * @param mode how to treat a new or already existing target table.
+   * @param options additional settings for the ingestion, for example {@link
+   *     IngestOption#TEMPORARY} or {@link IngestOption#targetNamespace(String, String)}.
+   * @return a statement to bind the data to.
+   * @throws AdbcException always in this default implementation (created via {@link
+   *     AdbcException#notImplemented(String)}).
+   */
+  default AdbcStatement bulkIngest(
+      String targetTableName, BulkIngestMode mode, IngestOption... options) throws AdbcException {
+    // The message must stay on one line: a Java string literal cannot contain a line break.
+    throw AdbcException.notImplemented(
+        "Connection does not support bulkIngest(String, BulkIngestMode, IngestOption...)");
+  }
+
/**
* Create a result set from a serialized PartitionDescriptor.
*
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/IngestOption.java b/java/core/src/main/java/org/apache/arrow/adbc/core/IngestOption.java
new file mode 100644
index 000000000..6040f32dd
--- /dev/null
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/IngestOption.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.core;
+
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A setting for {@link AdbcConnection#bulkIngest(String, BulkIngestMode, IngestOption...)}.
+ *
+ * <p>Use {@link #TEMPORARY} or {@link #NOT_TEMPORARY} to choose whether the target table is
+ * temporary, and the {@code target*} factory methods to choose the catalog and/or schema that
+ * contains it. The implementations declared here are immutable and compare by value.
+ */
+public interface IngestOption {
+  /** Ingest into a temporary table. */
+  TemporaryIngestOption TEMPORARY = new TemporaryIngestOption(true);
+
+  /** Explicitly ingest into a non-temporary table. */
+  TemporaryIngestOption NOT_TEMPORARY = new TemporaryIngestOption(false);
+
+  /**
+   * Ingest into the given catalog, leaving the schema unspecified.
+   *
+   * @param catalog the catalog containing the target table.
+   * @return an option carrying only the catalog.
+   */
+  static IngestOption targetCatalog(String catalog) {
+    return new TargetNamespaceIngestOption(catalog, null);
+  }
+
+  /**
+   * Ingest into the given schema, leaving the catalog unspecified.
+   *
+   * @param dbSchema the schema containing the target table.
+   * @return an option carrying only the schema.
+   */
+  static IngestOption targetDbSchema(String dbSchema) {
+    return new TargetNamespaceIngestOption(null, dbSchema);
+  }
+
+  /**
+   * Ingest into the given catalog and schema.
+   *
+   * @param catalog the catalog containing the target table, or {@code null} to leave it
+   *     unspecified.
+   * @param dbSchema the schema containing the target table, or {@code null} to leave it
+   *     unspecified.
+   * @return an option carrying both components.
+   */
+  static IngestOption targetNamespace(@Nullable String catalog, @Nullable String dbSchema) {
+    return new TargetNamespaceIngestOption(catalog, dbSchema);
+  }
+
+  /** Selects whether the ingestion target is a temporary table. */
+  class TemporaryIngestOption implements IngestOption {
+    // Immutable: the TEMPORARY and NOT_TEMPORARY instances are shared by every caller, so the
+    // flag must not be reassignable after construction.
+    private final boolean temporary;
+
+    TemporaryIngestOption(boolean temporary) {
+      this.temporary = temporary;
+    }
+
+    /** Returns {@code true} if the target table should be temporary. */
+    public boolean isTemporary() {
+      return temporary;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TemporaryIngestOption that = (TemporaryIngestOption) o;
+      return temporary == that.temporary;
+    }
+
+    @Override
+    public int hashCode() {
+      return Boolean.hashCode(temporary);
+    }
+
+    @Override
+    public String toString() {
+      return "TemporaryIngestOption{temporary=" + temporary + "}";
+    }
+  }
+
+  /** Selects the catalog and/or schema that contains the ingestion target table. */
+  class TargetNamespaceIngestOption implements IngestOption {
+    private final @Nullable String targetCatalog;
+    private final @Nullable String targetDbSchema;
+
+    /**
+     * Create an option for the given namespace.
+     *
+     * @param targetCatalog the catalog, or {@code null} to leave it unspecified.
+     * @param targetDbSchema the schema, or {@code null} to leave it unspecified.
+     */
+    public TargetNamespaceIngestOption(
+        @Nullable String targetCatalog, @Nullable String targetDbSchema) {
+      this.targetCatalog = targetCatalog;
+      this.targetDbSchema = targetDbSchema;
+    }
+
+    /** Returns the target catalog, or {@code null} if none was given. */
+    public @Nullable String getTargetCatalog() {
+      return targetCatalog;
+    }
+
+    /** Returns the target schema, or {@code null} if none was given. */
+    public @Nullable String getTargetDbSchema() {
+      return targetDbSchema;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TargetNamespaceIngestOption that = (TargetNamespaceIngestOption) o;
+      return Objects.equals(targetCatalog, that.targetCatalog)
+          && Objects.equals(targetDbSchema, that.targetDbSchema);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(targetCatalog, targetDbSchema);
+    }
+
+    @Override
+    public String toString() {
+      return "TargetNamespaceIngestOption{targetCatalog="
+          + targetCatalog
+          + ", targetDbSchema="
+          + targetDbSchema
+          + "}";
+    }
+  }
+}
diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
index 125644041..bfc715720 100644
--- a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
+++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/PostgresIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.arrow.adbc.core.AdbcOptions;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.IngestOption;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.testsuite.ArrowToJava;
import org.apache.arrow.memory.BufferAllocator;
@@ -272,6 +273,81 @@ class PostgresIntegrationTest {
}
}
+  /** Bulk ingest into a non-temporary table that lives in an explicitly named schema. */
+  @Test
+  void bulkIngestTarget() throws Exception {
+    // Recreate the "secondary" schema so the test starts without a leftover table.
+    runSetup(
+        "DROP TABLE IF EXISTS secondary.foobar",
+        "DROP SCHEMA IF EXISTS secondary",
+        "CREATE SCHEMA secondary");
+
+    final Field ndxField = Field.nullable("ndx", Types.MinorType.INT.getType());
+    final Field valueField = Field.nullable("value", Types.MinorType.VARCHAR.getType());
+    final Schema schema = new Schema(List.of(ndxField, valueField));
+
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      // Two rows: (1, NULL) and (2, "foobar").
+      final IntVector ndxVector = (IntVector) root.getVector(0);
+      final VarCharVector valueVector = (VarCharVector) root.getVector(1);
+      ndxVector.setSafe(0, 1);
+      ndxVector.setSafe(1, 2);
+      valueVector.setNull(0);
+      valueVector.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(2);
+
+      try (AdbcStatement ingest =
+          conn.bulkIngest(
+              "foobar",
+              BulkIngestMode.REPLACE,
+              IngestOption.NOT_TEMPORARY,
+              IngestOption.targetNamespace(null, "secondary"))) {
+        ingest.bind(root);
+        var update = ingest.executeUpdate();
+        assertThat(update.getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    // Read the rows back through the schema-qualified name.
+    try (AdbcStatement query = conn.createStatement()) {
+      query.setSqlQuery("SELECT value FROM secondary.foobar ORDER BY ndx");
+      try (var result = query.executeQuery()) {
+        assertThat(ArrowToJava.toStrings(result.getReader(), "value"))
+            .containsExactly(null, "foobar");
+      }
+    }
+  }
+
+  /** Bulk ingest into a temporary table and read the rows back on the same connection. */
+  @Test
+  void bulkIngestTemporary() throws Exception {
+    final Field ndxField = Field.nullable("ndx", Types.MinorType.INT.getType());
+    final Field valueField = Field.nullable("value", Types.MinorType.VARCHAR.getType());
+    final Schema schema = new Schema(List.of(ndxField, valueField));
+
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      // Two rows: (1, NULL) and (2, "foobar").
+      final IntVector ndxVector = (IntVector) root.getVector(0);
+      final VarCharVector valueVector = (VarCharVector) root.getVector(1);
+      ndxVector.setSafe(0, 1);
+      ndxVector.setSafe(1, 2);
+      valueVector.setNull(0);
+      valueVector.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(2);
+
+      try (AdbcStatement ingest =
+          conn.bulkIngest("foobar", BulkIngestMode.CREATE, IngestOption.TEMPORARY)) {
+        ingest.bind(root);
+        var update = ingest.executeUpdate();
+        assertThat(update.getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    // The table is queried by its unqualified name.
+    try (AdbcStatement query = conn.createStatement()) {
+      query.setSqlQuery("SELECT value FROM foobar ORDER BY ndx");
+      try (var result = query.executeQuery()) {
+        assertThat(ArrowToJava.toStrings(result.getReader(), "value"))
+            .containsExactly(null, "foobar");
+      }
+    }
+  }
+
@Test
void currentCatalogSchema() throws Exception {
runSetup(
diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
index 0b4bfd452..2355484bd 100644
--- a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
+++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/SqlServerIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.arrow.adbc.core.AdbcOptions;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.IngestOption;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.testsuite.ArrowToJava;
import org.apache.arrow.memory.BufferAllocator;
@@ -274,6 +275,81 @@ class SqlServerIntegrationTest {
}
}
+  /** Bulk ingest into a non-temporary table addressed by explicit catalog and schema. */
+  @Test
+  void bulkIngestTarget() throws Exception {
+    // Recreate the "secondary" schema so the test starts without a leftover table.
+    runSetup(
+        "DROP TABLE IF EXISTS secondary.foobar",
+        "DROP SCHEMA IF EXISTS secondary",
+        "CREATE SCHEMA secondary");
+
+    final Field ndxField = Field.nullable("ndx", Types.MinorType.INT.getType());
+    final Field valueField = Field.nullable("value", Types.MinorType.VARCHAR.getType());
+    final Schema schema = new Schema(List.of(ndxField, valueField));
+
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      // Two rows: (1, NULL) and (2, "foobar").
+      final IntVector ndxVector = (IntVector) root.getVector(0);
+      final VarCharVector valueVector = (VarCharVector) root.getVector(1);
+      ndxVector.setSafe(0, 1);
+      ndxVector.setSafe(1, 2);
+      valueVector.setNull(0);
+      valueVector.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(2);
+
+      try (AdbcStatement ingest =
+          conn.bulkIngest(
+              "foobar",
+              BulkIngestMode.REPLACE,
+              IngestOption.NOT_TEMPORARY,
+              IngestOption.targetNamespace("master", "secondary"))) {
+        ingest.bind(root);
+        var update = ingest.executeUpdate();
+        assertThat(update.getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    // Read the rows back through the schema-qualified name.
+    try (AdbcStatement query = conn.createStatement()) {
+      query.setSqlQuery("SELECT value FROM secondary.foobar ORDER BY ndx");
+      try (var result = query.executeQuery()) {
+        assertThat(ArrowToJava.toStrings(result.getReader(), "value"))
+            .containsExactly(null, "foobar");
+      }
+    }
+  }
+
+  /** Bulk ingest into a temporary table and read the rows back on the same connection. */
+  @Test
+  void bulkIngestTemporary() throws Exception {
+    final Field ndxField = Field.nullable("ndx", Types.MinorType.INT.getType());
+    final Field valueField = Field.nullable("value", Types.MinorType.VARCHAR.getType());
+    final Schema schema = new Schema(List.of(ndxField, valueField));
+
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      // Two rows: (1, NULL) and (2, "foobar").
+      final IntVector ndxVector = (IntVector) root.getVector(0);
+      final VarCharVector valueVector = (VarCharVector) root.getVector(1);
+      ndxVector.setSafe(0, 1);
+      ndxVector.setSafe(1, 2);
+      valueVector.setNull(0);
+      valueVector.setSafe(1, "foobar".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(2);
+
+      try (AdbcStatement ingest =
+          conn.bulkIngest("foobar", BulkIngestMode.CREATE, IngestOption.TEMPORARY)) {
+        ingest.bind(root);
+        var update = ingest.executeUpdate();
+        assertThat(update.getAffectedRows()).isEqualTo(2);
+      }
+    }
+
+    // The table is queried under the '#'-prefixed name.
+    try (AdbcStatement query = conn.createStatement()) {
+      query.setSqlQuery("SELECT value FROM #foobar ORDER BY ndx");
+      try (var result = query.executeQuery()) {
+        assertThat(ArrowToJava.toStrings(result.getReader(), "value"))
+            .containsExactly(null, "foobar");
+      }
+    }
+  }
+
@Test
void currentCatalogSchema() throws Exception {
runSetup(
diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
index ebfd75957..5a121ebb9 100644
--- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
+++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
@@ -21,6 +21,7 @@ import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.IngestOption;
import org.apache.arrow.adbc.core.IsolationLevel;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
@@ -53,6 +54,17 @@ public class JniConnection implements AdbcConnection {
@Override
public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
throws AdbcException {
+    // No ingest options requested: hand an empty option list to the shared implementation.
+    return bulkIngestImpl(targetTableName, mode, new IngestOption[0]);
+  }
+
+  /**
+   * Create a bulk ingest statement for the given table, passing {@code options} through to
+   * {@code bulkIngestImpl}.
+   */
+  @Override
+  public AdbcStatement bulkIngest(
+      String targetTableName, BulkIngestMode mode, IngestOption... options) throws AdbcException {
+    return bulkIngestImpl(targetTableName, mode, options);
+  }
+
+ AdbcStatement bulkIngestImpl(String targetTableName, BulkIngestMode mode,
IngestOption... options)
+ throws AdbcException {
NativeStatementHandle stmtHandle =
JniLoader.INSTANCE.openStatement(handle);
try {
String modeValue;
@@ -77,6 +89,24 @@ public class JniConnection implements AdbcConnection {
stmtHandle, "adbc.ingest.target_table", targetTableName);
JniLoader.INSTANCE.statementSetOptionString(stmtHandle,
"adbc.ingest.mode", modeValue);
+ for (var option : options) {
+ if (option instanceof IngestOption.TemporaryIngestOption) {
+ var o = (IngestOption.TemporaryIngestOption) option;
+ JniLoader.INSTANCE.statementSetOptionString(
+ stmtHandle, "adbc.ingest.temporary",
Boolean.toString(o.isTemporary()));
+ } else if (option instanceof IngestOption.TargetNamespaceIngestOption)
{
+ var o = (IngestOption.TargetNamespaceIngestOption) option;
+ if (o.getTargetCatalog() != null) {
+ JniLoader.INSTANCE.statementSetOptionString(
+ stmtHandle, "adbc.ingest.target_catalog",
o.getTargetCatalog());
+ }
+ if (o.getTargetDbSchema() != null) {
+ JniLoader.INSTANCE.statementSetOptionString(
+ stmtHandle, "adbc.ingest.target_db_schema",
o.getTargetDbSchema());
+ }
+ }
+ }
+
return new JniStatement(allocator, stmtHandle);
} catch (Exception e) {
stmtHandle.close();