singhpk234 commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2061232432


##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -762,7 +777,190 @@ public LoadTableResponse updateTable(
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot update table on static-facade 
external catalogs.");
     }
-    return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, 
applyUpdateFilters(request));
+    // TODO: pending discussion on whether a table property is the right way, or whether a
+    // writer-specific knob is required.
+    return updateTableWithRollback(baseCatalog, tableIdentifier, 
applyUpdateFilters(request));
+  }
+
+  // TODO: Clean this up when CatalogHandlers becomes extensible.
+  // Copy of CatalogHandlers#updateTable
+  private static LoadTableResponse updateTableWithRollback(
+      Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+    Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+    TableMetadata finalMetadata;
+    if (isCreate(request)) {
+      Transaction transaction =
+          catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+      if (!(transaction instanceof BaseTransaction)) {
+        throw new IllegalStateException(
+            "Cannot wrap catalog that does not produce BaseTransaction");
+      }
+
+      BaseTransaction baseTransaction = (BaseTransaction) transaction;
+      finalMetadata = create(baseTransaction.underlyingOps(), request);
+    } else {
+      Table table = catalog.loadTable(ident);
+      if (!(table instanceof BaseTable)) {
+        throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");
+      }
+
+      TableOperations ops = ((BaseTable) table).operations();
+      finalMetadata = commit(ops, request);
+    }
+
+    return 
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+  }
+
+  // TODO: Clean this up when CatalogHandlers becomes extensible.
+  // Copy of CatalogHandlers#create
+  private static TableMetadata create(TableOperations ops, UpdateTableRequest 
request) {
+    request.requirements().forEach((requirement) -> 
requirement.validate(ops.current()));
+    Optional<Integer> formatVersion =
+        request.updates().stream()
+            .filter((update) -> update instanceof 
MetadataUpdate.UpgradeFormatVersion)
+            .map((update) -> ((MetadataUpdate.UpgradeFormatVersion) 
update).formatVersion())
+            .findFirst();
+    TableMetadata.Builder builder =
+        (TableMetadata.Builder)
+            formatVersion
+                .map(TableMetadata::buildFromEmpty)
+                .orElseGet(TableMetadata::buildFromEmpty);
+    request.updates().forEach((update) -> update.applyTo(builder));
+    ops.commit((TableMetadata) null, builder.build());
+    return ops.current();
+  }
+
+  @VisibleForTesting
+  // TODO: Clean this up when CatalogHandlers becomes extensible.
+  // Copy of CatalogHandlers#commit
+  public static TableMetadata commit(TableOperations ops, UpdateTableRequest 
request) {
+    AtomicBoolean isRetry = new AtomicBoolean(false);
+
+    try {
+      Tasks.foreach(new TableOperations[] {ops})
+          .retry(4)
+          .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(
+              (taskOps) -> {
+                TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
+                isRetry.set(true);
+                // Prev PR: https://github.com/apache/iceberg/pull/5888
+                boolean rollbackCompaction =
+                    PropertyUtil.propertyAsBoolean(
+                        taskOps.current().properties(), 
ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
+
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
+                try {
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  if (!rollbackCompaction) {
+                    throw new ValidationFailureException(e);
+                  }
+                  // The snapshot has already been created on the client side, so
+                  // not much can be done there. We could move this to a
+                  // writer-specific mechanism, but it would be nice if the catalog
+                  // did this for us.
+                  // Inspect whether the requirements state that a snapshot ref needs
+                  // to be asserted; this usually means the update section contains
+                  // addSnapshot and setSnapshotRef.
+                  UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+                  int found = 0;
+                  for (UpdateRequirement requirement : request.requirements()) 
{
+                    // there should be only add snapshot request

Review Comment:
   Ack, added comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to