This is an automated email from the ASF dual-hosted git repository.
dhuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 8abf19a06 Introduce idempotency records schema and Postgres-backed
IdempotencyStore (#3205)
8abf19a06 is described below
commit 8abf19a06f809c8d1d0764b44e613c2614ce2c67
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue Jan 6 20:40:52 2026 -0800
Introduce idempotency records schema and Postgres-backed IdempotencyStore
(#3205)
This PR adds the persistence foundation for REST idempotency in Polaris:
- Defines an idempotency_records table (Postgres) with key, binding,
  liveness, and finalization fields.
- Introduces a storage-agnostic IdempotencyRecord model and IdempotencyStore
  SPI in polaris-core.
- Implements PostgresIdempotencyStore in polaris-relational-jdbc.
---
persistence/relational-jdbc/build.gradle.kts | 5 +
.../jdbc/idempotency/PostgresIdempotencyStore.java | 255 +++++++++++++++++++++
.../src/main/resources/postgres/schema-v3.sql | 28 +++
.../idempotency/PostgresIdempotencyStoreIT.java | 199 ++++++++++++++++
.../polaris/core/persistence/IdempotencyStore.java | 190 +++++++++++++++
.../polaris/idempotency/IdempotencyRecord.java | 171 ++++++++++++++
6 files changed, 848 insertions(+)
diff --git a/persistence/relational-jdbc/build.gradle.kts
b/persistence/relational-jdbc/build.gradle.kts
index 39fe5614f..f5c5597a4 100644
--- a/persistence/relational-jdbc/build.gradle.kts
+++ b/persistence/relational-jdbc/build.gradle.kts
@@ -42,4 +42,9 @@ dependencies {
testImplementation(libs.mockito.junit.jupiter)
testImplementation(libs.h2)
testImplementation(testFixtures(project(":polaris-core")))
+
+ testImplementation(platform(libs.testcontainers.bom))
+
+ testImplementation("org.testcontainers:junit-jupiter:1.20.3")
+ testImplementation("org.testcontainers:postgresql:1.20.3")
}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
new file mode 100644
index 000000000..ff826e19e
--- /dev/null
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.persistence.relational.jdbc.idempotency;
+
+import jakarta.annotation.Nonnull;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.sql.DataSource;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
+import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
+import
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import org.apache.polaris.persistence.relational.jdbc.models.Converter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Postgres implementation of {@link IdempotencyStore}.
+ *
+ * <p>Every operation is a single parameterized statement against {@code idempotency_records},
+ * executed through {@link DatasourceOperations}. A record counts as in-progress while its {@code
+ * http_status} column is {@code NULL}. {@link SQLException}s raised while talking to the database
+ * are rethrown as {@link RuntimeException}s that carry the original exception as their cause.
+ */
+public final class PostgresIdempotencyStore implements IdempotencyStore {
+  private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class);
+
+  private static final String TABLE = "POLARIS_SCHEMA.idempotency_records";
+
+  /** Stateless row mapper handed to {@link DatasourceOperations} by {@link #load}. */
+  private static final Converter<IdempotencyRecord> RECORD_CONVERTER =
+      new Converter<IdempotencyRecord>() {
+        @Override
+        public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
+          return convert(rs);
+        }
+
+        @Override
+        public Map<String, Object> toMap(DatabaseType databaseType) {
+          // This store only reads through the converter; writes bind their parameters directly.
+          return Map.of();
+        }
+      };
+
+  private final DatasourceOperations ops;
+
+  /**
+   * Creates a store that runs its statements through a new {@link DatasourceOperations}.
+   *
+   * @param dataSource JDBC data source used for all statements
+   * @param cfg relational JDBC configuration passed on to {@link DatasourceOperations}
+   * @throws SQLException if {@link DatasourceOperations} cannot be constructed
+   */
+  public PostgresIdempotencyStore(
+      @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg)
+      throws SQLException {
+    this.ops = new DatasourceOperations(dataSource, cfg);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Implemented as {@code INSERT ... ON CONFLICT (realm_id, idempotency_key) DO NOTHING}: the
+   * caller whose insert affects one row is {@link ReserveResultType#OWNED}; any other caller gets
+   * {@link ReserveResultType#DUPLICATE} together with the row currently stored for the key.
+   *
+   * @throws RuntimeException if the insert or the follow-up lookup fails
+   */
+  @Override
+  public ReserveResult reserve(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Instant expiresAt,
+      String executorId,
+      Instant now) {
+    String sql =
+        "INSERT INTO "
+            + TABLE
+            + " (realm_id, idempotency_key, operation_type, resource_id,"
+            + " http_status, error_subtype, response_summary, response_headers, finalized_at,"
+            + " created_at, updated_at, heartbeat_at, executor_id, expires_at)"
+            + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)"
+            + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING";
+    // created_at, updated_at and heartbeat_at all start out at the reservation time.
+    List<Object> params =
+        List.of(
+            realmId,
+            idempotencyKey,
+            operationType,
+            normalizedResourceId,
+            Timestamp.from(now),
+            Timestamp.from(now),
+            Timestamp.from(now),
+            executorId,
+            Timestamp.from(expiresAt));
+    try {
+      int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, params));
+      if (updated == 1) {
+        return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
+      } else {
+        // Load existing to return to caller. The lookup is a separate statement, so the record
+        // may be absent here if it was purged after the conflicting insert.
+        return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to reserve idempotency key", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws RuntimeException if the query fails or a row cannot be read
+   */
+  @Override
+  public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) {
+    String sql =
+        "SELECT realm_id, idempotency_key, operation_type, resource_id, http_status, error_subtype,"
+            + " response_summary, response_headers, created_at, updated_at, finalized_at, heartbeat_at,"
+            + " executor_id, expires_at"
+            + " FROM "
+            + TABLE
+            + " WHERE realm_id = ? AND idempotency_key = ?";
+    try {
+      // Rows are delivered to a callback, so the first one is captured in a one-slot holder.
+      final IdempotencyRecord[] holder = new IdempotencyRecord[1];
+      ops.executeSelectOverStream(
+          new QueryGenerator.PreparedQuery(sql, List.of(realmId, idempotencyKey)),
+          RECORD_CONVERTER,
+          stream -> stream.findFirst().ifPresent(r -> holder[0] = r));
+      return Optional.ofNullable(holder[0]);
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to load idempotency record", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The update only matches rows that are still in progress and whose {@code executor_id} is
+   * either {@code NULL} or equal to {@code executorId}. When nothing was updated, the record is
+   * loaded in a second statement to work out which non-update result applies.
+   *
+   * @throws RuntimeException if the update or the follow-up lookup fails
+   */
+  @Override
+  public HeartbeatResult updateHeartbeat(
+      String realmId, String idempotencyKey, String executorId, Instant now) {
+    String sql =
+        "UPDATE "
+            + TABLE
+            + " SET heartbeat_at = ?, updated_at = ?"
+            + " WHERE realm_id = ? AND idempotency_key = ?"
+            + " AND http_status IS NULL"
+            + " AND (executor_id IS NULL OR executor_id = ?)";
+    try {
+      int rows =
+          ops.executeUpdate(
+              new QueryGenerator.PreparedQuery(
+                  sql,
+                  List.of(
+                      Timestamp.from(now),
+                      Timestamp.from(now),
+                      realmId,
+                      idempotencyKey,
+                      executorId)));
+      if (rows > 0) {
+        return HeartbeatResult.UPDATED;
+      }
+
+      // No rows updated: determine why by loading the current record, if any.
+      // TODO: consider using a single atomic read/write (for example, PostgreSQL
+      // UPDATE ... RETURNING) to avoid this follow-up lookup and make the
+      // conflicting state observable in the same operation.
+      Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
+      if (existing.isEmpty()) {
+        return HeartbeatResult.NOT_FOUND;
+      }
+
+      IdempotencyRecord record = existing.get();
+      if (record.getHttpStatus() != null) {
+        return HeartbeatResult.FINALIZED;
+      }
+
+      // Record is still IN_PROGRESS but owned by a different executor.
+      return HeartbeatResult.LOST_OWNERSHIP;
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to update heartbeat", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The update is guarded by {@code http_status IS NULL}, so a record that already carries a
+   * status is left untouched and {@code false} is returned. {@code finalizedAt} is also written to
+   * {@code updated_at}.
+   *
+   * @throws RuntimeException if the update fails
+   */
+  @Override
+  public boolean finalizeRecord(
+      String realmId,
+      String idempotencyKey,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant finalizedAt) {
+    String sql =
+        "UPDATE "
+            + TABLE
+            + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?,"
+            + " finalized_at = ?, updated_at = ?"
+            + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL";
+    try {
+      int rows =
+          ops.executeUpdate(
+              new QueryGenerator.PreparedQuery(
+                  sql,
+                  // Arrays.asList rather than List.of: the optional response fields may be null.
+                  Arrays.asList(
+                      httpStatus,
+                      errorSubtype,
+                      responseSummary,
+                      responseHeaders,
+                      Timestamp.from(finalizedAt),
+                      Timestamp.from(finalizedAt),
+                      realmId,
+                      idempotencyKey)));
+      return rows > 0;
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to finalize idempotency record", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Rows whose {@code expires_at} is {@code NULL} never satisfy the {@code expires_at < ?}
+   * predicate and are therefore not removed.
+   *
+   * @throws RuntimeException if the delete fails
+   */
+  @Override
+  public int purgeExpired(String realmId, Instant before) {
+    String sql = "DELETE FROM " + TABLE + " WHERE realm_id = ? AND expires_at < ?";
+    try {
+      return ops.executeUpdate(
+          new QueryGenerator.PreparedQuery(sql, List.of(realmId, Timestamp.from(before))));
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to purge expired idempotency records", e);
+    }
+  }
+
+  /**
+   * Maps the current row of {@code rs} to an {@link IdempotencyRecord}.
+   *
+   * <p>Nullable columns map to {@code null} fields instead of failing with a {@link
+   * NullPointerException}; this includes {@code expires_at}, which the table definition does not
+   * declare {@code NOT NULL}.
+   *
+   * @param rs result set positioned on the row to read
+   * @return the record described by the current row
+   * @throws RuntimeException if a column cannot be read
+   */
+  private static IdempotencyRecord convert(ResultSet rs) {
+    try {
+      String realmId = rs.getString("realm_id");
+      String idempotencyKey = rs.getString("idempotency_key");
+      String operationType = rs.getString("operation_type");
+      String resourceId = rs.getString("resource_id");
+      // getInt() reports SQL NULL as 0, so wasNull() decides between "no status yet" and a value.
+      int statusValue = rs.getInt("http_status");
+      Integer httpStatus = rs.wasNull() ? null : Integer.valueOf(statusValue);
+      String errorSubtype = rs.getString("error_subtype");
+      String responseSummary = rs.getString("response_summary");
+      String responseHeaders = rs.getString("response_headers");
+      Instant createdAt = instantOrNull(rs.getTimestamp("created_at"));
+      Instant updatedAt = instantOrNull(rs.getTimestamp("updated_at"));
+      Instant finalizedAt = instantOrNull(rs.getTimestamp("finalized_at"));
+      Instant heartbeatAt = instantOrNull(rs.getTimestamp("heartbeat_at"));
+      String executorId = rs.getString("executor_id");
+      Instant expiresAt = instantOrNull(rs.getTimestamp("expires_at"));
+      return new IdempotencyRecord(
+          realmId,
+          idempotencyKey,
+          operationType,
+          resourceId,
+          httpStatus,
+          errorSubtype,
+          responseSummary,
+          responseHeaders,
+          createdAt,
+          updatedAt,
+          finalizedAt,
+          heartbeatAt,
+          executorId,
+          expiresAt);
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to read idempotency record row", e);
+    }
+  }
+
+  /** Converts a possibly-{@code null} SQL timestamp to an {@link Instant}. */
+  private static Instant instantOrNull(Timestamp ts) {
+    return ts == null ? null : ts.toInstant();
+  }
+}
diff --git
a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
index 96897f510..ed99b6120 100644
--- a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
+++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
@@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events (
additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB,
PRIMARY KEY (event_id)
);
+
+-- Idempotency records (key-only idempotency; durable replay)
+-- One row per (realm_id, idempotency_key). A row is IN_PROGRESS while http_status is NULL and
+-- becomes finalized once http_status is written.
+CREATE TABLE IF NOT EXISTS idempotency_records (
+ realm_id TEXT NOT NULL,
+ idempotency_key TEXT NOT NULL,
+ operation_type TEXT NOT NULL,
+ resource_id TEXT NOT NULL,
+
+ -- Finalization/replay
+ http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx
+ error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed
+ response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string)
+ response_headers TEXT, -- small whitelisted headers to replay (JSON string)
+ finalized_at TIMESTAMP, -- when http_status was written
+
+ -- Liveness/ops
+ -- NOTE(review): these columns are TIMESTAMP (without time zone) while the Java store binds
+ -- java.sql.Timestamp values derived from Instant; confirm all writers and readers share one
+ -- session/JVM time zone, or consider TIMESTAMPTZ.
+ created_at TIMESTAMP NOT NULL,
+ updated_at TIMESTAMP NOT NULL,
+ heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS
+ executor_id TEXT, -- owner pod/worker id
+ -- NOTE(review): expires_at is nullable, but a row with NULL expires_at can never match the
+ -- purge predicate (expires_at < ?), and reserve() always supplies a value; consider NOT NULL.
+ expires_at TIMESTAMP,
+
+ PRIMARY KEY (realm_id, idempotency_key)
+);
+
+-- Helpful indexes
+-- Matches the purge query, which filters on realm_id and a range over expires_at.
+CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires
+ ON idempotency_records (realm_id, expires_at);
diff --git
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
new file mode 100644
index 000000000..918c9f72d
--- /dev/null
+++
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.persistence.relational.jdbc.idempotency;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import javax.sql.DataSource;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
+import
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+/**
+ * Integration test for {@link PostgresIdempotencyStore} against a PostgreSQL container.
+ *
+ * <p>All tests share one container, one schema and the realm {@code test-realm}; each test works
+ * on its own idempotency key.
+ */
+@Testcontainers
+public class PostgresIdempotencyStoreIT {
+
+  private static final String REALM = "test-realm";
+
+  @Container
+  private static final PostgreSQLContainer<?> POSTGRES =
+      new PostgreSQLContainer<>("postgres:17.5-alpine");
+
+  private static DataSource dataSource;
+  private static PostgresIdempotencyStore store;
+
+  /** Starts the container, applies {@code postgres/schema-v3.sql} and creates the store. */
+  @BeforeAll
+  static void setup() throws Exception {
+    POSTGRES.start();
+
+    PGSimpleDataSource pg = new PGSimpleDataSource();
+    pg.setURL(POSTGRES.getJdbcUrl());
+    pg.setUser(POSTGRES.getUsername());
+    pg.setPassword(POSTGRES.getPassword());
+    dataSource = pg;
+
+    RelationalJdbcConfiguration cfg = retryConfig();
+    applySchema(new DatasourceOperations(dataSource, cfg));
+
+    store = new PostgresIdempotencyStore(dataSource, cfg);
+  }
+
+  @AfterAll
+  static void teardown() {
+    POSTGRES.stop();
+  }
+
+  /** Retry settings used for both schema setup and the store under test. */
+  private static RelationalJdbcConfiguration retryConfig() {
+    return new RelationalJdbcConfiguration() {
+      @Override
+      public Optional<Integer> maxRetries() {
+        return Optional.of(3);
+      }
+
+      @Override
+      public Optional<Long> maxDurationInMs() {
+        return Optional.of(5000L);
+      }
+
+      @Override
+      public Optional<Long> initialDelayInMs() {
+        return Optional.of(100L);
+      }
+    };
+  }
+
+  /** Runs the v3 Postgres schema script found on the classpath. */
+  private static void applySchema(DatasourceOperations ops) throws Exception {
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    try (InputStream script = loader.getResourceAsStream("postgres/schema-v3.sql")) {
+      if (script == null) {
+        throw new IllegalStateException("schema-v3.sql not found on classpath");
+      }
+      ops.executeScript(script);
+    }
+  }
+
+  /** The first reserve owns the key; a second reserve sees the stored in-progress record. */
+  @Test
+  void reserveSingleWinnerAndDuplicate() {
+    String key = "K1";
+    String operation = "commit-table";
+    String resource = "tables/ns.tbl";
+    Instant now = Instant.now();
+    Instant expiry = now.plus(Duration.ofMinutes(5));
+
+    IdempotencyStore.ReserveResult first =
+        store.reserve(REALM, key, operation, resource, expiry, "A", now);
+    assertThat(first.getType()).isEqualTo(IdempotencyStore.ReserveResultType.OWNED);
+
+    IdempotencyStore.ReserveResult second =
+        store.reserve(REALM, key, operation, resource, expiry, "B", now);
+    assertThat(second.getType()).isEqualTo(IdempotencyStore.ReserveResultType.DUPLICATE);
+    assertThat(second.getExisting()).isPresent();
+
+    IdempotencyRecord stored = second.getExisting().get();
+    assertThat(stored.getRealmId()).isEqualTo(REALM);
+    assertThat(stored.getIdempotencyKey()).isEqualTo(key);
+    assertThat(stored.getOperationType()).isEqualTo(operation);
+    assertThat(stored.getNormalizedResourceId()).isEqualTo(resource);
+    assertThat(stored.getHttpStatus()).isNull();
+  }
+
+  /** The owner can heartbeat and finalize once; finalizing again reports {@code false}. */
+  @Test
+  void heartbeatAndFinalize() {
+    String key = "K2";
+    Instant now = Instant.now();
+    Instant expiry = now.plus(Duration.ofMinutes(5));
+    String body = "{\"ok\":true}";
+    String headers = "{\"Content-Type\":\"application/json\"}";
+
+    store.reserve(REALM, key, "commit-table", "tables/ns.tbl2", expiry, "A", now);
+
+    HeartbeatResult beat = store.updateHeartbeat(REALM, key, "A", now.plusSeconds(1));
+    assertThat(beat).isEqualTo(HeartbeatResult.UPDATED);
+
+    boolean firstFinalize =
+        store.finalizeRecord(REALM, key, 201, null, body, headers, now.plusSeconds(2));
+    assertThat(firstFinalize).isTrue();
+
+    // finalize again should be a no-op
+    boolean secondFinalize =
+        store.finalizeRecord(REALM, key, 201, null, body, headers, now.plusSeconds(3));
+    assertThat(secondFinalize).isFalse();
+
+    Optional<IdempotencyRecord> loaded = store.load(REALM, key);
+    assertThat(loaded).isPresent();
+    assertThat(loaded.get().isFinalized()).isTrue();
+    assertThat(loaded.get().getHttpStatus()).isEqualTo(201);
+  }
+
+  /** A record reserved with an expiry in the past is removed by the purge. */
+  @Test
+  void purgeExpired() {
+    Instant now = Instant.now();
+    Instant alreadyExpired = now.minus(Duration.ofMinutes(1));
+
+    store.reserve(REALM, "K3", "drop-table", "tables/ns.tbl3", alreadyExpired, "A", now);
+
+    int purged = store.purgeExpired(REALM, Instant.now());
+    assertThat(purged).isEqualTo(1);
+  }
+
+  /** Reusing a key with a different operation must surface the original binding. */
+  @Test
+  void duplicateReturnsExistingBindingForMismatch() {
+    String key = "K4";
+    String originalOp = "commit-table";
+    String originalResource = "tables/ns.tbl4";
+    String otherOp = "drop-table"; // different binding
+    String otherResource = "tables/ns.tbl4"; // same resource, different op
+    Instant now = Instant.now();
+    Instant expiry = now.plus(Duration.ofMinutes(5));
+
+    IdempotencyStore.ReserveResult first =
+        store.reserve(REALM, key, originalOp, originalResource, expiry, "A", now);
+    assertThat(first.getType()).isEqualTo(IdempotencyStore.ReserveResultType.OWNED);
+
+    // Second reserve with different op/resource should *not* overwrite the original binding.
+    // The store must return DUPLICATE with the *original* (op1, rid1); the HTTP layer
+    // (IdempotencyFilter) will detect the mismatch and return 422.
+    IdempotencyStore.ReserveResult second =
+        store.reserve(REALM, key, otherOp, otherResource, expiry, "B", now);
+    assertThat(second.getType()).isEqualTo(IdempotencyStore.ReserveResultType.DUPLICATE);
+    assertThat(second.getExisting()).isPresent();
+
+    IdempotencyRecord stored = second.getExisting().get();
+    assertThat(stored.getOperationType()).isEqualTo(originalOp);
+    assertThat(stored.getNormalizedResourceId()).isEqualTo(originalResource);
+  }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
new file mode 100644
index 000000000..cf3e9d98d
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+
+/**
+ * Storage abstraction for idempotency records.
+ *
+ * <p>An {@link IdempotencyStore} covers the full life cycle of a record:
+ *
+ * <ul>
+ *   <li>reserving an idempotency key for one operation on one resource,
+ *   <li>keeping an in-progress reservation alive through heartbeats,
+ *   <li>recording the final status and response metadata,
+ *   <li>looking up existing records so duplicates can be detected, and
+ *   <li>purging reservations that have expired.
+ * </ul>
+ *
+ * <p>Implementations must be thread-safe if used concurrently.
+ */
+public interface IdempotencyStore {
+
+  /** Outcome category of a {@code reserve} call. */
+  enum ReserveResultType {
+    /** The caller created the reservation and now owns the idempotency key. */
+    OWNED,
+    /** A reservation for the key already exists and belongs to someone else. */
+    DUPLICATE
+  }
+
+  /**
+   * Outcome of a heartbeat attempt on an in-progress idempotency record.
+   *
+   * <p>Using an enum instead of a boolean lets callers tell the different "nothing was updated"
+   * situations apart.
+   */
+  enum HeartbeatResult {
+    /** The in-progress record owned by this executor had its heartbeat refreshed. */
+    UPDATED,
+
+    /**
+     * The record exists but already carries an HTTP status, so there is no in-progress
+     * reservation left to heartbeat.
+     */
+    FINALIZED,
+
+    /**
+     * There is no record for the given realm and key, either because it was never created or
+     * because it has been purged.
+     */
+    NOT_FOUND,
+
+    /**
+     * The record is still in progress but belongs to a different executor. The caller no longer
+     * owns the reservation and should stop heartbeating.
+     */
+    LOST_OWNERSHIP
+  }
+
+  /**
+   * Value returned by {@link #reserve(String, String, String, String, Instant, String, Instant)}:
+   * the outcome plus, where applicable, the record that already existed.
+   */
+  final class ReserveResult {
+    private final ReserveResultType type;
+    private final Optional<IdempotencyRecord> existing;
+
+    /**
+     * @param type outcome of the reservation attempt
+     * @param existing record found for the key; a {@code null} argument is treated as empty
+     */
+    public ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> existing) {
+      this.type = type;
+      if (existing == null) {
+        this.existing = Optional.empty();
+      } else {
+        this.existing = existing;
+      }
+    }
+
+    /**
+     * Outcome of the reservation attempt.
+     *
+     * @return the {@link ReserveResultType}
+     */
+    public ReserveResultType getType() {
+      return type;
+    }
+
+    /**
+     * Record that was already stored for the key. Expected to be present when {@link #getType()}
+     * is {@link ReserveResultType#DUPLICATE} and empty otherwise.
+     *
+     * @return the existing {@link IdempotencyRecord}, if present
+     */
+    public Optional<IdempotencyRecord> getExisting() {
+      return existing;
+    }
+  }
+
+  /**
+   * Tries to reserve an idempotency key for an operation on a resource.
+   *
+   * <p>When no record exists for the key, the implementation should create a reservation owned by
+   * {@code executorId}. When one exists, it should answer {@link ReserveResultType#DUPLICATE} and
+   * hand back the existing record.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param operationType logical operation name (e.g., {@code "commit-table"})
+   * @param normalizedResourceId normalized identifier of the affected resource
+   * @param expiresAt timestamp after which the reservation is considered expired
+   * @param executorId identifier of the caller attempting the reservation
+   * @param now timestamp representing the current time
+   * @return a {@link ReserveResult} telling whether the caller owns the reservation or hit a
+   *     duplicate
+   */
+  ReserveResult reserve(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Instant expiresAt,
+      String executorId,
+      Instant now);
+
+  /**
+   * Looks up the idempotency record stored for a realm and key.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @return the matching {@link IdempotencyRecord}, or empty if none exists
+   */
+  Optional<IdempotencyRecord> load(String realmId, String idempotencyKey);
+
+  /**
+   * Refreshes the heartbeat of an in-progress reservation to signal that its executor is still
+   * working on it.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param executorId identifier of the executor that owns the reservation
+   * @param now timestamp representing the current time
+   * @return a {@link HeartbeatResult} saying whether the heartbeat was updated or why it was not
+   */
+  HeartbeatResult updateHeartbeat(
+      String realmId, String idempotencyKey, String executorId, Instant now);
+
+  /**
+   * Finalizes an idempotency record by storing its HTTP status and response metadata.
+   *
+   * <p>Implementations should tolerate repeated finalization attempts and typically return
+   * {@code false} when the record had already been finalized.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param httpStatus HTTP status code returned to the client, or {@code null} if not applicable
+   * @param errorSubtype optional error subtype or code, if the operation failed
+   * @param responseSummary short, serialized representation of the response body
+   * @param responseHeaders serialized representation of response headers
+   * @param finalizedAt timestamp when the operation completed
+   * @return {@code true} if the record moved to the finalized state, {@code false} otherwise
+   */
+  boolean finalizeRecord(
+      String realmId,
+      String idempotencyKey,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant finalizedAt);
+
+  /**
+   * Removes the records of a realm whose expiration time lies strictly before the given instant.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param before cutoff instant; records expiring before this time may be removed
+   * @return number of records that were purged
+   */
+  int purgeExpired(String realmId, Instant before);
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
b/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
new file mode 100644
index 000000000..e4cae7ef8
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.idempotency;
+
+import java.time.Instant;
+
+/**
+ * Immutable snapshot of one idempotency record.
+ *
+ * <p>A record binds an idempotency key within a realm to an operation type and a normalized
+ * resource id. It is {@code IN_PROGRESS} until an HTTP status has been stored; see {@link
+ * #isFinalized()}.
+ */
+public final class IdempotencyRecord {
+  // Key and binding
+  private final String realmId;
+  private final String idempotencyKey;
+  private final String operationType;
+  private final String normalizedResourceId;
+
+  // Finalization / replay
+  private final Integer httpStatus;
+  private final String errorSubtype;
+  private final String responseSummary;
+  private final String responseHeaders;
+  private final Instant finalizedAt;
+
+  // Liveness / housekeeping
+  private final Instant createdAt;
+  private final Instant updatedAt;
+  private final Instant heartbeatAt;
+  private final String executorId;
+  private final Instant expiresAt;
+
+  /** Creates a record from its field values; the arguments are stored as given. */
+  public IdempotencyRecord(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant createdAt,
+      Instant updatedAt,
+      Instant finalizedAt,
+      Instant heartbeatAt,
+      String executorId,
+      Instant expiresAt) {
+    this.realmId = realmId;
+    this.idempotencyKey = idempotencyKey;
+    this.operationType = operationType;
+    this.normalizedResourceId = normalizedResourceId;
+
+    this.httpStatus = httpStatus;
+    this.errorSubtype = errorSubtype;
+    this.responseSummary = responseSummary;
+    this.responseHeaders = responseHeaders;
+    this.finalizedAt = finalizedAt;
+
+    this.createdAt = createdAt;
+    this.updatedAt = updatedAt;
+    this.heartbeatAt = heartbeatAt;
+    this.executorId = executorId;
+    this.expiresAt = expiresAt;
+  }
+
+  /** Realm (tenant) the record belongs to. */
+  public String getRealmId() {
+    return realmId;
+  }
+
+  /** Idempotency key supplied by the application. */
+  public String getIdempotencyKey() {
+    return idempotencyKey;
+  }
+
+  /** Logical name of the operation the key is bound to. */
+  public String getOperationType() {
+    return operationType;
+  }
+
+  /** Normalized identifier of the resource the key is bound to. */
+  public String getNormalizedResourceId() {
+    return normalizedResourceId;
+  }
+
+  /**
+   * HTTP status code that was returned to the client for this operation.
+   *
+   * <p>Stays {@code null} while the record is {@code IN_PROGRESS}; it is set only once the
+   * operation reaches a terminal 2xx or 4xx state.
+   */
+  public Integer getHttpStatus() {
+    return httpStatus;
+  }
+
+  /**
+   * Optional error subtype or code giving more detail about a failed operation.
+   *
+   * <p>Examples: {@code already_exists}, {@code namespace_not_empty}, {@code
+   * idempotency_replay_failed}.
+   */
+  public String getErrorSubtype() {
+    return errorSubtype;
+  }
+
+  /**
+   * Minimal serialized response body from which an equivalent response can be rebuilt.
+   *
+   * <p>Typically a compact JSON string holding just enough information for the HTTP layer to
+   * answer duplicate idempotent requests.
+   */
+  public String getResponseSummary() {
+    return responseSummary;
+  }
+
+  /**
+   * Serialized form of a small, whitelisted set of HTTP response headers.
+   *
+   * <p>Kept as a JSON string so the HTTP layer can replay key headers (such as {@code
+   * Content-Type}) when it serves a duplicate idempotent request.
+   */
+  public String getResponseHeaders() {
+    return responseHeaders;
+  }
+
+  /** Timestamp at which the record was created. */
+  public Instant getCreatedAt() {
+    return createdAt;
+  }
+
+  /** Timestamp of the most recent update to the record. */
+  public Instant getUpdatedAt() {
+    return updatedAt;
+  }
+
+  /**
+   * Timestamp at which the record was finalized.
+   *
+   * <p>Written together with {@link #getHttpStatus()} when the operation completes; {@code null}
+   * while the record is still {@code IN_PROGRESS}.
+   */
+  public Instant getFinalizedAt() {
+    return finalizedAt;
+  }
+
+  /**
+   * Timestamp of the latest successful heartbeat while the operation is {@code IN_PROGRESS}.
+   *
+   * <p>The owning executor refreshes it to signal liveness; reconciliation logic uses it to spot
+   * stuck or abandoned in-progress records.
+   */
+  public Instant getHeartbeatAt() {
+    return heartbeatAt;
+  }
+
+  /**
+   * Identifier of the executor (for example a pod or worker id) that currently owns the
+   * in-progress reservation.
+   */
+  public String getExecutorId() {
+    return executorId;
+  }
+
+  /** Timestamp after which the reservation counts as expired and may be purged. */
+  public Instant getExpiresAt() {
+    return expiresAt;
+  }
+
+  /** Returns {@code true} once an HTTP status has been recorded for this record. */
+  public boolean isFinalized() {
+    return getHttpStatus() != null;
+  }
+}