This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d72b03cb Refactor BasePolarisTableOperations & BasePolarisViewOperations (#1426)
0d72b03cb is described below

commit 0d72b03cb182f33baebb2164346a9118d5451a29
Author: Eric Maynard <[email protected]>
AuthorDate: Thu Apr 24 21:34:28 2025 -0700

    Refactor BasePolarisTableOperations & BasePolarisViewOperations (#1426)
    
    * initial copy paste
    
    * Reorder
    
    * view copy paste
    
    * fixes, polish
    
    * stable
    
    * yank
    
    * CODE_COPIED_TO_POLARIS comments
    
    * autolint
    
    * update license
    
    * typofix
    
    * update comments
    
    * autolint
---
 LICENSE                                            |   1 +
 .../service/catalog/iceberg/IcebergCatalog.java    | 437 +++++++++++++++++++--
 2 files changed, 397 insertions(+), 41 deletions(-)

diff --git a/LICENSE b/LICENSE
index e9c023b30..7b965b4a5 100644
--- a/LICENSE
+++ b/LICENSE
@@ -218,6 +218,7 @@ This product includes code from Apache Iceberg.
 * spec/iceberg-rest-catalog-open-api.yaml
 * spec/polaris-catalog-apis/oauth-tokens-api.yaml
 * integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
+* service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
 
 Copyright: Copyright 2017-2025 The Apache Software Foundation
 Home page: https://iceberg.apache.org
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index f80d4077a..ae34be928 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -35,25 +35,32 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.LocationProviders;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -68,13 +75,17 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.LocationUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.view.BaseMetastoreViewCatalog;
-import org.apache.iceberg.view.BaseViewOperations;
 import org.apache.iceberg.view.ViewBuilder;
 import org.apache.iceberg.view.ViewMetadata;
 import org.apache.iceberg.view.ViewMetadataParser;
 import org.apache.iceberg.view.ViewOperations;
+import org.apache.iceberg.view.ViewProperties;
 import org.apache.iceberg.view.ViewUtil;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
@@ -1190,7 +1201,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
     }
   }
 
