jerryshao commented on code in PR #11152:
URL: https://github.com/apache/gravitino/pull/11152#discussion_r3287403826
##########
design-docs/async-iceberg-rest-hard-deletion.md:
##########
@@ -0,0 +1,436 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Design: Asynchronous Hard Deletion for the Gravitino Iceberg REST Server
+
+| Field    | Value                                                   |
+| -------- | ------------------------------------------------------- |
+| Status   | Draft                                                   |
+| Authors  | @roryqi                                                 |
+| Created  | 2026-05-19                                              |
+| Module   | `iceberg/iceberg-rest-server`, `iceberg/iceberg-common` |
+
+---
+
+## 1. Background
+
+When a client issues:
+
+```
+DELETE /v1/{prefix}/namespaces/{namespace}/tables/{table}?purgeRequested=true
+```
+
+today's path is fully synchronous: `IcebergTableOperations.dropTable` →
+`IcebergTableOperationExecutor.dropTable` → `IcebergCatalogWrapper.purgeTable`
+→ `CatalogHandlers.purgeTable`, which walks every snapshot / manifest /
+data file and deletes each one through `FileIO` on the Jetty request
+thread.
+
+For production tables this fails in three ways:
+
+- Multi-minute purges exceed HTTP timeouts.
+- Concurrent purges saturate the Jetty pool.
+- Mid-purge failures leak files with no retry or audit trail.
+
+We want the drop to return quickly, finish file deletion reliably in the
+background, survive restarts, and run safely across multiple server
+replicas — while keeping the synchronous path available as a rollback.
+
+*Not in scope:* `RelationalGarbageCollector`, which deletes tombstoned
+**rows** from Gravitino's relational backend. Different IO surface,
+different failure model — kept separate.
+
+---
+
+## 2. Goals
+
+1. **Fast response**: `DELETE … ?purgeRequested=true` returns at typical
+   request latency (target p99 < 500 ms, and < 5 s even for the largest
+   tables) regardless of table size.
+2. **Operational simplicity**: Ship a single async deletion path with the
+   smallest possible bug surface; retain the synchronous path behind a
+   feature flag purely for rollback, not as a parallel product surface.
+3. **Reliable deletion**: The async path deletes every file the
+   synchronous purge would have deleted, retries transient failures, and
+   survives restarts and replica failover.
+4. **Wire compatibility**: No change to the Iceberg REST wire protocol.
+5. **Request-thread authorization**: Authorization runs on the request
+   thread, never deferred.
+6. **Uniform object coverage**: Tables, views, and namespace (schema)
+   drops all flow through the same async cleanup mechanism (PRD §2.2).
+
+---
+
+## 3. Non-Goals
+
+1. **Native soft-delete semantics (R2)**: This design only delivers async
+   *hard* deletion (R1). Full soft-delete / undrop semantics are a
+   follow-up requirement; §5.7 records the seam they build on so R2 stays
+   a small extension rather than a separate V2 mechanism.
+2. **Purge cancellation (v1)**: User-initiated cancellation of in-flight
+   purges is out of scope for v1; it needs a control API and lifecycle
+   states we can add later without breaking the design.
+3. **Third-party deletion plugins**: We ship one async implementation. A
+   pluggable extension point is explicitly *not* built now — see §4 for
+   why, and the conditions under which we would revisit it.
+
+---
+
+## 4. Solution Investigations
+
+The earlier draft proposed a pluggable `IcebergPurger` SPI. Review
+feedback (PR #11152) pushed back: the PRD does not ask for an SPI, no
+second implementation is in flight, and the competitive frame against
+Polaris is "simpler design with smaller bug surface." We re-evaluated and
+chose a single async implementation with a synchronous rollback flag.
+
+| Approach | Pros | Cons | Decision |
+|----------|------|------|----------|
+| Synchronous only (status quo) | Simplest; strongest "deleted means gone" guarantee | Exceeds HTTP timeouts, saturates Jetty, no retry/audit on large tables | **Rejected** — the problem we are solving |
+| Pluggable `IcebergPurger` SPI (factory + classpath loading + context) | Extensible to object-store batch / audit-only without code changes | Real added surface (SPI, discovery factory, context) with **no** second implementation in flight; widens the bug surface against the PRD's "smaller surface" goal | **Rejected** — revisit only when a second implementation has a real customer behind it |
+| Reuse `RelationalGarbageCollector` | Proven worker/scheduling pattern already in the codebase | Different IO surface (object store vs. JDBC) and failure model (best-effort per-file vs. transactional rows) | **Rejected** — share patterns, not code |
+| External job system only (Quartz / Temporal) | Mature scheduling, retries, observability | Heavy operational burden imposed on every operator | **Rejected** — disproportionate for one deletion workload |
+| Enumerate files at enqueue time | Worker needs no metadata re-read | Enumeration is slow on large tables (defeats the latency goal) and bloats job rows | **Rejected** — store `metadata_location`, re-read at run time |
+| **Single async purger (JDBC job table + worker pool) with a synchronous fallback flag** | Smallest bug surface; reliable, restart-safe, cluster-safe via `SKIP LOCKED`; one code path to test | No built-in extension point — a second strategy would need a follow-up refactor | **Chosen** |
+
+---
+
+## 5. Proposal
+
+### 5.1 Overview
+
+```
+   DELETE …?purgeRequested=true
+            │
+            ▼
+   IcebergTableOperationExecutor.dropTable
+            │  async-purge.enabled ?
+      ┌─────┴───────────────┐
+      │ true (default 1.3)  │ false (rollback)
+      ▼                     ▼
+ persist iceberg_purge_job   CatalogUtil.dropTableData
+ row, return 204             (synchronous, on request thread)
+      │
+      ▼
+ worker pool (any replica)
+   leases job via FOR UPDATE SKIP LOCKED
+   → rebuild snapshot graph from metadata_location
+   → delete every reachable file, retry w/ backoff
+   → SUCCEEDED | DEAD_LETTER
+```
+
+There is one async deletion path. The synchronous path is retained only
+as a feature-flag rollback (`async-purge.enabled = false`).
+
+### 5.2 Request-path interaction
+
+```java
+public void dropTable(IcebergRequestContext ctx, TableIdentifier id,
+                      boolean purgeRequested) {
+  IcebergCatalogWrapper w = catalogWrapperManager.getCatalogWrapper(ctx.catalogName());
+  if (!purgeRequested) { w.dropTable(id); return; }
+
+  if (!asyncPurgeEnabled) {          // rollback path
+    w.purgeTable(id);                // synchronous, today's behavior
+    return;
+  }
+
+  TableMetadata metadata = w.loadTableMetadata(id);
+  w.dropTable(id);                   // metadata-only drop in the catalog
+  purgeJobStore.enqueue(
+      IcebergPurgeJob.builder()
+          .catalogName(ctx.catalogName())
+          .tableIdentifier(id)
+          .metadataLocation(metadata.metadataFileLocation())
+          .fileIoImpl(w.fileIoImpl())
+          .fileIoProperties(w.fileIoProperties())
+          .createdBy(ctx.userPrincipal())
+          .build());
+}
+```
+
+Order matters on the async path: load metadata location → drop catalog
+entry → enqueue the job. A purge job exists only for a table that is
+already gone from the catalog. `fileIoProperties` is captured at enqueue
+time so the worker can reconstruct `FileIO` even if the catalog is later
+reconfigured.
+
+### 5.3 Schema — `iceberg_purge_job`
+
+```sql
+CREATE TABLE IF NOT EXISTS `iceberg_purge_job` (
+    `id`                BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
+    `metalake_name`     VARCHAR(128)  NOT NULL,
+    `catalog_name`      VARCHAR(128)  NOT NULL,
+    `namespace`         VARCHAR(512)  NOT NULL,
+    `object_name`       VARCHAR(256)  NOT NULL,
+    `object_type`       VARCHAR(16)   NOT NULL COMMENT 'TABLE|VIEW',
+    `metadata_location` VARCHAR(1024) NOT NULL,
+    `file_io_impl`      VARCHAR(256)  NOT NULL,
+    `file_io_props`     MEDIUMTEXT    NOT NULL COMMENT 'JSON',
+    `state`             VARCHAR(16)   NOT NULL COMMENT 'PENDING|RUNNING|SUCCEEDED|DEAD_LETTER',
+    `attempts`          INT(10)       NOT NULL DEFAULT 0,
+    `max_attempts`      INT(10)       NOT NULL,
+    `last_error`        TEXT          NULL,
+    `lease_owner`       VARCHAR(128)  NULL,
+    `lease_expires_at`  BIGINT(20)    NULL,
+    `next_attempt_at`   BIGINT(20)    NOT NULL,
+    `created_at`        BIGINT(20)    NOT NULL,
+    `created_by`        VARCHAR(128)  NOT NULL,
+    `updated_at`        BIGINT(20)    NOT NULL,
+    PRIMARY KEY (`id`),
+    KEY `idx_state_next_attempt` (`state`, `next_attempt_at`)
+) ENGINE=InnoDB;
+```
+
+We store only `metadata_location`, not the file list — enumeration is
+slow on large tables, and `TableMetadataParser.read(io, location)`
+rebuilds the snapshot graph deterministically when the worker runs.
+
+Migration: `upgrade-1.2.0-to-1.3.0-mysql.sql` (and H2 / PostgreSQL).
+
+### 5.4 Worker pool
+
+A `ScheduledThreadPoolExecutor` modeled on `RelationalGarbageCollector`.
+Each tick:
+
+```sql
+SELECT * FROM iceberg_purge_job
+ WHERE state IN ('PENDING','RUNNING')
+   AND next_attempt_at <= :now
+   AND (lease_expires_at IS NULL OR lease_expires_at < :now)
+ ORDER BY next_attempt_at LIMIT :batch
+ FOR UPDATE SKIP LOCKED;
+```
+
+then updates the row to `RUNNING` with `lease_owner=:me`,
+`lease_expires_at=:now+leaseTimeout`. `SKIP LOCKED` is the cluster-safety
+primitive: any number of replicas can run the worker without external
+coordination. H2 falls back to a conditional update.
+
+Execution mirrors `CatalogHandlers.purgeTable`:
+
+```java
+TableMetadata meta = TableMetadataParser.read(io, job.metadataLocation());
+Tasks.foreach(collectAllReachableFiles(meta))
+    .executeWith(deleteExecutor)
+    .retry(perFileRetries)
+    .suppressFailureWhenFinished()
+    .run(io::deleteFile);
+```
+
+A separate task renews the lease every `leaseTimeout / 3`. If the host
+dies, the lease expires and another replica reclaims the job.

Review Comment:
   How is a server restart handled? Also, how is a multi-server deployment handled?
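
For context on the question above, the lease rules quoted from §5.4 can be modeled in memory. This is only a sketch under stated assumptions, not code from the PR: `LeaseSketch`, `Job`, `claim`, and `renew` are hypothetical names, a `synchronized` method stands in for the row locks taken by `FOR UPDATE SKIP LOCKED`, and the owner check in `renew` is an added assumption that the design text does not spell out. It illustrates why a restart or a dead replica should not strand a job: once `lease_expires_at` has passed, the `RUNNING` row matches the lease query again and any replica can claim it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// In-memory model of the iceberg_purge_job lease rules; all names are hypothetical.
class LeaseSketch {
    enum State { PENDING, RUNNING, SUCCEEDED, DEAD_LETTER }

    static final class Job {
        final long id;
        State state = State.PENDING;
        String leaseOwner;      // null until a worker leases the job
        Long leaseExpiresAt;    // epoch millis; null until leased
        long nextAttemptAt;     // epoch millis

        Job(long id, long nextAttemptAt) {
            this.id = id;
            this.nextAttemptAt = nextAttemptAt;
        }
    }

    private final List<Job> rows = new ArrayList<>();

    synchronized void enqueue(Job job) {
        rows.add(job);
    }

    // Applies the same predicate as the lease query, then marks each match RUNNING.
    // The method-level lock blocks rather than skips, but like SKIP LOCKED it
    // guarantees that two callers never claim the same row.
    synchronized List<Job> claim(String me, long now, long leaseTimeout, int batch) {
        List<Job> claimed = rows.stream()
            .filter(j -> j.state == State.PENDING || j.state == State.RUNNING)
            .filter(j -> j.nextAttemptAt <= now)
            .filter(j -> j.leaseExpiresAt == null || j.leaseExpiresAt < now)
            .sorted(Comparator.comparingLong((Job j) -> j.nextAttemptAt))
            .limit(batch)
            .collect(Collectors.toList());
        for (Job j : claimed) {
            j.state = State.RUNNING;
            j.leaseOwner = me;
            j.leaseExpiresAt = now + leaseTimeout;
        }
        return claimed;
    }

    // Assumption: only the current owner may extend a lease.
    synchronized boolean renew(Job job, String me, long now, long leaseTimeout) {
        if (job.state != State.RUNNING || !me.equals(job.leaseOwner)) {
            return false;
        }
        job.leaseExpiresAt = now + leaseTimeout;
        return true;
    }

    public static void main(String[] args) {
        LeaseSketch store = new LeaseSketch();
        Job job = new Job(1L, 0L);
        store.enqueue(job);
        long timeout = 30_000L;

        System.out.println(store.claim("replica-a", 1_000L, timeout, 10).size());  // 1: a holds the lease until t=31000
        System.out.println(store.claim("replica-b", 2_000L, timeout, 10).size());  // 0: lease is still live
        System.out.println(store.renew(job, "replica-a", 11_000L, timeout));       // true: lease now ends at t=41000
        // replica-a restarts or dies here and stops renewing.
        System.out.println(store.claim("replica-b", 41_001L, timeout, 10).size()); // 1: expired lease is reclaimed
        System.out.println(store.renew(job, "replica-a", 42_000L, timeout));       // false: a no longer owns the job
    }
}
```

In the quoted design the claim would be one SQL transaction rather than a Java lock, and a reclaimed job would start again from `metadata_location` as described in §5.3.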
