This is an automated email from the ASF dual-hosted git repository.

dhuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 8abf19a06 Introduce idempotency records schema and Postgres-backed 
IdempotencyStore (#3205)
8abf19a06 is described below

commit 8abf19a06f809c8d1d0764b44e613c2614ce2c67
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue Jan 6 20:40:52 2026 -0800

    Introduce idempotency records schema and Postgres-backed IdempotencyStore 
(#3205)
    
    This PR adds the persistence foundation for REST idempotency in Polaris:
    Defines an idempotency_records table (Postgres) with key, binding, 
liveness, and finalization fields.
    Introduces a storage‑agnostic IdempotencyRecord model and IdempotencyStore 
SPI in polaris-core.
    Implements PostgresIdempotencyStore in polaris-relational-jdbc
---
 persistence/relational-jdbc/build.gradle.kts       |   5 +
 .../jdbc/idempotency/PostgresIdempotencyStore.java | 255 +++++++++++++++++++++
 .../src/main/resources/postgres/schema-v3.sql      |  28 +++
 .../idempotency/PostgresIdempotencyStoreIT.java    | 199 ++++++++++++++++
 .../polaris/core/persistence/IdempotencyStore.java | 190 +++++++++++++++
 .../polaris/idempotency/IdempotencyRecord.java     | 171 ++++++++++++++
 6 files changed, 848 insertions(+)

diff --git a/persistence/relational-jdbc/build.gradle.kts 
b/persistence/relational-jdbc/build.gradle.kts
index 39fe5614f..f5c5597a4 100644
--- a/persistence/relational-jdbc/build.gradle.kts
+++ b/persistence/relational-jdbc/build.gradle.kts
@@ -42,4 +42,9 @@ dependencies {
   testImplementation(libs.mockito.junit.jupiter)
   testImplementation(libs.h2)
   testImplementation(testFixtures(project(":polaris-core")))
+
+  testImplementation(platform(libs.testcontainers.bom))
+
+  testImplementation("org.testcontainers:junit-jupiter:1.20.3")
+  testImplementation("org.testcontainers:postgresql:1.20.3")
 }
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
new file mode 100644
index 000000000..ff826e19e
--- /dev/null
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.persistence.relational.jdbc.idempotency;
+
+import jakarta.annotation.Nonnull;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.sql.DataSource;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
+import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
+import 
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import org.apache.polaris.persistence.relational.jdbc.models.Converter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Postgres implementation of IdempotencyStore. */
+public final class PostgresIdempotencyStore implements IdempotencyStore {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PostgresIdempotencyStore.class);
+
+  private static final String TABLE = "POLARIS_SCHEMA.idempotency_records";
+
+  private final DatasourceOperations ops;
+
+  public PostgresIdempotencyStore(
+      @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg)
+      throws SQLException {
+    this.ops = new DatasourceOperations(dataSource, cfg);
+  }
+
+  @Override
+  public ReserveResult reserve(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Instant expiresAt,
+      String executorId,
+      Instant now) {
+    String sql =
+        "INSERT INTO "
+            + TABLE
+            + " (realm_id, idempotency_key, operation_type, resource_id,"
+            + " http_status, error_subtype, response_summary, 
response_headers, finalized_at,"
+            + " created_at, updated_at, heartbeat_at, executor_id, expires_at)"
+            + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, 
?)"
+            + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING";
+    List<Object> params =
+        List.of(
+            realmId,
+            idempotencyKey,
+            operationType,
+            normalizedResourceId,
+            Timestamp.from(now),
+            Timestamp.from(now),
+            Timestamp.from(now),
+            executorId,
+            Timestamp.from(expiresAt));
+    try {
+      int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, 
params));
+      if (updated == 1) {
+        return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
+      } else {
+        // Load existing to return to caller
+        return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, 
idempotencyKey));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to reserve idempotency key", e);
+    }
+  }
+
+  @Override
+  public Optional<IdempotencyRecord> load(String realmId, String 
idempotencyKey) {
+    String sql =
+        "SELECT realm_id, idempotency_key, operation_type, resource_id, 
http_status, error_subtype,"
+            + " response_summary, response_headers, created_at, updated_at, 
finalized_at, heartbeat_at,"
+            + " executor_id, expires_at"
+            + " FROM "
+            + TABLE
+            + " WHERE realm_id = ? AND idempotency_key = ?";
+    try {
+      final IdempotencyRecord[] holder = new IdempotencyRecord[1];
+      ops.executeSelectOverStream(
+          new QueryGenerator.PreparedQuery(sql, List.of(realmId, 
idempotencyKey)),
+          new Converter<IdempotencyRecord>() {
+            @Override
+            public IdempotencyRecord fromResultSet(ResultSet rs) throws 
SQLException {
+              return convert(rs);
+            }
+
+            @Override
+            public Map<String, Object> toMap(DatabaseType databaseType) {
+              return Map.of();
+            }
+          },
+          stream -> stream.findFirst().ifPresent(r -> holder[0] = r));
+      return Optional.ofNullable(holder[0]);
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to load idempotency record", e);
+    }
+  }
+
+  @Override
+  public HeartbeatResult updateHeartbeat(
+      String realmId, String idempotencyKey, String executorId, Instant now) {
+    String sql =
+        "UPDATE "
+            + TABLE
+            + " SET heartbeat_at = ?, updated_at = ?"
+            + " WHERE realm_id = ? AND idempotency_key = ?"
+            + " AND http_status IS NULL"
+            + " AND (executor_id IS NULL OR executor_id = ?)";
+    try {
+      int rows =
+          ops.executeUpdate(
+              new QueryGenerator.PreparedQuery(
+                  sql,
+                  List.of(
+                      Timestamp.from(now),
+                      Timestamp.from(now),
+                      realmId,
+                      idempotencyKey,
+                      executorId)));
+      if (rows > 0) {
+        return HeartbeatResult.UPDATED;
+      }
+
+      // No rows updated: determine why by loading the current record, if any.
+      // TODO: consider using a single atomic read/write (for example, 
PostgreSQL
+      // UPDATE ... RETURNING) to avoid this follow-up lookup and make the
+      // conflicting state observable in the same operation.
+      Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
+      if (existing.isEmpty()) {
+        return HeartbeatResult.NOT_FOUND;
+      }
+
+      IdempotencyRecord record = existing.get();
+      if (record.getHttpStatus() != null) {
+        return HeartbeatResult.FINALIZED;
+      }
+
+      // Record is still IN_PROGRESS but owned by a different executor.
+      return HeartbeatResult.LOST_OWNERSHIP;
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to update heartbeat", e);
+    }
+  }
+
+  @Override
+  public boolean finalizeRecord(
+      String realmId,
+      String idempotencyKey,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant finalizedAt) {
+    String sql =
+        "UPDATE "
+            + TABLE
+            + " SET http_status = ?, error_subtype = ?, response_summary = ?, 
response_headers = ?,"
+            + " finalized_at = ?, updated_at = ?"
+            + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS 
NULL";
+    try {
+      int rows =
+          ops.executeUpdate(
+              new QueryGenerator.PreparedQuery(
+                  sql,
+                  Arrays.asList(
+                      httpStatus,
+                      errorSubtype,
+                      responseSummary,
+                      responseHeaders,
+                      Timestamp.from(finalizedAt),
+                      Timestamp.from(finalizedAt),
+                      realmId,
+                      idempotencyKey)));
+      return rows > 0;
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to finalize idempotency record", e);
+    }
+  }
+
+  @Override
+  public int purgeExpired(String realmId, Instant before) {
+    String sql = "DELETE FROM " + TABLE + " WHERE realm_id = ? AND expires_at 
< ?";
+    try {
+      return ops.executeUpdate(
+          new QueryGenerator.PreparedQuery(sql, List.of(realmId, 
Timestamp.from(before))));
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to purge expired idempotency 
records", e);
+    }
+  }
+
+  private static IdempotencyRecord convert(ResultSet rs) {
+    try {
+      String realmId = rs.getString("realm_id");
+      String idempotencyKey = rs.getString("idempotency_key");
+      String operationType = rs.getString("operation_type");
+      String resourceId = rs.getString("resource_id");
+      Integer httpStatus = (Integer) rs.getObject("http_status");
+      String errorSubtype = rs.getString("error_subtype");
+      String responseSummary = rs.getString("response_summary");
+      String responseHeaders = rs.getString("response_headers");
+      Instant createdAt = rs.getTimestamp("created_at").toInstant();
+      Instant updatedAt = rs.getTimestamp("updated_at").toInstant();
+      Timestamp fts = rs.getTimestamp("finalized_at");
+      Instant finalizedAt = fts == null ? null : fts.toInstant();
+      Timestamp hb = rs.getTimestamp("heartbeat_at");
+      Instant heartbeatAt = hb == null ? null : hb.toInstant();
+      String executorId = rs.getString("executor_id");
+      Instant expiresAt = rs.getTimestamp("expires_at").toInstant();
+      return new IdempotencyRecord(
+          realmId,
+          idempotencyKey,
+          operationType,
+          resourceId,
+          httpStatus,
+          errorSubtype,
+          responseSummary,
+          responseHeaders,
+          createdAt,
+          updatedAt,
+          finalizedAt,
+          heartbeatAt,
+          executorId,
+          expiresAt);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql 
b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
index 96897f510..ed99b6120 100644
--- a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
+++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql
@@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events (
     additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB,
     PRIMARY KEY (event_id)
 );
+
+-- Idempotency records (key-only idempotency; durable replay)
+CREATE TABLE IF NOT EXISTS idempotency_records (
+    realm_id TEXT NOT NULL,
+    idempotency_key TEXT NOT NULL,
+    operation_type TEXT NOT NULL,
+    resource_id TEXT NOT NULL,
+
+    -- Finalization/replay
+    http_status INTEGER,                 -- NULL while IN_PROGRESS; set only 
on finalized 2xx/terminal 4xx
+    error_subtype TEXT,                  -- optional: e.g., already_exists, 
namespace_not_empty, idempotency_replay_failed
+    response_summary TEXT,               -- minimal body to reproduce 
equivalent response (JSON string)
+    response_headers TEXT,               -- small whitelisted headers to 
replay (JSON string)
+    finalized_at TIMESTAMP,              -- when http_status was written
+
+    -- Liveness/ops
+    created_at TIMESTAMP NOT NULL,
+    updated_at TIMESTAMP NOT NULL,
+    heartbeat_at TIMESTAMP,              -- updated by owner while IN_PROGRESS
+    executor_id TEXT,                    -- owner pod/worker id
+    expires_at TIMESTAMP,
+
+    PRIMARY KEY (realm_id, idempotency_key)
+);
+
+-- Helpful indexes
+CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires
+    ON idempotency_records (realm_id, expires_at);
diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
new file mode 100644
index 000000000..918c9f72d
--- /dev/null
+++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.persistence.relational.jdbc.idempotency;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import javax.sql.DataSource;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
+import 
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class PostgresIdempotencyStoreIT {
+
+  @Container
+  private static final PostgreSQLContainer<?> POSTGRES =
+      new PostgreSQLContainer<>("postgres:17.5-alpine");
+
+  private static DataSource dataSource;
+  private static PostgresIdempotencyStore store;
+
+  @BeforeAll
+  static void setup() throws Exception {
+    POSTGRES.start();
+    PGSimpleDataSource ds = new PGSimpleDataSource();
+    ds.setURL(POSTGRES.getJdbcUrl());
+    ds.setUser(POSTGRES.getUsername());
+    ds.setPassword(POSTGRES.getPassword());
+    dataSource = ds;
+
+    // Apply schema
+    RelationalJdbcConfiguration cfg =
+        new RelationalJdbcConfiguration() {
+          @Override
+          public Optional<Integer> maxRetries() {
+            return Optional.of(3);
+          }
+
+          @Override
+          public Optional<Long> maxDurationInMs() {
+            return Optional.of(5000L);
+          }
+
+          @Override
+          public Optional<Long> initialDelayInMs() {
+            return Optional.of(100L);
+          }
+        };
+    DatasourceOperations ops = new DatasourceOperations(dataSource, cfg);
+    try (InputStream is =
+        Thread.currentThread()
+            .getContextClassLoader()
+            .getResourceAsStream("postgres/schema-v3.sql")) {
+      if (is == null) {
+        throw new IllegalStateException("schema-v3.sql not found on 
classpath");
+      }
+      ops.executeScript(is);
+    }
+
+    store = new PostgresIdempotencyStore(dataSource, cfg);
+  }
+
+  @AfterAll
+  static void teardown() {
+    POSTGRES.stop();
+  }
+
+  @Test
+  void reserveSingleWinnerAndDuplicate() {
+    String realm = "test-realm";
+    String key = "K1";
+    String op = "commit-table";
+    String rid = "tables/ns.tbl";
+    Instant now = Instant.now();
+    Instant exp = now.plus(Duration.ofMinutes(5));
+
+    IdempotencyStore.ReserveResult r1 = store.reserve(realm, key, op, rid, 
exp, "A", now);
+    
assertThat(r1.getType()).isEqualTo(IdempotencyStore.ReserveResultType.OWNED);
+
+    IdempotencyStore.ReserveResult r2 = store.reserve(realm, key, op, rid, 
exp, "B", now);
+    
assertThat(r2.getType()).isEqualTo(IdempotencyStore.ReserveResultType.DUPLICATE);
+    assertThat(r2.getExisting()).isPresent();
+    IdempotencyRecord rec = r2.getExisting().get();
+    assertThat(rec.getRealmId()).isEqualTo(realm);
+    assertThat(rec.getIdempotencyKey()).isEqualTo(key);
+    assertThat(rec.getOperationType()).isEqualTo(op);
+    assertThat(rec.getNormalizedResourceId()).isEqualTo(rid);
+    assertThat(rec.getHttpStatus()).isNull();
+  }
+
+  @Test
+  void heartbeatAndFinalize() {
+    String realm = "test-realm";
+    String key = "K2";
+    String op = "commit-table";
+    String rid = "tables/ns.tbl2";
+    Instant now = Instant.now();
+    Instant exp = now.plus(Duration.ofMinutes(5));
+
+    store.reserve(realm, key, op, rid, exp, "A", now);
+    HeartbeatResult hb = store.updateHeartbeat(realm, key, "A", 
now.plusSeconds(1));
+    assertThat(hb).isEqualTo(HeartbeatResult.UPDATED);
+
+    boolean fin =
+        store.finalizeRecord(
+            realm,
+            key,
+            201,
+            null,
+            "{\"ok\":true}",
+            "{\"Content-Type\":\"application/json\"}",
+            now.plusSeconds(2));
+    assertThat(fin).isTrue();
+
+    // finalize again should be a no-op
+    boolean fin2 =
+        store.finalizeRecord(
+            realm,
+            key,
+            201,
+            null,
+            "{\"ok\":true}",
+            "{\"Content-Type\":\"application/json\"}",
+            now.plusSeconds(3));
+    assertThat(fin2).isFalse();
+
+    Optional<IdempotencyRecord> rec = store.load(realm, key);
+    assertThat(rec).isPresent();
+    assertThat(rec.get().isFinalized()).isTrue();
+    assertThat(rec.get().getHttpStatus()).isEqualTo(201);
+  }
+
+  @Test
+  void purgeExpired() {
+    String realm = "test-realm";
+    String key = "K3";
+    String op = "drop-table";
+    String rid = "tables/ns.tbl3";
+    Instant now = Instant.now();
+    Instant expPast = now.minus(Duration.ofMinutes(1));
+
+    store.reserve(realm, key, op, rid, expPast, "A", now);
+    int purged = store.purgeExpired(realm, Instant.now());
+    assertThat(purged).isEqualTo(1);
+  }
+
+  @Test
+  void duplicateReturnsExistingBindingForMismatch() {
+    String realm = "test-realm";
+    String key = "K4";
+    String op1 = "commit-table";
+    String rid1 = "tables/ns.tbl4";
+    String op2 = "drop-table"; // different binding
+    String rid2 = "tables/ns.tbl4"; // same resource, different op
+    Instant now = Instant.now();
+    Instant exp = now.plus(Duration.ofMinutes(5));
+
+    IdempotencyStore.ReserveResult r1 = store.reserve(realm, key, op1, rid1, 
exp, "A", now);
+    
assertThat(r1.getType()).isEqualTo(IdempotencyStore.ReserveResultType.OWNED);
+
+    // Second reserve with different op/resource should *not* overwrite the 
original binding.
+    // The store must return DUPLICATE with the *original* (op1, rid1); the 
HTTP layer
+    // (IdempotencyFilter)
+    // will detect the mismatch and return 422.
+    IdempotencyStore.ReserveResult r2 = store.reserve(realm, key, op2, rid2, 
exp, "B", now);
+    
assertThat(r2.getType()).isEqualTo(IdempotencyStore.ReserveResultType.DUPLICATE);
+    assertThat(r2.getExisting()).isPresent();
+    IdempotencyRecord rec = r2.getExisting().get();
+    assertThat(rec.getOperationType()).isEqualTo(op1);
+    assertThat(rec.getNormalizedResourceId()).isEqualTo(rid1);
+  }
+}
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
new file mode 100644
index 000000000..cf3e9d98d
--- /dev/null
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+
+/**
+ * Abstraction for persisting and querying idempotency records.
+ *
+ * <p>An {@link IdempotencyStore} is responsible for:
+ *
+ * <ul>
+ *   <li>Reserving an idempotency key for a particular operation and resource
+ *   <li>Recording completion status and response metadata
+ *   <li>Allowing callers to look up existing records to detect duplicates
+ *   <li>Expiring and purging old reservations
+ * </ul>
+ *
+ * <p>Implementations must be thread-safe if used concurrently.
+ */
+public interface IdempotencyStore {
+
+  /** High-level outcome of attempting to reserve an idempotency key. */
+  enum ReserveResultType {
+    /** The caller successfully acquired ownership of the idempotency key. */
+    OWNED,
+    /** A reservation already exists for the key; the caller does not own it. 
*/
+    DUPLICATE
+  }
+
+  /**
+   * Result of attempting to update the heartbeat for an in-progress 
idempotency record.
+   *
+   * <p>This allows callers to distinguish between different "no update" 
scenarios instead of
+   * overloading them into a boolean.
+   */
+  enum HeartbeatResult {
+    /** The heartbeat was successfully updated for the in-progress record 
owned by this executor. */
+    UPDATED,
+
+    /**
+     * The idempotency record exists but has already been finalized with an 
HTTP status; there is no
+     * longer an in-progress reservation to heartbeat.
+     */
+    FINALIZED,
+
+    /**
+     * No idempotency record exists for the specified realm and key. This can 
happen if the record
+     * was never created or has already been purged.
+     */
+    NOT_FOUND,
+
+    /**
+     * An in-progress idempotency record exists for the key, but it is owned 
by a different
+     * executor. The caller should stop heartbeating as it no longer owns the 
reservation.
+     */
+    LOST_OWNERSHIP
+  }
+
+  /**
+   * Result of a {@link #reserve(String, String, String, String, Instant, 
String, Instant)} call,
+   * including the outcome and, when applicable, the existing idempotency 
record.
+   */
+  final class ReserveResult {
+    private final ReserveResultType type;
+    private final Optional<IdempotencyRecord> existing;
+
+    public ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> 
existing) {
+      this.type = type;
+      this.existing = existing == null ? Optional.empty() : existing;
+    }
+
+    /**
+     * Returns the outcome of the reservation attempt.
+     *
+     * @return the {@link ReserveResultType}
+     */
+    public ReserveResultType getType() {
+      return type;
+    }
+
+    /**
+     * Returns the existing idempotency record when {@link #getType()} is 
{@link
+     * ReserveResultType#DUPLICATE}, otherwise {@link Optional#empty()}.
+     *
+     * @return the existing {@link IdempotencyRecord}, if present
+     */
+    public Optional<IdempotencyRecord> getExisting() {
+      return existing;
+    }
+  }
+
+  /**
+   * Attempts to reserve an idempotency key for a given operation and resource.
+   *
+   * <p>If no record exists yet, the implementation should create a new 
reservation owned by {@code
+   * executorId}. If a record already exists, the implementation should return 
{@link
+   * ReserveResultType#DUPLICATE} along with the existing record.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param operationType logical operation name (e.g., {@code "commit-table"})
+   * @param normalizedResourceId normalized identifier of the affected resource
+   * @param expiresAt timestamp after which the reservation is considered 
expired
+   * @param executorId identifier of the caller attempting the reservation
+   * @param now timestamp representing the current time
+   * @return {@link ReserveResult} describing whether the caller owns the 
reservation or hit a
+   *     duplicate
+   */
+  ReserveResult reserve(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Instant expiresAt,
+      String executorId,
+      Instant now);
+
+  /**
+   * Loads an existing idempotency record for the given realm and key, if 
present.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @return the corresponding {@link IdempotencyRecord}, if it exists
+   */
+  Optional<IdempotencyRecord> load(String realmId, String idempotencyKey);
+
+  /**
+   * Updates the heartbeat for an in-progress reservation to indicate that the 
executor is still
+   * actively processing.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param executorId identifier of the executor that owns the reservation
+   * @param now timestamp representing the current time
+   * @return {@link HeartbeatResult} describing whether the heartbeat was 
updated or why it was not
+   */
+  HeartbeatResult updateHeartbeat(
+      String realmId, String idempotencyKey, String executorId, Instant now);
+
+  /**
+   * Marks an idempotency record as finalized, recording HTTP status and 
response metadata.
+   *
+   * <p>Implementations should be tolerant of idempotent re-finalization 
attempts and typically
+   * return {@code false} when a record was already finalized.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param httpStatus HTTP status code returned to the client, or {@code 
null} if not applicable
+   * @param errorSubtype optional error subtype or code, if the operation 
failed
+   * @param responseSummary short, serialized representation of the response 
body
+   * @param responseHeaders serialized representation of response headers
+   * @param finalizedAt timestamp when the operation completed
+   * @return {@code true} if the record was transitioned to a finalized state, 
{@code false}
+   *     otherwise
+   */
+  boolean finalizeRecord(
+      String realmId,
+      String idempotencyKey,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant finalizedAt);
+
+  /**
+   * Purges records in a given realm whose expiration time is strictly before 
the given instant.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param before cutoff instant; records expiring before this time may be 
removed
+   * @return number of records that were purged
+   */
+  int purgeExpired(String realmId, Instant before);
+}
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
 
b/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
new file mode 100644
index 000000000..e4cae7ef8
--- /dev/null
+++ 
b/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.idempotency;
+
+import java.time.Instant;
+
+public final class IdempotencyRecord {
+  private final String realmId;
+  private final String idempotencyKey;
+  private final String operationType;
+  private final String normalizedResourceId;
+
+  private final Integer httpStatus;
+  private final String errorSubtype;
+  private final String responseSummary;
+  private final String responseHeaders;
+  private final Instant finalizedAt;
+
+  private final Instant createdAt;
+  private final Instant updatedAt;
+  private final Instant heartbeatAt;
+  private final String executorId;
+  private final Instant expiresAt;
+
+  public IdempotencyRecord(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant createdAt,
+      Instant updatedAt,
+      Instant finalizedAt,
+      Instant heartbeatAt,
+      String executorId,
+      Instant expiresAt) {
+    this.realmId = realmId;
+    this.idempotencyKey = idempotencyKey;
+    this.operationType = operationType;
+    this.normalizedResourceId = normalizedResourceId;
+    this.httpStatus = httpStatus;
+    this.errorSubtype = errorSubtype;
+    this.responseSummary = responseSummary;
+    this.responseHeaders = responseHeaders;
+    this.createdAt = createdAt;
+    this.updatedAt = updatedAt;
+    this.finalizedAt = finalizedAt;
+    this.heartbeatAt = heartbeatAt;
+    this.executorId = executorId;
+    this.expiresAt = expiresAt;
+  }
+
+  public String getRealmId() {
+    return realmId;
+  }
+
+  public String getIdempotencyKey() {
+    return idempotencyKey;
+  }
+
+  public String getOperationType() {
+    return operationType;
+  }
+
+  public String getNormalizedResourceId() {
+    return normalizedResourceId;
+  }
+
+  /**
+   * HTTP status code returned to the client for this idempotent operation.
+   *
+   * <p>Remains {@code null} while the record is {@code IN_PROGRESS} and is 
set only when the
+   * operation reaches a terminal 2xx or 4xx state.
+   */
+  public Integer getHttpStatus() {
+    return httpStatus;
+  }
+
+  /**
+   * Optional error subtype or code that provides additional detail when the 
operation failed.
+   *
+   * <p>Examples include {@code already_exists}, {@code namespace_not_empty}, 
or {@code
+   * idempotency_replay_failed}.
+   */
+  public String getErrorSubtype() {
+    return errorSubtype;
+  }
+
+  /**
+   * Minimal serialized representation of the response body used to reproduce 
an equivalent
+   * response.
+   *
+   * <p>This is typically a compact JSON string that contains just enough 
information for the HTTP
+   * layer to reconstruct the response for duplicate idempotent requests.
+   */
+  public String getResponseSummary() {
+    return responseSummary;
+  }
+
+  /**
+   * Serialized representation of a small, whitelisted set of HTTP response 
headers.
+   *
+   * <p>Stored as a JSON string so that the HTTP layer can replay key headers 
(such as {@code
+   * Content-Type}) when serving a duplicate idempotent request.
+   */
+  public String getResponseHeaders() {
+    return responseHeaders;
+  }
+
+  public Instant getCreatedAt() {
+    return createdAt;
+  }
+
+  public Instant getUpdatedAt() {
+    return updatedAt;
+  }
+
+  /**
+   * Timestamp indicating when the record was finalized.
+   *
+   * <p>Set at the same time as {@link #getHttpStatus()} when the operation 
completes; {@code null}
+   * while the record is still {@code IN_PROGRESS}.
+   */
+  public Instant getFinalizedAt() {
+    return finalizedAt;
+  }
+
+  /**
+   * Timestamp of the most recent successful heartbeat while the operation is 
{@code IN_PROGRESS}.
+   *
+   * <p>This is updated by the owning executor to signal liveness and is used 
by reconciliation
+   * logic to detect stuck or abandoned in-progress records.
+   */
+  public Instant getHeartbeatAt() {
+    return heartbeatAt;
+  }
+
+  /**
+   * Identifier of the executor (for example pod or worker id) that currently 
owns the in-progress
+   * reservation.
+   */
+  public String getExecutorId() {
+    return executorId;
+  }
+
+  /** Timestamp after which the reservation is considered expired and eligible 
for purging. */
+  public Instant getExpiresAt() {
+    return expiresAt;
+  }
+
+  public boolean isFinalized() {
+    return httpStatus != null;
+  }
+}


Reply via email to