-  private class BasePolarisTableOperations extends 
BaseMetastoreTableOperations {
+  /**
+   * An implementation of {@link TableOperations} that integrates with {@link 
IcebergCatalog}. Much
+   * of this code was originally copied from {@link
+   * org.apache.iceberg.BaseMetastoreTableOperations}. CODE_COPIED_TO_POLARIS 
From Apache Iceberg
+   * Version: 1.8
+   */
+  private class BasePolarisTableOperations extends 
PolarisOperationsBase<TableMetadata>
+      implements TableOperations {
     private final TableIdentifier tableIdentifier;
     private final String fullTableName;
     private FileIO tableFileIO;
@@ -1203,6 +1221,77 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
     }
 
     @Override
+    public TableMetadata current() {
+      if (shouldRefresh) {
+        return refresh();
+      }
+      return currentMetadata;
+    }
+
+    @Override
+    public TableMetadata refresh() {
+      boolean currentMetadataWasAvailable = currentMetadata != null;
+      try {
+        doRefresh();
+      } catch (NoSuchTableException e) {
+        if (currentMetadataWasAvailable) {
+          LOGGER.warn(
+              "Could not find the table during refresh, setting current 
metadata to null", e);
+          shouldRefresh = true;
+        }
+
+        currentMetadata = null;
+        currentMetadataLocation = null;
+        version = -1;
+        throw e;
+      }
+      return current();
+    }
+
+    @Override
+    public void commit(TableMetadata base, TableMetadata metadata) {
+      // if the metadata is already out of date, reject it
+      if (base != current()) {
+        if (base != null) {
+          throw new CommitFailedException("Cannot commit: stale table 
metadata");
+        } else {
+          // when current is non-null, the table exists. but when base is 
null, the commit is trying
+          // to create the table
+          throw new AlreadyExistsException("Table already exists: %s", 
fullTableName);
+        }
+      }
+      // if the metadata is not changed, return early
+      if (base == metadata) {
+        LOGGER.info("Nothing to commit.");
+        return;
+      }
+
+      long start = System.currentTimeMillis();
+      doCommit(base, metadata);
+      CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
+      requestRefresh();
+
+      LOGGER.info(
+          "Successfully committed to table {} in {} ms",
+          fullTableName,
+          System.currentTimeMillis() - start);
+    }
+
+    @Override
+    public FileIO io() {
+      return tableFileIO;
+    }
+
+    @Override
+    public String metadataFileLocation(String filename) {
+      return metadataFileLocation(current(), filename);
+    }
+
+    @Override
+    public LocationProvider locationProvider() {
+      return LocationProviders.locationsFor(current().location(), 
current().properties());
+    }
+
     public void doRefresh() {
       LOGGER.debug("doRefresh for tableIdentifier {}", tableIdentifier);
       // While doing refresh/commit protocols, we must fetch the fresh 
"passthrough" resolved
@@ -1250,7 +1339,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       }
     }
 
-    @Override
     public void doCommit(TableMetadata base, TableMetadata metadata) {
       LOGGER.debug(
           "doCommit for table {} with base {}, metadata {}", tableIdentifier, 
base, metadata);
@@ -1376,11 +1464,11 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       }
       if (!Objects.equal(existingLocation, oldLocation)) {
         if (null == base) {
-          throw new AlreadyExistsException("Table already exists: %s", 
tableName());
+          throw new AlreadyExistsException("Table already exists: %s", 
fullTableName);
         }
 
         if (null == existingLocation) {
-          throw new NoSuchTableException("Table does not exist: %s", 
tableName());
+          throw new NoSuchTableException("Table does not exist: %s", 
fullTableName);
         }
 
         throw new CommitFailedException(
@@ -1396,45 +1484,101 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
     }
 
     @Override
-    public FileIO io() {
-      return tableFileIO;
+    public TableOperations temp(TableMetadata uncommittedMetadata) {
+      return new TableOperations() {
+        @Override
+        public TableMetadata current() {
+          return uncommittedMetadata;
+        }
+
+        @Override
+        public TableMetadata refresh() {
+          throw new UnsupportedOperationException(
+              "Cannot call refresh on temporary table operations");
+        }
+
+        @Override
+        public void commit(TableMetadata base, TableMetadata metadata) {
+          throw new UnsupportedOperationException(
+              "Cannot call commit on temporary table operations");
+        }
+
+        @Override
+        public String metadataFileLocation(String fileName) {
+          return BasePolarisTableOperations.this.metadataFileLocation(
+              uncommittedMetadata, fileName);
+        }
+
+        @Override
+        public LocationProvider locationProvider() {
+          return LocationProviders.locationsFor(
+              uncommittedMetadata.location(), 
uncommittedMetadata.properties());
+        }
+
+        @Override
+        public FileIO io() {
+          return BasePolarisTableOperations.this.io();
+        }
+
+        @Override
+        public EncryptionManager encryption() {
+          return BasePolarisTableOperations.this.encryption();
+        }
+
+        @Override
+        public long newSnapshotId() {
+          return BasePolarisTableOperations.this.newSnapshotId();
+        }
+      };
     }
 
-    @Override
-    protected String tableName() {
-      return fullTableName;
+    protected String writeNewMetadataIfRequired(boolean newTable, 
TableMetadata metadata) {
+      return newTable && metadata.metadataFileLocation() != null
+          ? metadata.metadataFileLocation()
+          : writeNewMetadata(metadata, version + 1);
     }
-  }
 
-  private void validateMetadataFileInTableDir(
-      TableIdentifier identifier, TableMetadata metadata, CatalogEntity 
catalog) {
-    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
-    boolean allowEscape =
-        polarisCallContext
-            .getConfigurationStore()
-            .getConfiguration(
-                polarisCallContext, 
FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION);
-    if (!allowEscape
-        && !polarisCallContext
-            .getConfigurationStore()
-            .getConfiguration(
-                polarisCallContext, 
FeatureConfiguration.ALLOW_EXTERNAL_METADATA_FILE_LOCATION)) {
-      LOGGER.debug(
-          "Validating base location {} for table {} in metadata file {}",
-          metadata.location(),
-          identifier,
-          metadata.metadataFileLocation());
-      StorageLocation metadataFileLocation = 
StorageLocation.of(metadata.metadataFileLocation());
-      StorageLocation baseLocation = StorageLocation.of(metadata.location());
-      if (!metadataFileLocation.isChildOf(baseLocation)) {
-        throw new BadRequestException(
-            "Metadata location %s is not allowed outside of table location %s",
-            metadata.metadataFileLocation(), metadata.location());
+    protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
+      String newTableMetadataFilePath = newTableMetadataFilePath(metadata, 
newVersion);
+      OutputFile newMetadataLocation = 
io().newOutputFile(newTableMetadataFilePath);
+
+      // write the new metadata
+      // use overwrite to avoid negative caching in S3. this is safe because 
the metadata location
+      // is
+      // always unique because it includes a UUID.
+      TableMetadataParser.overwrite(metadata, newMetadataLocation);
+
+      return newMetadataLocation.location();
+    }
+
+    private String metadataFileLocation(TableMetadata metadata, String 
filename) {
+      String metadataLocation = 
metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
+
+      if (metadataLocation != null) {
+        return String.format("%s/%s", 
LocationUtil.stripTrailingSlash(metadataLocation), filename);
+      } else {
+        return String.format("%s/%s/%s", metadata.location(), 
METADATA_FOLDER_NAME, filename);
       }
     }
+
+    private String newTableMetadataFilePath(TableMetadata meta, int 
newVersion) {
+      String codecName =
+          meta.property(
+              TableProperties.METADATA_COMPRESSION, 
TableProperties.METADATA_COMPRESSION_DEFAULT);
+      String fileExtension = TableMetadataParser.getFileExtension(codecName);
+      return metadataFileLocation(
+          meta,
+          String.format(Locale.ROOT, "%05d-%s%s", newVersion, 
UUID.randomUUID(), fileExtension));
+    }
   }
 
-  private class BasePolarisViewOperations extends BaseViewOperations {
+  /**
+   * An implementation of {@link ViewOperations} that integrates with {@link 
IcebergCatalog}. Much
+   * of this code was originally copied from {@link 
org.apache.iceberg.view.BaseViewOperations}.
+   * CODE_COPIED_TO_POLARIS From Apache Iceberg Version: 1.8
+   */
+  private class BasePolarisViewOperations extends 
PolarisOperationsBase<ViewMetadata>
+      implements ViewOperations {
     private final TableIdentifier identifier;
     private final String fullViewName;
     private FileIO viewFileIO;
@@ -1446,6 +1590,65 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
     }
 
     @Override
+    public ViewMetadata current() {
+      if (shouldRefresh) {
+        return refresh();
+      }
+
+      return currentMetadata;
+    }
+
+    @Override
+    public ViewMetadata refresh() {
+      boolean currentMetadataWasAvailable = currentMetadata != null;
+      try {
+        doRefresh();
+      } catch (NoSuchViewException e) {
+        if (currentMetadataWasAvailable) {
+          LOGGER.warn(
+              "Could not find the view during refresh, setting current 
metadata to null", e);
+          shouldRefresh = true;
+        }
+
+        currentMetadata = null;
+        currentMetadataLocation = null;
+        version = -1;
+        throw e;
+      }
+
+      return current();
+    }
+
+    @Override
+    @SuppressWarnings("ImmutablesReferenceEquality")
+    public void commit(ViewMetadata base, ViewMetadata metadata) {
+      // if the metadata is already out of date, reject it
+      if (base != current()) {
+        if (base != null) {
+          throw new CommitFailedException("Cannot commit: stale view 
metadata");
+        } else {
+          // when current is non-null, the view exists. but when base is null, 
the commit is trying
+          // to create the view
+          throw new AlreadyExistsException("View already exists: %s", 
viewName());
+        }
+      }
+
+      // if the metadata is not changed, return early
+      if (base == metadata) {
+        LOGGER.info("Nothing to commit.");
+        return;
+      }
+
+      long start = System.currentTimeMillis();
+      doCommit(base, metadata);
+      requestRefresh();
+
+      LOGGER.info(
+          "Successfully committed to view {} in {} ms",
+          viewName(),
+          System.currentTimeMillis() - start);
+    }
+
     public void doRefresh() {
       PolarisResolvedPathWrapper resolvedEntities =
           resolvedEntityView.getPassthroughResolvedPath(
@@ -1492,7 +1695,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       }
     }
 
-    @Override
     public void doCommit(ViewMetadata base, ViewMetadata metadata) {
       // TODO: Maybe avoid writing metadata if there's definitely a 
transaction conflict
       LOGGER.debug("doCommit for view {} with base {}, metadata {}", 
identifier, base, metadata);
@@ -1548,7 +1750,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
               Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));
 
       String newLocation = writeNewMetadataIfRequired(metadata);
-      String oldLocation = base == null ? null : currentMetadataLocation();
+      String oldLocation = base == null ? null : currentMetadataLocation;
 
       IcebergTableLikeEntity entity =
           IcebergTableLikeEntity.of(
@@ -1589,17 +1791,170 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       }
     }
 
-    @Override
+    protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
+      return null != metadata.metadataFileLocation()
+          ? metadata.metadataFileLocation()
+          : writeNewMetadata(metadata, version + 1);
+    }
+
+    private String writeNewMetadata(ViewMetadata metadata, int newVersion) {
+      String newMetadataFilePath = newMetadataFilePath(metadata, newVersion);
+      OutputFile newMetadataLocation = io().newOutputFile(newMetadataFilePath);
+
+      // write the new metadata
+      // use overwrite to avoid negative caching in S3. this is safe because 
the metadata location
+      // is
+      // always unique because it includes a UUID.
+      ViewMetadataParser.overwrite(metadata, newMetadataLocation);
+
+      return newMetadataLocation.location();
+    }
+
+    private String newMetadataFilePath(ViewMetadata metadata, int newVersion) {
+      String codecName =
+          metadata
+              .properties()
+              .getOrDefault(
+                  ViewProperties.METADATA_COMPRESSION, 
ViewProperties.METADATA_COMPRESSION_DEFAULT);
+      String fileExtension = TableMetadataParser.getFileExtension(codecName);
+      return metadataFileLocation(
+          metadata,
+          String.format(Locale.ROOT, "%05d-%s%s", newVersion, 
UUID.randomUUID(), fileExtension));
+    }
+
+    private String metadataFileLocation(ViewMetadata metadata, String 
filename) {
+      String metadataLocation = 
metadata.properties().get(ViewProperties.WRITE_METADATA_LOCATION);
+      if (metadataLocation != null) {
+        return String.format("%s/%s", 
LocationUtil.stripTrailingSlash(metadataLocation), filename);
+      } else {
+        return String.format(
+            "%s/%s/%s",
+            LocationUtil.stripTrailingSlash(metadata.location()), 
METADATA_FOLDER_NAME, filename);
+      }
+    }
+
     public FileIO io() {
       return viewFileIO;
     }
 
-    @Override
     protected String viewName() {
       return fullViewName;
     }
   }
 
+  /**
+   * An ABC for {@link BasePolarisTableOperations} and {@link 
BasePolarisViewOperations}. Much of
+   * this code was originally copied from {@link 
org.apache.iceberg.BaseMetastoreTableOperations}.
+   * CODE_COPIED_TO_POLARIS From Apache Iceberg Version: 1.8
+   */
+  private abstract static class PolarisOperationsBase<T> {
+
+    protected static final String METADATA_FOLDER_NAME = "metadata";
+
+    protected T currentMetadata = null;
+    protected String currentMetadataLocation = null;
+    protected boolean shouldRefresh = true;
+    protected int version = -1;
+
+    protected void requestRefresh() {
+      this.shouldRefresh = true;
+    }
+
+    protected void disableRefresh() {
+      this.shouldRefresh = false;
+    }
+
+    /**
+     * Parse the version from table/view metadata file name.
+     *
+     * @param metadataLocation table/view metadata file location
+     * @return version of the table/view metadata file in success case and -1 
if the version is not
+     *     parsable (as a sign that the metadata is not part of this catalog)
+     */
+    protected int parseVersion(String metadataLocation) {
+      int versionStart =
+          metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this 
will be 0
+      int versionEnd = metadataLocation.indexOf('-', versionStart);
+      if (versionEnd < 0) {
+        // found filesystem object's metadata
+        return -1;
+      }
+
+      try {
+        return Integer.parseInt(metadataLocation.substring(versionStart, 
versionEnd));
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Unable to parse version from metadata location: {}", 
metadataLocation, e);
+        return -1;
+      }
+    }
+
+    protected void refreshFromMetadataLocation(
+        String newLocation,
+        Predicate<Exception> shouldRetry,
+        int numRetries,
+        Function<String, T> metadataLoader) {
+      // use null-safe equality check because new tables have a null metadata 
location
+      if (!Objects.equal(currentMetadataLocation, newLocation)) {
+        LOGGER.info("Refreshing table metadata from new version: {}", 
newLocation);
+
+        AtomicReference<T> newMetadata = new AtomicReference<>();
+        Tasks.foreach(newLocation)
+            .retry(numRetries)
+            .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... 
*/)
+            .throwFailureWhenFinished()
+            .stopRetryOn(NotFoundException.class) // overridden if shouldRetry 
is non-null
+            .shouldRetryTest(shouldRetry)
+            .run(metadataLocation -> 
newMetadata.set(metadataLoader.apply(metadataLocation)));
+
+        if (newMetadata.get() instanceof TableMetadata tableMetadata) {
+          if (currentMetadata instanceof TableMetadata currentTableMetadata) {
+            String newUUID = tableMetadata.uuid();
+            if (currentMetadata != null && currentTableMetadata.uuid() != null 
&& newUUID != null) {
+              Preconditions.checkState(
+                  newUUID.equals(currentTableMetadata.uuid()),
+                  "Table UUID does not match: current=%s != refreshed=%s",
+                  currentTableMetadata.uuid(),
+                  newUUID);
+            }
+          }
+        }
+
+        this.currentMetadata = newMetadata.get();
+        this.currentMetadataLocation = newLocation;
+        this.version = parseVersion(newLocation);
+      }
+      this.shouldRefresh = false;
+    }
+  }
+
+  private void validateMetadataFileInTableDir(
+      TableIdentifier identifier, TableMetadata metadata, CatalogEntity 
catalog) {
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    boolean allowEscape =
+        polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION);
+    if (!allowEscape
+        && !polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ALLOW_EXTERNAL_METADATA_FILE_LOCATION)) {
+      LOGGER.debug(
+          "Validating base location {} for table {} in metadata file {}",
+          metadata.location(),
+          identifier,
+          metadata.metadataFileLocation());
+      StorageLocation metadataFileLocation = 
StorageLocation.of(metadata.metadataFileLocation());
+      StorageLocation baseLocation = StorageLocation.of(metadata.location());
+      if (!metadataFileLocation.isChildOf(baseLocation)) {
+        throw new BadRequestException(
+            "Metadata location %s is not allowed outside of table location %s",
+            metadata.metadataFileLocation(), metadata.location());
+      }
+    }
+  }
+
   private FileIO loadFileIOForTableLike(
       TableIdentifier identifier,
       Set<String> readLocations,

Reply via email to