This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new a90ab6a8bb Core: Do not cleanup when CREATE transactions fail with 503 (#15051) (#15662)
a90ab6a8bb is described below
commit a90ab6a8bb0aa8f4f98b376bfd55451601a72f8b
Author: Yuya Ebihara <[email protected]>
AuthorDate: Wed Mar 18 23:50:11 2026 +0900
Core: Do not cleanup when CREATE transactions fail with 503 (#15051) (#15662)
(cherry picked from commit 63d4084722824054b3460b8600f5cee9ab9a7248)
Co-authored-by: Alessandro Nori <[email protected]>
---
.../org/apache/iceberg/rest/ErrorHandlers.java | 21 +++++++
.../apache/iceberg/rest/RESTTableOperations.java | 2 +-
.../org/apache/iceberg/rest/TestRESTCatalog.java | 72 ++++++++++++++++++++++
3 files changed, 94 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
index 0c21fed4de..640b1ce37f 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
@@ -73,6 +73,10 @@ public class ErrorHandlers {
return CommitErrorHandler.INSTANCE;
}
+ public static Consumer<ErrorResponse> createTableErrorHandler() {
+ return CreateTableErrorHandler.INSTANCE;
+ }
+
public static Consumer<ErrorResponse> defaultErrorHandler() {
return DefaultErrorHandler.INSTANCE;
}
@@ -125,6 +129,23 @@ public class ErrorHandlers {
}
}
+ /** Table create error handler. */
+ private static class CreateTableErrorHandler extends CommitErrorHandler {
+ private static final ErrorHandler INSTANCE = new CreateTableErrorHandler();
+
+ @Override
+ public void accept(ErrorResponse error) {
+ switch (error.code()) {
+ case 404:
+ throw new NoSuchNamespaceException("%s", error.message());
+ case 409:
+ throw new AlreadyExistsException("%s", error.message());
+ }
+
+ super.accept(error);
+ }
+ }
+
/** View commit error handler. */
private static class ViewCommitErrorHandler extends DefaultErrorHandler {
private static final ErrorHandler INSTANCE = new ViewCommitErrorHandler();
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
index 5f6c28b323..f46c442e8d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
@@ -123,7 +123,7 @@ class RESTTableOperations implements TableOperations {
.addAll(metadata.changes())
.build();
requirements = UpdateRequirements.forCreateTable(updates);
- errorHandler = ErrorHandlers.tableErrorHandler(); // throws NoSuchTableException
+ errorHandler = ErrorHandlers.createTableErrorHandler();
break;
case REPLACE:
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 47c27c9757..bdea807054 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ServiceFailureException;
@@ -2596,6 +2597,77 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
});
}
+ @Test
+ public void testNoCleanupOnCreate503() {
+ RESTCatalogAdapter adapter =
+ Mockito.spy(
+ new RESTCatalogAdapter(backendCatalog) {
+ @Override
+ protected <T extends RESTResponse> T execute(
+ HTTPRequest request,
+ Class<T> responseType,
+ Consumer<ErrorResponse> errorHandler,
+ Consumer<Map<String, String>> responseHeaders) {
+ var response = super.execute(request, responseType, errorHandler, responseHeaders);
+ if (request.method() == HTTPMethod.POST && request.path().contains(TABLE.name())) {
+ // Simulate a 503 Service Unavailable error
+ ErrorResponse error =
+ ErrorResponse.builder()
+ .responseCode(503)
+ .withMessage("Service unavailable")
+ .build();
+
+ errorHandler.accept(error);
+ throw new IllegalStateException("Error handler should have thrown");
+ }
+ return response;
+ }
+ });
+
+ RESTCatalog catalog = catalog(adapter);
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(TABLE.namespace());
+ }
+
+ Transaction createTableTransaction = catalog.newCreateTableTransaction(TABLE, SCHEMA);
+ createTableTransaction.newAppend().appendFile(FILE_A).commit();
+
+ // Verify that 503 is mapped to CommitStateUnknownException (not just ServiceFailureException)
+ assertThatThrownBy(createTableTransaction::commitTransaction)
+ .isInstanceOf(CommitStateUnknownException.class)
+ .cause()
+ .isInstanceOf(ServiceFailureException.class)
+ .hasMessageContaining("Service failed: 503");
+
+ // Verify files are NOT cleaned up (because commit state is unknown)
+ assertThat(allRequests(adapter))
+ .anySatisfy(
+ req -> {
+ assertThat(req.method()).isEqualTo(HTTPMethod.POST);
+ assertThat(req.path()).isEqualTo(RESOURCE_PATHS.table(TABLE));
+ assertThat(req.body()).isInstanceOf(UpdateTableRequest.class);
+ UpdateTableRequest body = (UpdateTableRequest) req.body();
+ assertThat(
+ body.updates().stream()
+ .filter(MetadataUpdate.AddSnapshot.class::isInstance)
+ .map(MetadataUpdate.AddSnapshot.class::cast)
+ .findFirst())
+ .hasValueSatisfying(
+ addSnapshot -> {
+ String manifestListLocation = addSnapshot.snapshot().manifestListLocation();
+ // Files should still exist because we don't know if commit succeeded
+ assertThat(
+ catalog
+ .loadTable(TABLE)
+ .io()
+ .newInputFile(manifestListLocation)
+ .exists())
+ .isTrue();
+ });
+ });
+ }
+
@Test
public void testCleanupCleanableExceptionsReplace() {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));