This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f5768738e Add idempotency adapter and E2E coverage (#14773)
4f5768738e is described below

commit 4f5768738e79487058fd65fba3faa19612a53391
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Jan 16 09:52:55 2026 -0800

    Add idempotency adapter and E2E coverage (#14773)
    
    * rebase
    
    * rebase to upstream
    
    * address comments
    
    * address comments
    
    * add back idempotency tests
    
    * address comments
---
 .../org/apache/iceberg/rest/CatalogHandlers.java   | 151 +++++++++
 .../apache/iceberg/rest/RESTCatalogAdapter.java    | 126 ++++---
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 364 +++++++++++++++++++--
 3 files changed, 573 insertions(+), 68 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java 
b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index 229497576a..18de8493f4 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -24,6 +24,8 @@ import static 
org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.Collections;
@@ -33,10 +35,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
 import org.apache.iceberg.BaseMetadataTable;
@@ -66,6 +72,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.io.CloseableIterable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -108,8 +115,152 @@ public class CatalogHandlers {
       InMemoryPlanningState.getInstance();
   private static final ExecutorService ASYNC_PLANNING_POOL = 
Executors.newSingleThreadExecutor();
 
+  // Advanced idempotency store with TTL and in-flight coalescing.
+  //
+  // Note: This is a simple in-memory implementation meant for tests and 
lightweight usage.
+  // Production servers should provide a durable store.
+  private static final ConcurrentMap<String, IdempotencyEntry> 
IDEMPOTENCY_STORE =
+      Maps.newConcurrentMap();
+  private static volatile long idempotencyLifetimeMillis = 
TimeUnit.MINUTES.toMillis(30);
+
   private CatalogHandlers() {}
 
+  @SuppressWarnings("unchecked")
+  static <T extends RESTResponse> T withIdempotency(HTTPRequest httpRequest, 
Supplier<T> action) {
+    return withIdempotencyInternal(httpRequest, action);
+  }
+
+  static void withIdempotency(HTTPRequest httpRequest, Runnable action) {
+    withIdempotencyInternal(
+        httpRequest,
+        () -> {
+          action.run();
+          return Boolean.TRUE;
+        });
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, 
Supplier<T> action) {
+    Optional<HTTPHeaders.HTTPHeader> keyHeader =
+        httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+    if (keyHeader.isEmpty()) {
+      return action.get();
+    }
+
+    String key = keyHeader.get().value();
+
+    // The "first" request for this Idempotency-Key is the one that wins
+    // IDEMPOTENCY_STORE.compute(...)
+    // and creates (or replaces) the IN_PROGRESS entry. Only that request 
executes the action and
+    // finalizes the entry; concurrent requests for the same key wait on the 
latch and then replay
+    // the finalized result/error.
+    AtomicBoolean isFirst = new AtomicBoolean(false);
+    IdempotencyEntry entry =
+        IDEMPOTENCY_STORE.compute(
+            key,
+            (k, current) -> {
+              if (current == null || current.isExpired()) {
+                isFirst.set(true);
+                return IdempotencyEntry.inProgress();
+              }
+              return current;
+            });
+
+    // Fast-path: already finalized (another request completed earlier)
+    if (entry.status == IdempotencyEntry.Status.FINALIZED) {
+      if (entry.error != null) {
+        throw entry.error;
+      }
+      return (T) entry.responseBody;
+    }
+
+    if (!isFirst.get()) {
+      // In-flight coalescing: wait for the first request to finalize
+      entry.awaitFinalization();
+      if (entry.error != null) {
+        throw entry.error;
+      }
+      return (T) entry.responseBody;
+    }
+
+    // First request: execute the action and finalize the entry
+    try {
+      T res = action.get();
+      entry.finalizeSuccess(res);
+      return res;
+    } catch (RuntimeException e) {
+      entry.finalizeError(e);
+      throw e;
+    }
+  }
+
+  @VisibleForTesting
+  static void setIdempotencyLifetimeFromIso(String isoDuration) {
+    if (isoDuration == null) {
+      return;
+    }
+    try {
+      idempotencyLifetimeMillis = Duration.parse(isoDuration).toMillis();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Invalid idempotency lifetime: " + 
isoDuration, e);
+    }
+  }
+
+  private static final class IdempotencyEntry {
+    enum Status {
+      IN_PROGRESS,
+      FINALIZED
+    }
+
+    private final CountDownLatch latch;
+    private final long firstSeenMillis;
+    private volatile Status status;
+    private volatile Object responseBody;
+    private volatile RuntimeException error;
+
+    private IdempotencyEntry(Status status) {
+      this.status = status;
+      this.latch = new CountDownLatch(1);
+      this.firstSeenMillis = System.currentTimeMillis();
+    }
+
+    static IdempotencyEntry inProgress() {
+      return new IdempotencyEntry(Status.IN_PROGRESS);
+    }
+
+    void finalizeSuccess(Object body) {
+      this.responseBody = body;
+      this.status = Status.FINALIZED;
+      this.latch.countDown();
+    }
+
+    void finalizeError(RuntimeException cause) {
+      this.error = cause;
+      this.status = Status.FINALIZED;
+      this.latch.countDown();
+    }
+
+    void awaitFinalization() {
+      try {
+        this.latch.await();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            "Interrupted while waiting for idempotent request to complete", 
ie);
+      }
+    }
+
+    boolean isExpired() {
+      if (this.status != Status.FINALIZED) {
+        return false;
+      }
+
+      Instant expiry =
+          
Instant.ofEpochMilli(this.firstSeenMillis).plusMillis(idempotencyLifetimeMillis);
+      return Instant.now().isAfter(expiry);
+    }
+  }
+
   /**
    * Exception used to avoid retrying commits when assertions fail.
    *
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java 
b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index e62937b6df..0600ef5515 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -207,8 +207,11 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
       case CREATE_NAMESPACE:
         if (asNamespaceCatalog != null) {
           CreateNamespaceRequest request = 
castRequest(CreateNamespaceRequest.class, body);
-          return castResponse(
-              responseType, 
CatalogHandlers.createNamespace(asNamespaceCatalog, request));
+          return CatalogHandlers.withIdempotency(
+              httpRequest,
+              () ->
+                  castResponse(
+                      responseType, 
CatalogHandlers.createNamespace(asNamespaceCatalog, request)));
         }
         break;
 
@@ -229,7 +232,9 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
 
       case DROP_NAMESPACE:
         if (asNamespaceCatalog != null) {
-          CatalogHandlers.dropNamespace(asNamespaceCatalog, 
namespaceFromPathVars(vars));
+          CatalogHandlers.withIdempotency(
+              httpRequest,
+              () -> CatalogHandlers.dropNamespace(asNamespaceCatalog, 
namespaceFromPathVars(vars)));
           return null;
         }
         break;
@@ -239,9 +244,13 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
           Namespace namespace = namespaceFromPathVars(vars);
           UpdateNamespacePropertiesRequest request =
               castRequest(UpdateNamespacePropertiesRequest.class, body);
-          return castResponse(
-              responseType,
-              CatalogHandlers.updateNamespaceProperties(asNamespaceCatalog, 
namespace, request));
+          return CatalogHandlers.withIdempotency(
+              httpRequest,
+              () ->
+                  castResponse(
+                      responseType,
+                      CatalogHandlers.updateNamespaceProperties(
+                          asNamespaceCatalog, namespace, request)));
         }
         break;
 
@@ -268,19 +277,29 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
             return castResponse(
                 responseType, CatalogHandlers.stageTableCreate(catalog, 
namespace, request));
           } else {
-            LoadTableResponse response = CatalogHandlers.createTable(catalog, 
namespace, request);
-            responseHeaders.accept(
-                ImmutableMap.of(HttpHeaders.ETAG, 
ETagProvider.of(response.metadataLocation())));
-            return castResponse(responseType, response);
+            return CatalogHandlers.withIdempotency(
+                httpRequest,
+                () -> {
+                  LoadTableResponse response =
+                      CatalogHandlers.createTable(catalog, namespace, request);
+                  responseHeaders.accept(
+                      ImmutableMap.of(
+                          HttpHeaders.ETAG, 
ETagProvider.of(response.metadataLocation())));
+                  return castResponse(responseType, response);
+                });
           }
         }
 
       case DROP_TABLE:
         {
           if (PropertyUtil.propertyAsBoolean(vars, "purgeRequested", false)) {
-            CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars));
+            CatalogHandlers.withIdempotency(
+                httpRequest,
+                () -> CatalogHandlers.purgeTable(catalog, 
tableIdentFromPathVars(vars)));
           } else {
-            CatalogHandlers.dropTable(catalog, tableIdentFromPathVars(vars));
+            CatalogHandlers.withIdempotency(
+                httpRequest,
+                () -> CatalogHandlers.dropTable(catalog, 
tableIdentFromPathVars(vars)));
           }
           return null;
         }
@@ -352,36 +371,47 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
 
       case REGISTER_TABLE:
         {
-          LoadTableResponse response =
-              CatalogHandlers.registerTable(
-                  catalog,
-                  namespaceFromPathVars(vars),
-                  castRequest(RegisterTableRequest.class, body));
-
-          responseHeaders.accept(
-              ImmutableMap.of(HttpHeaders.ETAG, 
ETagProvider.of(response.metadataLocation())));
-
-          return castResponse(responseType, response);
+          return CatalogHandlers.withIdempotency(
+              httpRequest,
+              () -> {
+                LoadTableResponse response =
+                    CatalogHandlers.registerTable(
+                        catalog,
+                        namespaceFromPathVars(vars),
+                        castRequest(RegisterTableRequest.class, body));
+
+                responseHeaders.accept(
+                    ImmutableMap.of(
+                        HttpHeaders.ETAG, 
ETagProvider.of(response.metadataLocation())));
+
+                return castResponse(responseType, response);
+              });
         }
 
       case UPDATE_TABLE:
         {
-          LoadTableResponse response =
-              CatalogHandlers.updateTable(
-                  catalog,
-                  tableIdentFromPathVars(vars),
-                  castRequest(UpdateTableRequest.class, body));
-
-          responseHeaders.accept(
-              ImmutableMap.of(HttpHeaders.ETAG, 
ETagProvider.of(response.metadataLocation())));
-
-          return castResponse(responseType, response);
+          return CatalogHandlers.withIdempotency(
+              httpRequest,
+              () -> {
+                LoadTableResponse response =
+                    CatalogHandlers.updateTable(
+                        catalog,
+                        tableIdentFromPathVars(vars),
+                        castRequest(UpdateTableRequest.class, body));
+
+                responseHeaders.accept(
+                    ImmutableMap.of(
+                        HttpHeaders.ETAG, 
ETagProvider.of(response.metadataLocation())));
+
+                return castResponse(responseType, response);
+              });
         }
 
       case RENAME_TABLE:
         {
           RenameTableRequest request = castRequest(RenameTableRequest.class, 
body);
-          CatalogHandlers.renameTable(catalog, request);
+          CatalogHandlers.withIdempotency(
+              httpRequest, () -> CatalogHandlers.renameTable(catalog, 
request));
           return null;
         }
 
@@ -395,7 +425,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
       case COMMIT_TRANSACTION:
         {
           CommitTransactionRequest request = 
castRequest(CommitTransactionRequest.class, body);
-          commitTransaction(catalog, request);
+          CatalogHandlers.withIdempotency(httpRequest, () -> 
commitTransaction(catalog, request));
           return null;
         }
 
@@ -422,8 +452,12 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
           if (null != asViewCatalog) {
             Namespace namespace = namespaceFromPathVars(vars);
             CreateViewRequest request = castRequest(CreateViewRequest.class, 
body);
-            return castResponse(
-                responseType, CatalogHandlers.createView(asViewCatalog, 
namespace, request));
+            return CatalogHandlers.withIdempotency(
+                httpRequest,
+                () ->
+                    castResponse(
+                        responseType,
+                        CatalogHandlers.createView(asViewCatalog, namespace, 
request)));
           }
           break;
         }
@@ -451,8 +485,11 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
           if (null != asViewCatalog) {
             TableIdentifier ident = viewIdentFromPathVars(vars);
             UpdateTableRequest request = castRequest(UpdateTableRequest.class, 
body);
-            return castResponse(
-                responseType, CatalogHandlers.updateView(asViewCatalog, ident, 
request));
+            return CatalogHandlers.withIdempotency(
+                httpRequest,
+                () ->
+                    castResponse(
+                        responseType, 
CatalogHandlers.updateView(asViewCatalog, ident, request)));
           }
           break;
         }
@@ -461,7 +498,8 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
         {
           if (null != asViewCatalog) {
             RenameTableRequest request = castRequest(RenameTableRequest.class, 
body);
-            CatalogHandlers.renameView(asViewCatalog, request);
+            CatalogHandlers.withIdempotency(
+                httpRequest, () -> CatalogHandlers.renameView(asViewCatalog, 
request));
             return null;
           }
           break;
@@ -470,7 +508,9 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
       case DROP_VIEW:
         {
           if (null != asViewCatalog) {
-            CatalogHandlers.dropView(asViewCatalog, 
viewIdentFromPathVars(vars));
+            CatalogHandlers.withIdempotency(
+                httpRequest,
+                () -> CatalogHandlers.dropView(asViewCatalog, 
viewIdentFromPathVars(vars)));
             return null;
           }
           break;
@@ -568,8 +608,10 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
         vars.putAll(request.queryParameters());
         vars.putAll(routeAndVars.second());
 
-        return handleRequest(
-            routeAndVars.first(), vars.build(), request, responseType, 
responseHeaders);
+        T resp =
+            handleRequest(
+                routeAndVars.first(), vars.build(), request, responseType, 
responseHeaders);
+        return resp;
       } catch (RuntimeException e) {
         configureResponseFromException(e, errorBuilder);
       }
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index d202680e56..40dc050311 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.rest;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
 import static org.assertj.core.api.InstanceOfAssertFactories.map;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyMap;
@@ -69,28 +70,36 @@ import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.catalog.TableCommit;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
 import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RESTException;
 import org.apache.iceberg.exceptions.ServiceFailureException;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.inmemory.InMemoryCatalog;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
 import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
+import org.apache.iceberg.rest.auth.AuthManager;
+import org.apache.iceberg.rest.auth.AuthManagers;
+import org.apache.iceberg.rest.auth.AuthSession;
 import org.apache.iceberg.rest.auth.AuthSessionUtil;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ConfigResponse;
 import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
@@ -100,6 +109,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.OAuthTokenResponse;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.awaitility.Awaitility;
 import org.eclipse.jetty.server.Server;
@@ -124,12 +134,134 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
               RESTCatalogProperties.NAMESPACE_SEPARATOR,
               RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
 
+  private static final class IdempotentEnv {
+    private final TableIdentifier ident;
+    private final RESTClient http;
+    private final Map<String, String> headers;
+
+    private IdempotentEnv(TableIdentifier ident, RESTClient http, Map<String, 
String> headers) {
+      this.ident = ident;
+      this.http = http;
+      this.headers = headers;
+    }
+  }
+
+  /**
+   * Test-only adapter that keeps request/response round-trip serialization 
and header validation
+   * from the base test setup, while also allowing specific tests to inject 
transient failures.
+   */
+  private static class HeaderValidatingAdapter extends RESTCatalogAdapter {
+    private final HTTPHeaders catalogHeaders;
+    private final HTTPHeaders contextHeaders;
+    private final java.util.concurrent.ConcurrentMap<String, RuntimeException>
+        simulateFailureOnFirstSuccessByKey = new 
java.util.concurrent.ConcurrentHashMap<>();
+
+    HeaderValidatingAdapter(
+        Catalog catalog, HTTPHeaders catalogHeaders, HTTPHeaders 
contextHeaders) {
+      super(catalog);
+      this.catalogHeaders = catalogHeaders;
+      this.contextHeaders = contextHeaders;
+    }
+
+    /**
+     * Test helper to simulate a transient failure after the first successful 
mutation for a key.
+     *
+     * <p>Useful to validate that idempotency correctly replays a finalized 
result when the client
+     * retries after a post-success transient failure.
+     */
+    public void simulateFailureOnFirstSuccessForKey(String key, 
RuntimeException failure) {
+      Preconditions.checkArgument(key != null, "Invalid idempotency key: 
null");
+      Preconditions.checkArgument(failure != null, "Invalid failure: null");
+      simulateFailureOnFirstSuccessByKey.put(key, failure);
+    }
+
+    /** Test helper to simulate a transient 503 after the first successful 
mutation for a key. */
+    public void simulate503OnFirstSuccessForKey(String key) {
+      simulateFailureOnFirstSuccessForKey(
+          key,
+          new CommitStateUnknownException(
+              new RuntimeException("simulated transient 503 after success")));
+    }
+
+    @Override
+    public <T extends RESTResponse> T execute(
+        HTTPRequest request,
+        Class<T> responseType,
+        Consumer<ErrorResponse> errorHandler,
+        Consumer<Map<String, String>> responseHeaders) {
+      if (!ResourcePaths.tokens().equals(request.path())) {
+        if (ResourcePaths.config().equals(request.path())) {
+          
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
+        } else {
+          
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
+        }
+      }
+
+      Object body = roundTripSerialize(request.body(), "request");
+      HTTPRequest req = 
ImmutableHTTPRequest.builder().from(request).body(body).build();
+      T response = super.execute(req, responseType, errorHandler, 
responseHeaders);
+      return roundTripSerialize(response, "response");
+    }
+
+    @Override
+    protected <T extends RESTResponse> T execute(
+        HTTPRequest request,
+        Class<T> responseType,
+        Consumer<ErrorResponse> errorHandler,
+        Consumer<Map<String, String>> responseHeaders,
+        ParserContext parserContext) {
+      ErrorResponse.Builder errorBuilder = ErrorResponse.builder();
+      Pair<Route, Map<String, String>> routeAndVars = 
Route.from(request.method(), request.path());
+      if (routeAndVars != null) {
+        try {
+          ImmutableMap.Builder<String, String> vars = ImmutableMap.builder();
+          vars.putAll(request.queryParameters());
+          vars.putAll(routeAndVars.second());
+
+          T resp =
+              handleRequest(
+                  routeAndVars.first(), vars.build(), request, responseType, 
responseHeaders);
+
+          // For tests: simulate a transient 503 after the first successful 
mutation for a key.
+          Optional<HTTPHeaders.HTTPHeader> keyHeader =
+              request.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+          boolean isMutation =
+              request.method() == HTTPMethod.POST || request.method() == 
HTTPMethod.DELETE;
+          if (isMutation && keyHeader.isPresent()) {
+            String key = keyHeader.get().value();
+            RuntimeException failure = 
simulateFailureOnFirstSuccessByKey.remove(key);
+            if (failure != null) {
+              throw failure;
+            }
+          }
+
+          return resp;
+        } catch (RuntimeException e) {
+          configureResponseFromException(e, errorBuilder);
+        }
+
+      } else {
+        errorBuilder
+            .responseCode(400)
+            .withType("BadRequestException")
+            .withMessage(
+                String.format("No route for request: %s %s", request.method(), 
request.path()));
+      }
+
+      ErrorResponse error = errorBuilder.build();
+      errorHandler.accept(error);
+
+      // if the error handler doesn't throw an exception, throw a generic one
+      throw new RESTException("Unhandled error: %s", error);
+    }
+  }
+
   @TempDir public Path temp;
 
   private RESTCatalog restCatalog;
   private InMemoryCatalog backendCatalog;
   private Server httpServer;
-  private RESTCatalogAdapter adapterForRESTServer;
+  private HeaderValidatingAdapter adapterForRESTServer;
 
   @BeforeEach
   public void createCatalog() throws Exception {
@@ -156,31 +288,7 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
                 "test-value"));
 
     adapterForRESTServer =
-        Mockito.spy(
-            new RESTCatalogAdapter(backendCatalog) {
-              @Override
-              public <T extends RESTResponse> T execute(
-                  HTTPRequest request,
-                  Class<T> responseType,
-                  Consumer<ErrorResponse> errorHandler,
-                  Consumer<Map<String, String>> responseHeaders) {
-                // this doesn't use a Mockito spy because this is used for 
catalog tests, which have
-                // different method calls
-                if (!ResourcePaths.tokens().equals(request.path())) {
-                  if (ResourcePaths.config().equals(request.path())) {
-                    
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
-                  } else {
-                    
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
-                  }
-                }
-
-                Object body = roundTripSerialize(request.body(), "request");
-                HTTPRequest req = 
ImmutableHTTPRequest.builder().from(request).body(body).build();
-                T response = super.execute(req, responseType, errorHandler, 
responseHeaders);
-                T responseAfterSerialization = roundTripSerialize(response, 
"response");
-                return responseAfterSerialization;
-              }
-            });
+        Mockito.spy(new HeaderValidatingAdapter(backendCatalog, 
catalogHeaders, contextHeaders));
 
     ServletContextHandler servletContext =
         new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
@@ -3313,6 +3421,136 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     local.dropTable(ident);
   }
 
