singhpk234 commented on code in PR #3205: URL: https://github.com/apache/polaris/pull/3205#discussion_r2591664924
########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java: ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. 
*/ +public final class PostgresIdempotencyStore implements IdempotencyStore { + private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); + + private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; + + private final DatasourceOperations ops; + + public PostgresIdempotencyStore( + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) + throws SQLException { + this.ops = new DatasourceOperations(dataSource, cfg); + } + + @Override + public ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now) { + String sql = + "INSERT INTO " + + TABLE + + " (realm_id, idempotency_key, operation_type, resource_id," + + " http_status, error_subtype, response_summary, response_headers, finalized_at," + + " created_at, updated_at, heartbeat_at, executor_id, expires_at)" + + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)" + + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING"; + List<Object> params = + List.of( + realmId, + idempotencyKey, + operationType, + normalizedResourceId, + Timestamp.from(now), + Timestamp.from(now), + Timestamp.from(now), + executorId, + Timestamp.from(expiresAt)); + try { + int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, params)); + if (updated == 1) { + return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); + } else { + // Load existing to return to caller + return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to reserve idempotency key", e); + } + } + + @Override + public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) { + String sql = + "SELECT realm_id, idempotency_key, operation_type, resource_id, http_status, error_subtype," + + " response_summary, response_headers, created_at, 
updated_at, finalized_at, heartbeat_at," + + " executor_id, expires_at" + + " FROM " + + TABLE + + " WHERE realm_id = ? AND idempotency_key = ?"; + try { + final IdempotencyRecord[] holder = new IdempotencyRecord[1]; + ops.executeSelectOverStream( + new QueryGenerator.PreparedQuery(sql, List.of(realmId, idempotencyKey)), + new Converter<IdempotencyRecord>() { + @Override + public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return convert(rs); + } + + @Override + public Map<String, Object> toMap(DatabaseType databaseType) { + return Map.of(); + } + }, + stream -> stream.findFirst().ifPresent(r -> holder[0] = r)); + return Optional.ofNullable(holder[0]); + } catch (SQLException e) { + throw new RuntimeException("Failed to load idempotency record", e); + } + } + + @Override + public boolean updateHeartbeat( + String realmId, String idempotencyKey, String executorId, Instant now) { + String sql = + "UPDATE " + + TABLE + + " SET heartbeat_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ?" + + " AND http_status IS NULL" + + " AND (executor_id IS NULL OR executor_id = ?)"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + List.of( + Timestamp.from(now), + Timestamp.from(now), + realmId, + idempotencyKey, + executorId))); + return rows > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to update heartbeat", e); + } + } + + @Override + public boolean finalizeRecord( + String realmId, + String idempotencyKey, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant finalizedAt) { + String sql = + "UPDATE " + + TABLE + + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," + + " finalized_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ? 
AND http_status IS NULL"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + Arrays.asList( + httpStatus, + errorSubtype, + responseSummary, + responseHeaders, + Timestamp.from(finalizedAt), + Timestamp.from(finalizedAt), + realmId, + idempotencyKey))); + return rows > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to finalize idempotency record", e); + } + } + + @Override + public int purgeExpired(Instant before) { + String sql = "DELETE FROM " + TABLE + " WHERE expires_at < ?"; + try { + return ops.executeUpdate( + new QueryGenerator.PreparedQuery(sql, List.of(Timestamp.from(before)))); + } catch (SQLException e) { + throw new RuntimeException("Failed to purge expired idempotency records", e); + } + } + + private static IdempotencyRecord convert(ResultSet rs) { + try { + String realmId = rs.getString("realm_id"); + String idempotencyKey = rs.getString("idempotency_key"); + String operationType = rs.getString("operation_type"); + String resourceId = rs.getString("resource_id"); + Integer httpStatus = (Integer) rs.getObject("http_status"); + String errorSubtype = rs.getString("error_subtype"); + String responseSummary = rs.getString("response_summary"); + String responseHeaders = rs.getString("response_headers"); + Instant createdAt = rs.getTimestamp("created_at").toInstant(); + Instant updatedAt = rs.getTimestamp("updated_at").toInstant(); + Timestamp fts = rs.getTimestamp("finalized_at"); + Instant finalizedAt = fts == null ? null : fts.toInstant(); + Timestamp hb = rs.getTimestamp("heartbeat_at"); + Instant heartbeatAt = hb == null ? null : hb.toInstant(); + String executorId = rs.getString("executor_id"); + Instant expiresAt = rs.getTimestamp("expires_at").toInstant(); Review Comment: can we define model for this ? 
we have converter functions for this ########## persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql: ########## @@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events ( additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, PRIMARY KEY (event_id) ); + +-- Idempotency records (key-only idempotency; durable replay) +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + + -- Finalization/replay + http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string) + response_headers TEXT, -- small whitelisted headers to replay (JSON string) + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +-- Helpful indexes +CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at); Review Comment: V3 is already released, so we would need v4 now ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. 
*/ +public final class PostgresIdempotencyStore implements IdempotencyStore { + private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); + + private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; + + private final DatasourceOperations ops; + + public PostgresIdempotencyStore( + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) + throws SQLException { + this.ops = new DatasourceOperations(dataSource, cfg); Review Comment: I am not sure if reusing the same datasource where entities are stored is a good idea, can we move idempotency to a different datasource, do we have estimates on how the writes performance regress with this additional db call ? ########## persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql: ########## @@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events ( additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, PRIMARY KEY (event_id) ); + +-- Idempotency records (key-only idempotency; durable replay) +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + + -- Finalization/replay + http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string) + response_headers TEXT, -- small whitelisted headers to replay (JSON string) + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +-- Helpful indexes +CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records 
(expires_at); +CREATE INDEX IF NOT EXISTS idx_idemp_active ON idempotency_records (http_status, heartbeat_at); Review Comment: don't we need realm_id in these indexes? ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. */ +public final class PostgresIdempotencyStore implements IdempotencyStore { Review Comment: why does this have to be Postgres specific? ########## persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.InputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class PostgresIdempotencyStoreIT { Review Comment: This should be database agnostic and the profile can be database specific. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. 
*/ +public final class PostgresIdempotencyStore implements IdempotencyStore { + private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); + + private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; + + private final DatasourceOperations ops; + + public PostgresIdempotencyStore( + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) + throws SQLException { + this.ops = new DatasourceOperations(dataSource, cfg); + } + + @Override + public ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now) { + String sql = + "INSERT INTO " + + TABLE + + " (realm_id, idempotency_key, operation_type, resource_id," + + " http_status, error_subtype, response_summary, response_headers, finalized_at," + + " created_at, updated_at, heartbeat_at, executor_id, expires_at)" + + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)" + + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING"; + List<Object> params = + List.of( + realmId, + idempotencyKey, + operationType, + normalizedResourceId, + Timestamp.from(now), + Timestamp.from(now), + Timestamp.from(now), + executorId, + Timestamp.from(expiresAt)); + try { + int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, params)); + if (updated == 1) { + return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); + } else { + // Load existing to return to caller + return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to reserve idempotency key", e); + } + } + + @Override + public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) { + String sql = + "SELECT realm_id, idempotency_key, operation_type, resource_id, http_status, error_subtype," + + " response_summary, response_headers, created_at, 
updated_at, finalized_at, heartbeat_at," + + " executor_id, expires_at" + + " FROM " + + TABLE + + " WHERE realm_id = ? AND idempotency_key = ?"; + try { + final IdempotencyRecord[] holder = new IdempotencyRecord[1]; + ops.executeSelectOverStream( + new QueryGenerator.PreparedQuery(sql, List.of(realmId, idempotencyKey)), + new Converter<IdempotencyRecord>() { + @Override + public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return convert(rs); + } + + @Override + public Map<String, Object> toMap(DatabaseType databaseType) { + return Map.of(); + } + }, + stream -> stream.findFirst().ifPresent(r -> holder[0] = r)); + return Optional.ofNullable(holder[0]); + } catch (SQLException e) { + throw new RuntimeException("Failed to load idempotency record", e); + } + } + + @Override + public HeartbeatResult updateHeartbeat( + String realmId, String idempotencyKey, String executorId, Instant now) { + String sql = + "UPDATE " + + TABLE + + " SET heartbeat_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ?" + + " AND http_status IS NULL" + + " AND (executor_id IS NULL OR executor_id = ?)"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + List.of( + Timestamp.from(now), + Timestamp.from(now), + realmId, + idempotencyKey, + executorId))); + if (rows > 0) { + return HeartbeatResult.UPDATED; + } + + // No rows updated: determine why by loading the current record, if any. + // TODO: consider using a single atomic read/write (for example, PostgreSQL + // UPDATE ... RETURNING) to avoid this follow-up lookup and make the + // conflicting state observable in the same operation. 
+ Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey); + if (existing.isEmpty()) { + return HeartbeatResult.NOT_FOUND; + } + + IdempotencyRecord record = existing.get(); + if (record.getHttpStatus() != null) { + return HeartbeatResult.FINALIZED; + } + + // Record is still IN_PROGRESS but owned by a different executor. + return HeartbeatResult.LOST_OWNERSHIP; + } catch (SQLException e) { + throw new RuntimeException("Failed to update heartbeat", e); + } + } + + @Override + public boolean finalizeRecord( + String realmId, + String idempotencyKey, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant finalizedAt) { + String sql = + "UPDATE " + + TABLE + + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," + + " finalized_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + Arrays.asList( + httpStatus, + errorSubtype, + responseSummary, + responseHeaders, + Timestamp.from(finalizedAt), + Timestamp.from(finalizedAt), + realmId, + idempotencyKey))); + return rows > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to finalize idempotency record", e); + } + } + + @Override + public int purgeExpired(String realmId, Instant before) { + String sql = "DELETE FROM " + TABLE + " WHERE realm_id = ? AND expires_at < ?"; + try { + return ops.executeUpdate( + new QueryGenerator.PreparedQuery(sql, List.of(realmId, Timestamp.from(before)))); + } catch (SQLException e) { + throw new RuntimeException("Failed to purge expired idempotency records", e); + } + } + + private static IdempotencyRecord convert(ResultSet rs) { Review Comment: can IdempotencyRecord not be a model ? 
like our existing models, where we just use the Converter abstraction ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. */ +public final class PostgresIdempotencyStore implements IdempotencyStore { + private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); + + private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; Review Comment: can we use QueryGenerator method to infer table name from model instead of hardcoding it ? 
########## persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql: ########## @@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events ( additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, PRIMARY KEY (event_id) ); + +-- Idempotency records (key-only idempotency; durable replay) +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + + -- Finalization/replay + http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string) + response_headers TEXT, -- small whitelisted headers to replay (JSON string) + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +-- Helpful indexes +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires Review Comment: This will cause already bootstrapped version with V3 to fail, we should keep this in v4 ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java: ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. 
*/ +public final class PostgresIdempotencyStore implements IdempotencyStore { + private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); + + private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; + + private final DatasourceOperations ops; + + public PostgresIdempotencyStore( + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) + throws SQLException { + this.ops = new DatasourceOperations(dataSource, cfg); + } + + @Override + public ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now) { + String sql = + "INSERT INTO " + + TABLE + + " (realm_id, idempotency_key, operation_type, resource_id," + + " http_status, error_subtype, response_summary, response_headers, finalized_at," + + " created_at, updated_at, heartbeat_at, executor_id, expires_at)" + + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)" + + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING"; + List<Object> params = + List.of( + realmId, + idempotencyKey, + operationType, + normalizedResourceId, + Timestamp.from(now), + Timestamp.from(now), + Timestamp.from(now), + executorId, + Timestamp.from(expiresAt)); + try { + int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, params)); + if (updated == 1) { + return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); + } else { + // Load existing to return to caller + return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to reserve idempotency key", e); + } + } + + @Override + public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) { + String sql = + "SELECT realm_id, idempotency_key, operation_type, resource_id, http_status, error_subtype," + + " response_summary, response_headers, created_at, 
updated_at, finalized_at, heartbeat_at," + + " executor_id, expires_at" + + " FROM " + + TABLE + + " WHERE realm_id = ? AND idempotency_key = ?"; + try { + final IdempotencyRecord[] holder = new IdempotencyRecord[1]; + ops.executeSelectOverStream( + new QueryGenerator.PreparedQuery(sql, List.of(realmId, idempotencyKey)), + new Converter<IdempotencyRecord>() { + @Override + public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return convert(rs); + } + + @Override + public Map<String, Object> toMap(DatabaseType databaseType) { + return Map.of(); + } + }, + stream -> stream.findFirst().ifPresent(r -> holder[0] = r)); + return Optional.ofNullable(holder[0]); + } catch (SQLException e) { + throw new RuntimeException("Failed to load idempotency record", e); + } + } + + @Override + public boolean updateHeartbeat( + String realmId, String idempotencyKey, String executorId, Instant now) { + String sql = + "UPDATE " + + TABLE + + " SET heartbeat_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ?" + + " AND http_status IS NULL" + + " AND (executor_id IS NULL OR executor_id = ?)"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + List.of( + Timestamp.from(now), + Timestamp.from(now), + realmId, + idempotencyKey, + executorId))); + return rows > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to update heartbeat", e); + } + } + + @Override + public boolean finalizeRecord( + String realmId, + String idempotencyKey, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant finalizedAt) { + String sql = + "UPDATE " + + TABLE + + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," + + " finalized_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; Review Comment: i wonder if we can leverage query generator for this ? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
