amogh-jahagirdar commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2676150344
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -124,12 +133,107 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
RESTCatalogProperties.NAMESPACE_SEPARATOR,
RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
+ /**
+ * Test-only adapter that keeps request/response round-trip serialization
and header validation
+ * from the base test setup, while also allowing specific tests to inject
transient failures.
+ */
+ private static class HeaderValidatingAdapter extends RESTCatalogAdapter {
+ private final HTTPHeaders catalogHeaders;
+ private final HTTPHeaders contextHeaders;
+ private final Set<String> simulate503OnFirstSuccessKeys =
+
org.apache.iceberg.relocated.com.google.common.collect.Sets.newConcurrentHashSet();
Review Comment:
Minor: Maybe this should be a mapping from the key to a generic throwable
that's thrown, then we could add other failure cases and make sure idempotency
returns or throws as expected.
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +115,152 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing.
+ //
+ // Note: This is a simple in-memory implementation meant for tests and
lightweight usage.
+ // Production servers should provide a durable store.
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30);
+
private CatalogHandlers() {}
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action)
{
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest,
Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ // "Leader" is the request thread that wins the
IDEMPOTENCY_STORE.compute(...) and creates (or
+ // replaces) the IN_PROGRESS entry for this Idempotency-Key. Only the
leader executes the
+ // action and finalizes the entry; concurrent requests for the same key
("followers") wait on
+ // the latch and then replay the finalized result/error.
+ AtomicBoolean isLeader = new AtomicBoolean(false);
Review Comment:
Minor: I'd probably just call this isFirst. Leader/follower makes it seem
more than it really is.
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3313,6 +3393,155 @@ public void
testClientDoesNotSendIdempotencyWhenServerNotAdvertising() {
local.dropTable(ident);
}
+ @Test
+ public void testIdempotentDuplicateCreateReturnsCached() {
+ String key = "dup-create-key";
+ Namespace ns = Namespace.of("ns_dup");
+ Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+ prepareIdempotentEnv(key, ns, "t_dup");
+ TableIdentifier ident = env.first();
+ Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+ RESTClient http = httpAndHeaders.first();
+ Map<String, String> headers = httpAndHeaders.second();
+ CreateTableRequest req = createReq(ident);
Review Comment:
Nit: I think this bit is repeated across tests, any way to abstract some of
this behind a helper?
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -268,19 +277,29 @@ public <T extends RESTResponse> T handleRequest(
return castResponse(
responseType, CatalogHandlers.stageTableCreate(catalog,
namespace, request));
} else {
- LoadTableResponse response = CatalogHandlers.createTable(catalog,
namespace, request);
- responseHeaders.accept(
- ImmutableMap.of(HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
- return castResponse(responseType, response);
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> {
+ LoadTableResponse response =
+ CatalogHandlers.createTable(catalog, namespace, request);
+ responseHeaders.accept(
+ ImmutableMap.of(
+ HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
+ return castResponse(responseType, response);
+ });
}
}
case DROP_TABLE:
{
if (PropertyUtil.propertyAsBoolean(vars, "purgeRequested", false)) {
- CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars));
+ CatalogHandlers.withIdempotency(
Review Comment:
Minor: I wonder if withIdempotency even needs to be exposed? Feels like it
could just be invoked internally in each of the existing CatalogHandlers
methods?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]