singhpk234 commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2663269683
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3312,6 +3319,164 @@ public void
testClientDoesNotSendIdempotencyWhenServerNotAdvertising() {
local.dropTable(ident);
}
+ @Test
+ public void testIdempotentDuplicateCreateReturnsCached() {
+ String key = "dup-create-key";
+ Namespace ns = Namespace.of("ns_dup");
+ Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+ prepareIdempotentEnv(key, ns, "t_dup");
+ TableIdentifier ident = env.first();
+ Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+ RESTClient http = httpAndHeaders.first();
+ Map<String, String> headers = httpAndHeaders.second();
+ CreateTableRequest req = createReq(ident);
+
+ // First create succeeds
+ LoadTableResponse first =
+ http.post(
+ ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler());
+ assertThat(first).isNotNull();
+
+ // Verify request shape (method, path, headers including Idempotency-Key)
+ verifyCreatePost(ns, headers);
+
+ // Duplicate with same key returns cached 200 OK
+ LoadTableResponse second =
+ http.post(
+ ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler());
+ assertThat(second).isNotNull();
+
+ // Clean up
+ restCatalog.dropTable(ident);
+ }
+
+ @Test
+ public void testIdempotencyKeyLifetimeExpiredTreatsAsNew() {
+ // Set TTL to 0 so cached success expires immediately
+ CatalogHandlers.setIdempotencyLifetimeFromIso("PT0S");
+ try {
+ String key = "expired-create-key";
+ Namespace ns = Namespace.of("ns_exp");
+ Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+ prepareIdempotentEnv(key, ns, "t_exp");
+ TableIdentifier ident = env.first();
+ Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+ RESTClient http = httpAndHeaders.first();
+ Map<String, String> headers = httpAndHeaders.second();
+ CreateTableRequest req = createReq(ident);
+
+ // First create succeeds
+ LoadTableResponse created =
+ http.post(
+ ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler());
+ assertThat(created).isNotNull();
+
+ // Verify request shape (method, path, headers including Idempotency-Key)
+ verifyCreatePost(ns, headers);
+
+ // TTL expired -> duplicate with same key should be treated as new and
fail with AlreadyExists
+ assertThatThrownBy(
+ () ->
+ http.post(
+
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler()))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining(ident.toString());
+
+ // Clean up
+ restCatalog.dropTable(ident);
Review Comment:
Do we need the explicit drop here?
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +114,147 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing.
+ //
+ // Note: This is a simple in-memory implementation meant for tests and
lightweight usage.
+ // Production servers should provide a durable store.
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30);
+
private CatalogHandlers() {}
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action)
{
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest,
Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ AtomicBoolean isLeader = new AtomicBoolean(false);
+ IdempotencyEntry entry =
+ IDEMPOTENCY_STORE.compute(
+ key,
+ (k, current) -> {
+ if (current == null || current.isExpired()) {
+ isLeader.set(true);
+ return IdempotencyEntry.inProgress();
+ }
+ return current;
+ });
+
+ // Fast-path: already finalized (another request completed earlier)
+ if (entry.status == IdempotencyEntry.Status.FINALIZED) {
+ if (entry.error != null) {
+ throw entry.error;
+ }
+ return (T) entry.responseBody;
+ }
+
+ if (!isLeader.get()) {
+ // In-flight coalescing: wait for the leader request to finalize
+ entry.awaitFinalized();
+ if (entry.error != null) {
+ throw entry.error;
+ }
+ return (T) entry.responseBody;
+ }
+
+ // Leader request: execute the action and finalize the entry
+ try {
+ T res = action.get();
+ entry.finalizeSuccess(res);
+ return res;
+ } catch (RuntimeException e) {
+ entry.finalizeError(e);
+ throw e;
+ }
+ }
+
+ public static void setIdempotencyLifetimeFromIso(String isoDuration) {
+ if (isoDuration == null) {
+ return;
+ }
+ try {
+ idempotencyLifetimeMillis = Duration.parse(isoDuration).toMillis();
+ } catch (Exception e) {
+ // ignore parse errors; keep default
Review Comment:
Sorry, I might have missed this in the first pass: since this method is public, IMHO we should
throw an IllegalArgumentException here instead of silently ignoring the parse error?
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +114,147 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing.
+ //
+ // Note: This is a simple in-memory implementation meant for tests and
lightweight usage.
+ // Production servers should provide a durable store.
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30);
+
private CatalogHandlers() {}
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action)
{
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest,
Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ AtomicBoolean isLeader = new AtomicBoolean(false);
Review Comment:
What's a "leader" here?
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -114,6 +115,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
private AuthSession authSession = AuthSession.EMPTY;
private PlanningBehavior planningBehavior;
+ private final java.util.Set<String> simulate503OnFirstSuccessKeys =
Sets.newConcurrentHashSet();
Review Comment:
nit: can we import `java.util.Set` instead of using the fully qualified name inline?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]