This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 3452153ed2 Core: Replace Failsafe with Tasks utility in RESTTableScan (#15613)
3452153ed2 is described below
commit 3452153ed2e8e9ee897c1a9a76a3264df9b2855b
Author: Prashant Singh <[email protected]>
AuthorDate: Fri Mar 13 12:24:23 2026 -0700
Core: Replace Failsafe with Tasks utility in RESTTableScan (#15613)
Replace the Failsafe library dependency in RESTTableScan.fetchPlanningResult()
with Iceberg's built-in Tasks utility for retry/backoff, aligning with codebase
conventions and removing the failsafe dependency from iceberg-core.
Co-authored-by: Prashant Singh <[email protected]>
---
build.gradle | 1 -
.../org/apache/iceberg/rest/RESTTableScan.java | 106 ++++++++-------------
2 files changed, 40 insertions(+), 67 deletions(-)
diff --git a/build.gradle b/build.gradle
index 765a7fe820..52d25bc33b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -379,7 +379,6 @@ project(':iceberg-core') {
implementation libs.jackson.databind
implementation libs.caffeine
implementation libs.roaringbitmap
- implementation libs.failsafe
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index f8860c42d7..f533f2c87f 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -21,13 +21,10 @@ package org.apache.iceberg.rest;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
-import dev.failsafe.Failsafe;
-import dev.failsafe.FailsafeException;
-import dev.failsafe.RetryPolicy;
-import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -50,6 +47,7 @@ import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -228,71 +226,45 @@ class RESTTableScan extends DataTableScan {
}
private CloseableIterable<FileScanTask> fetchPlanningResult() {
- RetryPolicy<FetchPlanningResultResponse> retryPolicy =
- RetryPolicy.<FetchPlanningResultResponse>builder()
- .handleResultIf(response -> response.planStatus() ==
PlanStatus.SUBMITTED)
- .withBackoff(
- Duration.ofMillis(MIN_SLEEP_MS),
Duration.ofMillis(MAX_SLEEP_MS), SCALE_FACTOR)
- .withJitter(0.1) // Add jitter up to 10% of the calculated delay
- .withMaxAttempts(MAX_ATTEMPTS)
- .withMaxDuration(Duration.ofMillis(MAX_WAIT_TIME_MS))
- .onFailedAttempt(
- e -> {
- // Log when a retry occurs
- LOG.debug(
- "Plan {} still SUBMITTED (Attempt {}/{}). Previous
attempt took {} ms.",
- planId,
- e.getAttemptCount(),
- MAX_ATTEMPTS,
- e.getElapsedAttemptTime().toMillis());
- })
- .onFailure(
- e -> {
- LOG.warn(
- "Polling for plan {} failed due to: {}",
- planId,
- e.getException().getMessage());
- cleanupPlanResources();
- })
- .build();
+ AtomicReference<FetchPlanningResultResponse> result = new
AtomicReference<>();
+ Tasks.foreach(planId)
+ .exponentialBackoff(MIN_SLEEP_MS, MAX_SLEEP_MS, MAX_WAIT_TIME_MS,
SCALE_FACTOR)
+ .retry(MAX_ATTEMPTS)
+ .onlyRetryOn(NotCompleteException.class)
+ .onFailure(
+ (id, err) -> {
+ LOG.warn("Planning failed for plan ID: {}", id, err);
+ cleanupPlanResources();
+ })
+ .throwFailureWhenFinished()
+ .run(
+ id -> {
+ FetchPlanningResultResponse response =
+ client.get(
+ resourcePaths.plan(tableIdentifier, id),
+ headers,
+ FetchPlanningResultResponse.class,
+ headers,
+ ErrorHandlers.planErrorHandler(),
+ parserContext);
- try {
- FetchPlanningResultResponse response =
- Failsafe.with(retryPolicy)
- .get(
- () ->
- client.get(
- resourcePaths.plan(tableIdentifier, planId),
- headers,
- FetchPlanningResultResponse.class,
- headers,
- ErrorHandlers.planErrorHandler(),
- parserContext));
- Preconditions.checkState(
- response.planStatus() == PlanStatus.COMPLETED,
- "Plan finished with unexpected status %s for planId: %s",
- response.planStatus(),
- planId);
+ if (response.planStatus() == PlanStatus.SUBMITTED) {
+ throw new NotCompleteException();
+ } else if (response.planStatus() != PlanStatus.COMPLETED) {
+ throw new IllegalStateException(
+ String.format(
+ "Invalid planStatus: %s for planId: %s",
response.planStatus(), id));
+ }
- this.scanFileIO =
- !response.credentials().isEmpty() ?
scanFileIO(response.credentials()) : table().io();
+ result.set(response);
+ });
- return scanTasksIterable(response.planTasks(), response.fileScanTasks());
- } catch (FailsafeException e) {
- // FailsafeException is thrown when retries are exhausted (Max
Attempts/Duration)
- // Cleanup is handled by the .onFailure() hook, so we just wrap and
rethrow.
- throw new IllegalStateException(
- String.format("Polling timed out or exceeded max attempts for
planId: %s.", planId), e);
- } catch (Exception e) {
- // Catch any immediate non-retryable exceptions (e.g., I/O errors, auth
errors)
- try {
- cleanupPlanResources();
- } catch (Exception cancelException) {
- // Ignore cancellation failures during exception handling
- e.addSuppressed(cancelException);
- }
- throw e;
- }
+ FetchPlanningResultResponse response = result.get();
+
+ this.scanFileIO =
+ !response.credentials().isEmpty() ? scanFileIO(response.credentials())
: table().io();
+
+ return scanTasksIterable(response.planTasks(), response.fileScanTasks());
}
private CloseableIterable<FileScanTask> scanTasksIterable(
@@ -342,4 +314,6 @@ class RESTTableScan extends DataTableScan {
return false;
}
}
+
+ private static class NotCompleteException extends RuntimeException {}
}