huaxingao commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2677315825


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -124,12 +133,107 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
               RESTCatalogProperties.NAMESPACE_SEPARATOR,
               RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
 
+  /**
+   * Test-only adapter that keeps request/response round-trip serialization and header validation
+   * from the base test setup, while also allowing specific tests to inject transient failures.
+   */
+  private static class HeaderValidatingAdapter extends RESTCatalogAdapter {
+    private final HTTPHeaders catalogHeaders;
+    private final HTTPHeaders contextHeaders;
+    private final Set<String> simulate503OnFirstSuccessKeys =
+        org.apache.iceberg.relocated.com.google.common.collect.Sets.newConcurrentHashSet();

Review Comment:
   Good idea. I generalized the test-only hook to a key -> RuntimeException map 
so tests can simulate different post-success transient failures while still 
keeping simulate503OnFirstSuccessForKey as a convenience wrapper for the 
current case.
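
   For readers following the thread, here is a minimal sketch of what such a key -> RuntimeException hook could look like. It is not the code in the PR: the class name `PostSuccessFailureInjector`, the methods `simulateFailureOnFirstSuccessForKey` and `execute`, and the use of `IllegalStateException` as a stand-in for a 503 error are all assumptions made for illustration. Only `simulate503OnFirstSuccessForKey` is a name taken from the comment above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the test-only hook; not the adapter from the PR.
class PostSuccessFailureInjector {
  // Idempotency key -> failure to throw once, after the action has succeeded.
  private final Map<String, RuntimeException> failuresByKey = new ConcurrentHashMap<>();

  // General hook (assumed name): register any transient failure for a key.
  void simulateFailureOnFirstSuccessForKey(String key, RuntimeException failure) {
    failuresByKey.put(key, failure);
  }

  // Convenience wrapper for the 503 case. IllegalStateException is a placeholder
  // for whatever exception the real test maps to "503 Service Unavailable".
  void simulate503OnFirstSuccessForKey(String key) {
    simulateFailureOnFirstSuccessForKey(
        key, new IllegalStateException("503 Service Unavailable (simulated)"));
  }

  // Runs the action first, then throws the registered failure at most once per key.
  // This models a server that applied the change but failed to deliver the response.
  <T> T execute(String key, Supplier<T> action) {
    T result = action.get();
    RuntimeException failure = key == null ? null : failuresByKey.remove(key);
    if (failure != null) {
      throw failure;
    }
    return result;
  }
}
```

   Because the entry is removed when it fires, a retry with the same key goes through normally, which is the situation an idempotent retry test needs to reproduce.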



##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -268,19 +277,29 @@ public <T extends RESTResponse> T handleRequest(
             return castResponse(
                 responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
           } else {
-            LoadTableResponse response = CatalogHandlers.createTable(catalog, namespace, request);
-            responseHeaders.accept(
-                ImmutableMap.of(HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation())));
-            return castResponse(responseType, response);
+            return CatalogHandlers.withIdempotency(
+                httpRequest,
+                () -> {
+                  LoadTableResponse response =
+                      CatalogHandlers.createTable(catalog, namespace, request);
+                  responseHeaders.accept(
+                      ImmutableMap.of(
+                          HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation())));
+                  return castResponse(responseType, response);
+                });
           }
         }
 
       case DROP_TABLE:
         {
           if (PropertyUtil.propertyAsBoolean(vars, "purgeRequested", false)) {
-            CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars));
+            CatalogHandlers.withIdempotency(

Review Comment:
   Good point. The idempotency behavior is driven by the HTTP request headers,
   so I’m keeping the wrapping at the routing/adapter layer rather than pushing
   HTTPRequest through every CatalogHandlers.* method.
   That said, I agree it doesn’t need to be exposed publicly, so I made
   withIdempotency(...) package-private. It is now only used internally by the
   REST adapter/routes.
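
   To illustrate the layering argument, the sketch below keeps the handler method free of HTTP types and applies the idempotency wrapper only in the routing method. Everything in it is a simplified assumption: `RoutingLayerSketch`, `handleCreate`, the string-based `createTable`, and the plain `Map` of headers are hypothetical stand-ins, and the real wrapper in the PR additionally handles TTL and concurrent in-flight requests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins; these are not the Iceberg classes.
class RoutingLayerSketch {
  static final String IDEMPOTENCY_KEY_HEADER = "Idempotency-Key";

  // Completed results by idempotency key (no TTL or in-flight handling here).
  private final Map<String, Object> completed = new ConcurrentHashMap<>();
  int createCalls = 0;

  // "Handler" layer: knows nothing about HTTP requests or headers.
  String createTable(String name) {
    createCalls++;
    return "created:" + name + "#" + createCalls;
  }

  // Package-private wrapper: the only place that looks at request headers.
  @SuppressWarnings("unchecked")
  <T> T withIdempotency(Map<String, String> headers, Supplier<T> action) {
    String key = headers.get(IDEMPOTENCY_KEY_HEADER);
    if (key == null) {
      return action.get(); // no key: run the action directly
    }
    return (T) completed.computeIfAbsent(key, k -> action.get());
  }

  // "Routing" layer: decides that this route is wrapped, then delegates.
  String handleCreate(Map<String, String> headers, String name) {
    return withIdempotency(headers, () -> createTable(name));
  }
}
```

   With this shape, adding idempotency to another route is a change in the routing method only, and the handler signatures stay untouched.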



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3313,6 +3393,155 @@ public void testClientDoesNotSendIdempotencyWhenServerNotAdvertising() {
     local.dropTable(ident);
   }
 
+  @Test
+  public void testIdempotentDuplicateCreateReturnsCached() {
+    String key = "dup-create-key";
+    Namespace ns = Namespace.of("ns_dup");
+    Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+        prepareIdempotentEnv(key, ns, "t_dup");
+    TableIdentifier ident = env.first();
+    Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+    RESTClient http = httpAndHeaders.first();
+    Map<String, String> headers = httpAndHeaders.second();
+    CreateTableRequest req = createReq(ident);

Review Comment:
   I factored the repeated prepareIdempotentEnv unpacking into a small
   IdempotentEnv helper.



##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +115,152 @@ public class CatalogHandlers {
       InMemoryPlanningState.getInstance();
   private static final ExecutorService ASYNC_PLANNING_POOL = Executors.newSingleThreadExecutor();
 
+  // Advanced idempotency store with TTL and in-flight coalescing.
+  //
+  // Note: This is a simple in-memory implementation meant for tests and lightweight usage.
+  // Production servers should provide a durable store.
+  private static final ConcurrentMap<String, IdempotencyEntry> IDEMPOTENCY_STORE =
+      Maps.newConcurrentMap();
+  private static volatile long idempotencyLifetimeMillis = TimeUnit.MINUTES.toMillis(30);
+
   private CatalogHandlers() {}
 
+  @SuppressWarnings("unchecked")
+  public static <T extends RESTResponse> T withIdempotency(
+      HTTPRequest httpRequest, Supplier<T> action) {
+    return withIdempotencyInternal(httpRequest, action);
+  }
+
+  public static void withIdempotency(HTTPRequest httpRequest, Runnable action) {
+    withIdempotencyInternal(
+        httpRequest,
+        () -> {
+          action.run();
+          return Boolean.TRUE;
+        });
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, Supplier<T> action) {
+    Optional<HTTPHeaders.HTTPHeader> keyHeader =
+        httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+    if (keyHeader.isEmpty()) {
+      return action.get();
+    }
+
+    String key = keyHeader.get().value();
+
+    // "Leader" is the request thread that wins the IDEMPOTENCY_STORE.compute(...) and creates (or
+    // replaces) the IN_PROGRESS entry for this Idempotency-Key. Only the leader executes the
+    // action and finalizes the entry; concurrent requests for the same key ("followers") wait on
+    // the latch and then replay the finalized result/error.
+    AtomicBoolean isLeader = new AtomicBoolean(false);

Review Comment:
   Changed to `isFirst`. Thanks.
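
   The first-request/waiter pattern described in the code comment above can be sketched roughly as follows. This is an assumption-laden illustration, not the PR's implementation: `CoalescingSketch`, its `Entry` class, and the `run` method are hypothetical names, the key is passed as a plain string rather than read from a request header, and TTL-based expiry and replacement of stale entries are omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of in-flight coalescing; not the code from the PR.
class CoalescingSketch {
  private static final class Entry {
    final CountDownLatch done = new CountDownLatch(1);
    volatile Object result;
    volatile RuntimeException error;
  }

  private final ConcurrentMap<String, Entry> store = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  <T> T run(String key, Supplier<T> action) {
    AtomicBoolean isFirst = new AtomicBoolean(false);

    // compute(...) is atomic per key, so exactly one caller creates the entry.
    Entry entry =
        store.compute(
            key,
            (k, existing) -> {
              if (existing != null) {
                return existing; // someone else is already first for this key
              }
              isFirst.set(true);
              return new Entry();
            });

    if (isFirst.get()) {
      // Only the first request executes the action and finalizes the entry.
      try {
        entry.result = action.get();
      } catch (RuntimeException e) {
        entry.error = e;
      } finally {
        entry.done.countDown(); // release any waiters
      }
    } else {
      // Later requests wait for the first one to finish.
      try {
        entry.done.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for key " + key, e);
      }
    }

    // Everyone, first or not, replays the finalized result or error.
    if (entry.error != null) {
      throw entry.error;
    }
    return (T) entry.result;
  }
}
```

   The `AtomicBoolean` is only a way to carry the "I created the entry" decision out of the `compute` lambda; it is never shared between threads, which is one reason a name like `isFirst` reads naturally.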



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