+  @Test
+  public void testIdempotentDuplicateCreateReturnsCached() {
+    String key = "dup-create-key";
+    Namespace ns = Namespace.of("ns_dup");
+    IdempotentEnv env = idempotentEnv(key, ns, "t_dup");
+    CreateTableRequest req = createReq(env.ident);
+
+    // First create succeeds
+    LoadTableResponse first =
+        env.http.post(
+            ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+            req,
+            LoadTableResponse.class,
+            env.headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(first).isNotNull();
+
+    // Verify request shape (method, path, headers including Idempotency-Key)
+    verifyCreatePost(ns, env.headers);
+
+    // Duplicate with same key returns cached 200 OK
+    LoadTableResponse second =
+        env.http.post(
+            ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+            req,
+            LoadTableResponse.class,
+            env.headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(second).isNotNull();
+  }
+
+  @Test
+  public void testIdempotencyKeyLifetimeExpiredTreatsAsNew() {
+    // Set TTL to 0 so cached success expires immediately
+    CatalogHandlers.setIdempotencyLifetimeFromIso("PT0S");
+    try {
+      String key = "expired-create-key";
+      Namespace ns = Namespace.of("ns_exp");
+      IdempotentEnv env = idempotentEnv(key, ns, "t_exp");
+      CreateTableRequest req = createReq(env.ident);
+
+      // First create succeeds
+      LoadTableResponse created =
+          env.http.post(
+              ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+              req,
+              LoadTableResponse.class,
+              env.headers,
+              ErrorHandlers.tableErrorHandler());
+      assertThat(created).isNotNull();
+
+      // Verify request shape (method, path, headers including Idempotency-Key)
+      verifyCreatePost(ns, env.headers);
+
+      // TTL expired -> duplicate with same key should be treated as new and 
fail with AlreadyExists
+      assertThatThrownBy(
+              () ->
+                  env.http.post(
+                      
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+                      req,
+                      LoadTableResponse.class,
+                      env.headers,
+                      ErrorHandlers.tableErrorHandler()))
+          .isInstanceOf(AlreadyExistsException.class)
+          .hasMessageContaining(env.ident.toString());
+    } finally {
+      // Restore default TTL for other tests
+      CatalogHandlers.setIdempotencyLifetimeFromIso("PT30M");
+    }
+  }
+
+  @Test
+  public void testIdempotentCreateReplayAfterSimulated503() {
+    // Use a fixed key and simulate 503 after first success for that key
+    String key = "idemp-create-503";
+    adapterForRESTServer.simulate503OnFirstSuccessForKey(key);
+    Namespace ns = Namespace.of("ns_idemp");
+    IdempotentEnv env = idempotentEnv(key, ns, "t_idemp");
+    CreateTableRequest req = createReq(env.ident);
+
+    // First attempt: server finalizes success but responds 503
+    assertThatThrownBy(
+            () ->
+                env.http.post(
+                    
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+                    req,
+                    LoadTableResponse.class,
+                    env.headers,
+                    ErrorHandlers.tableErrorHandler()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("simulated transient 503");
+
+    // Verify request shape (method, path, headers including Idempotency-Key)
+    verifyCreatePost(ns, env.headers);
+
+    // Retry with same key: server should replay 200 OK
+    LoadTableResponse replay =
+        env.http.post(
+            ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+            req,
+            LoadTableResponse.class,
+            env.headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(replay).isNotNull();
+  }
+
+  @Test
+  public void testIdempotentDropDuplicateNoop() {
+    String key = "idemp-drop-void";
+    Namespace ns = Namespace.of("ns_void");
+    IdempotentEnv env = idempotentEnv(key, ns, "t_void");
+
+    // Create a table to drop
+    restCatalog.createTable(
+        env.ident,
+        new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get())),
+        PartitionSpec.unpartitioned());
+
+    String path = 
ResourcePaths.forCatalogProperties(ImmutableMap.of()).table(env.ident);
+
+    // First drop: table exists -> drop succeeds
+    env.http.delete(path, null, env.headers, 
ErrorHandlers.tableErrorHandler());
+    assertThat(restCatalog.tableExists(env.ident)).isFalse();
+
+    // Second drop with the same key: should be a no-op (no exception)
+    assertThatCode(
+            () -> env.http.delete(path, null, env.headers, 
ErrorHandlers.tableErrorHandler()))
+        .doesNotThrowAnyException();
+  }
+
   @Test
   public void nestedNamespaceWithLegacySeparator() {
     RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
@@ -3454,6 +3692,71 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     return local;
   }
 
+  private Pair<RESTClient, Map<String, String>> httpAndHeaders(String 
idempotencyKey) {
+    Map<String, String> headers =
+        ImmutableMap.of(
+            RESTUtil.IDEMPOTENCY_KEY_HEADER,
+            idempotencyKey,
+            "Authorization",
+            "Bearer client-credentials-token:sub=user",
+            "test-header",
+            "test-value");
+
+    Map<String, String> conf =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            httpServer.getURI().toString(),
+            HTTPClient.REST_SOCKET_TIMEOUT_MS,
+            "600000",
+            HTTPClient.REST_CONNECTION_TIMEOUT_MS,
+            "600000",
+            "header.test-header",
+            "test-value");
+    RESTClient httpBase =
+        HTTPClient.builder(conf)
+            .uri(conf.get(CatalogProperties.URI))
+            .withHeaders(RESTUtil.configHeaders(conf))
+            .build();
+    AuthManager am = AuthManagers.loadAuthManager("test", conf);
+    AuthSession httpSession = am.initSession(httpBase, conf);
+    RESTClient http = httpBase.withAuthSession(httpSession);
+    return Pair.of(http, headers);
+  }
+
+  private Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> 
prepareIdempotentEnv(
+      String key, Namespace ns, String tableName) {
+    TableIdentifier ident = TableIdentifier.of(ns, tableName);
+    restCatalog.createNamespace(ns, ImmutableMap.of());
+    return Pair.of(ident, httpAndHeaders(key));
+  }
+
+  private IdempotentEnv idempotentEnv(String key, Namespace ns, String 
tableName) {
+    Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+        prepareIdempotentEnv(key, ns, tableName);
+    Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+    return new IdempotentEnv(env.first(), httpAndHeaders.first(), 
httpAndHeaders.second());
+  }
+
+  private static CreateTableRequest createReq(TableIdentifier ident) {
+    return CreateTableRequest.builder()
+        .withName(ident.name())
+        .withSchema(new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get())))
+        .withPartitionSpec(PartitionSpec.unpartitioned())
+        .build();
+  }
+
+  private void verifyCreatePost(Namespace ns, Map<String, String> headers) {
+    verify(adapterForRESTServer, atLeastOnce())
+        .execute(
+            reqMatcherContainsHeaders(
+                HTTPMethod.POST,
+                
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+                headers),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+  }
+
   @Test
   @Override
   public void testLoadTableWithMissingMetadataFile(@TempDir Path tempDir) {
@@ -3545,6 +3848,15 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
                 && Objects.equals(req.body(), body));
   }
 
+  static HTTPRequest reqMatcherContainsHeaders(
+      HTTPMethod method, String path, Map<String, String> headers) {
+    return argThat(
+        req ->
+            req.method() == method
+                && req.path().equals(path)
+                && 
req.headers().entries().containsAll(HTTPHeaders.of(headers).entries()));
+  }
+
   private static List<HTTPRequest> allRequests(RESTCatalogAdapter adapter) {
     ArgumentCaptor<HTTPRequest> captor = 
ArgumentCaptor.forClass(HTTPRequest.class);
     verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), 
any());


Reply via email to