nastra commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2668698620
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -105,9 +111,145 @@ public class CatalogHandlers {
private static final InMemoryPlanningState IN_MEMORY_PLANNING_STATE =
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static final Set<String> SIMULATE_503_ON_FIRST_SUCCESS_KEYS =
Sets.newConcurrentHashSet();
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30);
private CatalogHandlers() {}
+ /**
+ * Execute a mutation with basic idempotency semantics based on the
Idempotency-Key header.
+ *
+ * <p>This simple reference implementation stores the response in-memory
keyed by the header
+ * value. If the same key is seen again, the stored response is returned and
the action is not
+ * re-executed. This is suitable for tests and examples; production servers
should provide a
+ * durable store and TTL management.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action)
{
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest,
Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ // check existing entry and TTL
+ IdempotencyEntry existing = IDEMPOTENCY_STORE.get(key);
+ if (existing != null) {
+ long now = System.currentTimeMillis();
+ boolean expired =
+ existing.status == IdempotencyEntry.Status.FINALIZED
+ && (now - existing.firstSeenMillis) > idempotencyLifetimeMillis;
+ if (!expired) {
+ existing.awaitFinalized();
+ if (existing.error != null) {
+ throw existing.error;
+ }
+ return (T) existing.responseBody;
+ } else {
+ IDEMPOTENCY_STORE.remove(key, existing);
+ }
+ }
+
+ IdempotencyEntry entry = IdempotencyEntry.inProgress();
+ IDEMPOTENCY_STORE.put(key, entry);
+ try {
+ T res = action.get();
+ entry.finalizeSuccess(res);
+
+ if (SIMULATE_503_ON_FIRST_SUCCESS_KEYS.remove(key)) {
+ throw new CommitStateUnknownException(
+ new RuntimeException("simulated transient 503 after success"));
+ }
+ return res;
+ } catch (RuntimeException e) {
+ if (entry.status != IdempotencyEntry.Status.FINALIZED ||
entry.responseBody == null) {
+ entry.finalizeError(e);
+ }
+ throw e;
+ }
+ }
+
+ // Test hooks/configuration for idempotency behavior
+ public static void simulate503OnFirstSuccessForKey(String key) {
Review Comment:
I don't think we should add this functionality to the adapter at all.
Instead, what about a custom adapter that simulates this behavior just for
that particular test case? We already use custom adapters with custom
behavior in a lot of other test methods.
My main concern is that we're forcing "test conditions" into code that
shouldn't have them: `RESTCatalogAdapter` could be used in real usage
scenarios, so ideally it shouldn't contain test code.
Additionally, this sets a precedent for future changes, where it might seem
ok to add more and more test conditions to the adapter or to
`CatalogHandlers`.
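
To illustrate the custom-adapter idea, here is a rough, self-contained sketch. The `Adapter` interface, `PlainAdapter`, and `FailOnceAfterSuccessAdapter` are hypothetical stand-ins invented for this example, not the actual `RESTCatalogAdapter` API, and `IllegalStateException` stands in for whatever exception the test would really expect. The point is only that the "fail once after success" hook can live in a test-only subclass instead of the production code path.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class FlakyAdapterSketch {
  /** Hypothetical stand-in for the adapter's request path; not the Iceberg API. */
  interface Adapter {
    <T> T execute(String idempotencyKey, Supplier<T> action);
  }

  /** Production-style adapter: runs the action and contains no test hooks. */
  static class PlainAdapter implements Adapter {
    @Override
    public <T> T execute(String idempotencyKey, Supplier<T> action) {
      return action.get();
    }
  }

  /** Test-only adapter: the action succeeds, then the first response per key is "lost". */
  static class FailOnceAfterSuccessAdapter extends PlainAdapter {
    private final Set<String> pendingKeys = ConcurrentHashMap.newKeySet();

    FailOnceAfterSuccessAdapter(String... keys) {
      pendingKeys.addAll(Arrays.asList(keys));
    }

    @Override
    public <T> T execute(String idempotencyKey, Supplier<T> action) {
      // The side effect happens first, exactly as in the plain adapter.
      T result = super.execute(idempotencyKey, action);
      // remove() returns true only once per key, so only the first call fails.
      if (idempotencyKey != null && pendingKeys.remove(idempotencyKey)) {
        throw new IllegalStateException("simulated transient 503 after success");
      }
      return result;
    }
  }

  public static void main(String[] args) {
    AtomicInteger commits = new AtomicInteger();
    Adapter adapter = new FailOnceAfterSuccessAdapter("key-1");
    try {
      adapter.execute("key-1", commits::incrementAndGet);
    } catch (IllegalStateException e) {
      System.out.println("first call failed after committing: " + e.getMessage());
    }
    // The retry goes through because the key was consumed by the first call.
    System.out.println("retry returned: " + adapter.execute("key-1", commits::incrementAndGet));
  }
}
```

With something along these lines, only the test that needs the transient failure constructs the special adapter, and neither the adapter nor `CatalogHandlers` needs a public test hook.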
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]