Hi all,
I'd like to propose adding partitioned bulk ingest to ADBC — the write-side
mirror of ExecutePartitions/ReadPartition.
Today ADBC's bulk ingest is single-connection, single-transaction. There
are at least two workloads that don't fit:
1. Distributed writers (Spark/Flink) targeting an RDBMS.
   N executors each open their own connection and run independent ingest
   calls, but there is no consistency control across them. There is no atomic
   commit point where all partitions become visible together, and no rollback
   if one executor fails. Workarounds (per-job staging tables, ad-hoc swap
   SQL) are database-specific and leak into application code.
2. Distributed writers targeting table-format catalogs (Iceberg, Delta).
   These formats are designed for parallel writes followed by a single atomic
   commit, but ADBC has no way to expose that shape. A driver author who wants
   to write to Iceberg today has to either route all data through one process
   (defeats the purpose) or invent a private API.
Both workloads need the same shape: coordinator decides what to ingest,
workers write partitions in parallel, coordinator commits or aborts
atomically.
Proposed API
------------
Four new operations on AdbcConnection:
    coordinator:  BeginIngestPartitions(conn, table, mode, schema) -> handle
    workers:      WriteIngestPartition(conn, handle, stream) -> receipt
                  WriteIngestPartition(conn, handle, stream) -> receipt
                  ...
    coordinator:  CommitIngestPartitions(conn, handle, receipts) -> rows_affected
           (or)   AbortIngestPartitions(conn, handle, receipts)
Handle and receipts are opaque, serializable byte strings.
Workers may use different connections (and different hosts) than the
coordinator.
This is the same shape as the existing partitioned-read side, where
ExecutePartitions returns opaque partition tokens that can be shipped to
workers and passed to ReadPartition over a different connection.
C API surface
-------------
Two new structs for driver-owned output:
    struct AdbcIngestHandle {
      size_t length;
      const uint8_t* bytes;
      void* private_data;
      void (*release)(struct AdbcIngestHandle*);
    };
    struct AdbcIngestReceipt {
      size_t length;
      const uint8_t* bytes;
      void* private_data;
      void (*release)(struct AdbcIngestReceipt*);
    };
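As a rough illustration of how a caller might consume one of these structs, here is a minimal sketch. It assumes the release callback follows the Arrow C data interface convention (the driver frees its memory and sets release to NULL); the proposal text above does not spell this out. ToyMakeHandle and CopyAndRelease are hypothetical helpers, not part of the proposed API; only the struct layout is taken from the proposal.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Struct layout as given in the proposal. */
struct AdbcIngestHandle {
  size_t length;
  const uint8_t* bytes;
  void* private_data;
  void (*release)(struct AdbcIngestHandle*);
};

/* Hypothetical driver-side release: frees the buffer and marks the struct
 * as released by clearing the callback (assumed convention). */
static void ToyReleaseHandle(struct AdbcIngestHandle* h) {
  free(h->private_data);
  h->length = 0;
  h->bytes = NULL;
  h->private_data = NULL;
  h->release = NULL;
}

/* Hypothetical stand-in for a driver filling in the output struct.
 * Returns 0 on success, -1 on allocation failure. */
int ToyMakeHandle(const char* token, struct AdbcIngestHandle* out) {
  size_t n = strlen(token);
  uint8_t* buf = malloc(n ? n : 1);
  if (buf == NULL) return -1;
  memcpy(buf, token, n);
  out->length = n;
  out->bytes = buf;
  out->private_data = buf;
  out->release = ToyReleaseHandle;
  return 0;
}

/* Caller side: copy the opaque bytes (e.g. to ship them to workers), then
 * hand the driver-owned memory back. Returns a malloc'd copy or NULL. */
uint8_t* CopyAndRelease(struct AdbcIngestHandle* h, size_t* out_len) {
  uint8_t* copy = malloc(h->length ? h->length : 1);
  if (copy != NULL) {
    memcpy(copy, h->bytes, h->length);
    *out_len = h->length;
  }
  if (h->release != NULL) h->release(h);
  return copy;
}
```

The caller never has to guess a buffer size: the driver allocates, the caller copies what it needs, and a single release call returns ownership.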
Four new functions:
    AdbcConnectionBeginIngestPartitions(
        conn, target_catalog, target_db_schema, target_table, mode,
        schema, *out_handle, *error);
    AdbcConnectionWriteIngestPartition(
        conn, handle_bytes, handle_len, *data_stream,
        *out_receipt, *error);
    AdbcConnectionCommitIngestPartitions(
        conn, handle_bytes, handle_len, num_receipts, receipts,
        receipt_lens, *rows_affected, *error);
    AdbcConnectionAbortIngestPartitions(
        conn, handle_bytes, handle_len, num_receipts, receipts,
        receipt_lens, *error);
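Commit and Abort take receipts as parallel arrays (num_receipts, receipts, receipt_lens). A coordinator that gathers receipts from workers might pack them roughly as follows. This is only a sketch: the concrete C types (const uint8_t** and size_t*) are my assumption from the parameter names above, and ToyReceiptList and its functions are hypothetical helpers, not part of the proposal.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical coordinator-side container. Pointers are borrowed: the
 * receipt bytes must outlive the list. */
struct ToyReceiptList {
  size_t capacity;
  size_t num_receipts;
  const uint8_t** receipts; /* one pointer per receipt */
  size_t* receipt_lens;     /* matching byte lengths */
};

/* Allocates room for `capacity` receipts. Returns 0 on success, -1 on
 * allocation failure. */
int ToyReceiptListInit(struct ToyReceiptList* list, size_t capacity) {
  size_t slots = capacity ? capacity : 1;
  list->capacity = capacity;
  list->num_receipts = 0;
  list->receipts = calloc(slots, sizeof(*list->receipts));
  list->receipt_lens = calloc(slots, sizeof(*list->receipt_lens));
  if (list->receipts == NULL || list->receipt_lens == NULL) {
    free(list->receipts);
    free(list->receipt_lens);
    list->receipts = NULL;
    list->receipt_lens = NULL;
    return -1;
  }
  return 0;
}

/* Records one receipt. Returns 0 on success, -1 if the list is full. */
int ToyReceiptListAppend(struct ToyReceiptList* list, const uint8_t* bytes,
                         size_t len) {
  if (list->num_receipts == list->capacity) return -1;
  list->receipts[list->num_receipts] = bytes;
  list->receipt_lens[list->num_receipts] = len;
  list->num_receipts++;
  return 0;
}

/* Frees the arrays, not the borrowed receipt bytes. */
void ToyReceiptListRelease(struct ToyReceiptList* list) {
  free(list->receipts);
  free(list->receipt_lens);
  list->receipts = NULL;
  list->receipt_lens = NULL;
  list->num_receipts = 0;
  list->capacity = 0;
}
```

The three fields num_receipts, receipts, and receipt_lens would then be passed straight through to the Commit or Abort call.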
Driver-side semantics
---------------------
Begin validates options, performs whatever setup the driver requires for
writes to proceed (e.g., creating the target table, reserving a transaction
snapshot, allocating an object-store prefix), and returns a handle encoding
the state needed to scope subsequent writes.
Write takes a handle and a stream, writes the partition into driver-private
staging (a per-write staging table, a per-write object-store path), and
returns a receipt encoding what was written (staging name, file paths, row
count, statistics).
Each Write call must produce output that can be committed or discarded
independently — no shared state across concurrent writes that would cause
duplicate rows on retry.
Commit atomically promotes the union of the supplied receipts into the
target.
- For RDBMS drivers this means swapping staging into target in a
  transaction.
- For table-format drivers this means writing a catalog or
  transaction-log entry referencing the data files in the receipts.
After successful commit the handle is consumed.
Abort discards all writes scoped to the handle. The driver must clean up
every write under the handle, not just the ones named in the supplied
receipts — this handles the case where a worker wrote data but its receipt
was lost in transit.
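To make the Commit/Abort rules above concrete, here is a toy in-memory model. It is not how any driver implements them: staging is an array of row counts, receipts are plain integer ids, and all Toy* names are hypothetical. One assumption beyond the text above: on Commit, the toy also discards staging whose receipt was not supplied.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_MAX_WRITES 16

enum ToyState { TOY_OPEN, TOY_COMMITTED, TOY_ABORTED };

struct ToyJob {
  enum ToyState state;
  size_t num_writes;
  int64_t staged_rows[TOY_MAX_WRITES]; /* rows per staged partition */
  int live[TOY_MAX_WRITES];            /* 1 while the staging still exists */
  int64_t target_rows;                 /* rows visible in the target */
};

void ToyBegin(struct ToyJob* job) {
  memset(job, 0, sizeof(*job));
  job->state = TOY_OPEN;
}

/* Stages one partition independently of the others.
 * Returns a receipt id, or -1 if the job is closed or full. */
int ToyWrite(struct ToyJob* job, int64_t rows) {
  if (job->state != TOY_OPEN || job->num_writes == TOY_MAX_WRITES) return -1;
  size_t id = job->num_writes++;
  job->staged_rows[id] = rows;
  job->live[id] = 1;
  return (int)id;
}

static void ToyDropStaging(struct ToyJob* job) {
  for (size_t i = 0; i < job->num_writes; i++) job->live[i] = 0;
}

/* Promotes exactly the supplied receipts. All receipts are validated before
 * anything changes, so the commit is all-or-nothing. Returns rows_affected,
 * or -1 on a bad/duplicate receipt or a consumed handle. */
int64_t ToyCommit(struct ToyJob* job, const int* receipts, size_t n) {
  if (job->state != TOY_OPEN) return -1;
  int seen[TOY_MAX_WRITES] = {0};
  int64_t total = 0;
  for (size_t i = 0; i < n; i++) {
    int id = receipts[i];
    if (id < 0 || (size_t)id >= job->num_writes || seen[id]) return -1;
    seen[id] = 1;
    total += job->staged_rows[id];
  }
  job->target_rows += total;
  ToyDropStaging(job); /* assumption: unnamed staging is discarded too */
  job->state = TOY_COMMITTED;
  return total;
}

/* Discards every write under the handle, including ones whose receipt
 * never reached the coordinator. Returns 0, or -1 if already closed. */
int ToyAbort(struct ToyJob* job) {
  if (job->state != TOY_OPEN) return -1;
  ToyDropStaging(job);
  job->state = TOY_ABORTED;
  return 0;
}

size_t ToyLiveStaging(const struct ToyJob* job) {
  size_t n = 0;
  for (size_t i = 0; i < job->num_writes; i++) n += job->live[i] ? 1 : 0;
  return n;
}
```

Note that ToyAbort takes no receipts at all: cleanup is keyed on the handle, which is what makes a lost receipt harmless.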
Why dedicated functions rather than options?
--------------------------------------------
Begin and Write are non-idempotent — they create staging resources (tables,
files) as side effects.
Returning results through GetOptionBytes requires two-phase sizing (call
once for length, allocate, call again for bytes), which is only safe for
idempotent operations.
If the buffer is too small on an operation that already mutated state, you
have orphaned resources with no handle to clean them up.
Driver-owned output structs with release callbacks eliminate this orphan
window.
That pattern requires dedicated function signatures — it can't be expressed
through GetOptionBytes.
This is the same reason the read side uses a dedicated
AdbcStatementExecutePartitions returning an AdbcPartitions struct rather
than encoding partition tokens through options.
Additionally, even if parameters were encoded as options, you would still
need trigger functions for Begin/Commit/Abort.
Statement's Execute could serve for one of these, but not all three cleanly.
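The orphan window described in this section can be shown with a toy comparison. This sketch is loosely modeled on two-phase sizing and is not the real GetOptionBytes contract; every name here (ToyBeginIntoBuffer, ToyBeginOwned, ToyOwnedBytes, g_staging_created) is hypothetical. The counter stands in for the side effect of creating a staging resource.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

int g_staging_created = 0; /* counts simulated staging resources */

static const char kToken[] = "staging-0001";

/* Caller-buffer style: the side effect happens on every call, including
 * the sizing call. Returns 0 on success, 1 if the buffer was too small
 * (*length is then set to the required size). */
int ToyBeginIntoBuffer(uint8_t* buf, size_t* length) {
  size_t needed = sizeof(kToken) - 1;
  g_staging_created++; /* non-idempotent work */
  if (*length < needed) {
    *length = needed;
    return 1; /* resource exists, but the caller got no token for it */
  }
  memcpy(buf, kToken, needed);
  *length = needed;
  return 0;
}

/* Driver-owned output, same layout idea as the proposed structs. */
struct ToyOwnedBytes {
  size_t length;
  const uint8_t* bytes;
  void* private_data;
  void (*release)(struct ToyOwnedBytes*);
};

static void ToyOwnedRelease(struct ToyOwnedBytes* out) {
  free(out->private_data);
  out->length = 0;
  out->bytes = NULL;
  out->private_data = NULL;
  out->release = NULL;
}

/* Driver-owned style: the driver sizes and allocates the output itself,
 * so one call does the work once. Returns 0 on success, -1 on OOM. */
int ToyBeginOwned(struct ToyOwnedBytes* out) {
  size_t needed = sizeof(kToken) - 1;
  uint8_t* buf = malloc(needed);
  if (buf == NULL) return -1; /* fail before any side effect */
  g_staging_created++;
  memcpy(buf, kToken, needed);
  out->length = needed;
  out->bytes = buf;
  out->private_data = buf;
  out->release = ToyOwnedRelease;
  return 0;
}
```

With the caller-buffer variant, the usual "ask for the length, then call again" sequence leaves two staging resources behind and a token for only one of them; the driver-owned variant creates exactly one.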
Draft PR
--------
https://github.com/apache/arrow-adbc/pull/4317
The full spec document is at:
https://github.com/apache/arrow-adbc/blob/partitioned-ingest/docs/source/format/partitioned_bulk_ingest.rst
The PR includes the spec document, C API additions (adbc.h), driver
manager wiring, and a reference implementation in the PostgreSQL driver
with test coverage.
Feedback welcome.
Thanks,
Tornike