huaxingao commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2663686041


##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +114,147 @@ public class CatalogHandlers {
       InMemoryPlanningState.getInstance();
   private static final ExecutorService ASYNC_PLANNING_POOL = 
Executors.newSingleThreadExecutor();
 
+  // Advanced idempotency store with TTL and in-flight coalescing.
+  //
+  // Note: This is a simple in-memory implementation meant for tests and 
lightweight usage.
+  // Production servers should provide a durable store.
+  private static final ConcurrentMap<String, IdempotencyEntry> 
IDEMPOTENCY_STORE =
+      Maps.newConcurrentMap();
+  private static volatile long idempotencyLifetimeMillis = 
TimeUnit.MINUTES.toMillis(30);
+
   private CatalogHandlers() {}
 
+  @SuppressWarnings("unchecked")
+  public static <T extends RESTResponse> T withIdempotency(
+      HTTPRequest httpRequest, Supplier<T> action) {
+    return withIdempotencyInternal(httpRequest, action);
+  }
+
+  public static void withIdempotency(HTTPRequest httpRequest, Runnable action) 
{
+    withIdempotencyInternal(
+        httpRequest,
+        () -> {
+          action.run();
+          return Boolean.TRUE;
+        });
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, 
Supplier<T> action) {
+    Optional<HTTPHeaders.HTTPHeader> keyHeader =
+        httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+    if (keyHeader.isEmpty()) {
+      return action.get();
+    }
+
+    String key = keyHeader.get().value();
+
+    AtomicBoolean isLeader = new AtomicBoolean(false);
+    IdempotencyEntry entry =
+        IDEMPOTENCY_STORE.compute(
+            key,
+            (k, current) -> {
+              if (current == null || current.isExpired()) {
+                isLeader.set(true);
+                return IdempotencyEntry.inProgress();
+              }
+              return current;
+            });
+
+    // Fast-path: already finalized (another request completed earlier)
+    if (entry.status == IdempotencyEntry.Status.FINALIZED) {
+      if (entry.error != null) {
+        throw entry.error;
+      }
+      return (T) entry.responseBody;
+    }
+
+    if (!isLeader.get()) {
+      // In-flight coalescing: wait for the leader request to finalize
+      entry.awaitFinalized();
+      if (entry.error != null) {
+        throw entry.error;
+      }
+      return (T) entry.responseBody;
+    }
+
+    // Leader request: execute the action and finalize the entry
+    try {
+      T res = action.get();
+      entry.finalizeSuccess(res);
+      return res;
+    } catch (RuntimeException e) {
+      entry.finalizeError(e);
+      throw e;
+    }
+  }
+
+  public static void setIdempotencyLifetimeFromIso(String isoDuration) {
+    if (isoDuration == null) {
+      return;
+    }
+    try {
+      idempotencyLifetimeMillis = Duration.parse(isoDuration).toMillis();
+    } catch (Exception e) {
+      // ignore parse errors; keep default

Review Comment:
   Good catch. I made this test-only (@VisibleForTesting + package-private) and 
changed it to throw IllegalArgumentException on invalid ISO durations rather 
than silently ignoring parse errors.



##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -114,6 +115,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
 
   private AuthSession authSession = AuthSession.EMPTY;
   private PlanningBehavior planningBehavior;
+  private final java.util.Set<String> simulate503OnFirstSuccessKeys = 
Sets.newConcurrentHashSet();

Review Comment:
   Fixed. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to