This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 0d72b03cb Refactor BasePolarisTableOperations & BasePolarisViewOperations (#1426)
0d72b03cb is described below
commit 0d72b03cb182f33baebb2164346a9118d5451a29
Author: Eric Maynard <[email protected]>
AuthorDate: Thu Apr 24 21:34:28 2025 -0700
Refactor BasePolarisTableOperations & BasePolarisViewOperations (#1426)
* initial copy paste
* Reorder
* view copy paste
* fixes, polish
* stable
* yank
* CODE_COPIED_TO_POLARIS comments
* autolint
* update license
* typofix
* update comments
* autolint
---
LICENSE | 1 +
.../service/catalog/iceberg/IcebergCatalog.java | 437 +++++++++++++++++++--
2 files changed, 397 insertions(+), 41 deletions(-)
diff --git a/LICENSE b/LICENSE
index e9c023b30..7b965b4a5 100644
--- a/LICENSE
+++ b/LICENSE
@@ -218,6 +218,7 @@ This product includes code from Apache Iceberg.
* spec/iceberg-rest-catalog-open-api.yaml
* spec/polaris-catalog-apis/oauth-tokens-api.yaml
* integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
+* service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
Copyright: Copyright 2017-2025 The Apache Software Foundation
Home page: https://iceberg.apache.org
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index f80d4077a..ae34be928 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -35,25 +35,32 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -68,13 +75,17 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
-import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewBuilder;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewMetadataParser;
import org.apache.iceberg.view.ViewOperations;
+import org.apache.iceberg.view.ViewProperties;
import org.apache.iceberg.view.ViewUtil;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
@@ -1190,7 +1201,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
}
- private class BasePolarisTableOperations extends
BaseMetastoreTableOperations {
+ /**
+ * An implementation of {@link TableOperations} that integrates with {@link IcebergCatalog}. Much
+ * of this code was originally copied from {@link
+ * org.apache.iceberg.BaseMetastoreTableOperations}. CODE_COPIED_TO_POLARIS From Apache Iceberg
+ * Version: 1.8
+ */
+ private class BasePolarisTableOperations extends
PolarisOperationsBase<TableMetadata>
+ implements TableOperations {
private final TableIdentifier tableIdentifier;
private final String fullTableName;
private FileIO tableFileIO;
@@ -1203,6 +1221,77 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
@Override
+ public TableMetadata current() {
+ if (shouldRefresh) {
+ return refresh();
+ }
+ return currentMetadata;
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ boolean currentMetadataWasAvailable = currentMetadata != null;
+ try {
+ doRefresh();
+ } catch (NoSuchTableException e) {
+ if (currentMetadataWasAvailable) {
+ LOGGER.warn(
+ "Could not find the table during refresh, setting current
metadata to null", e);
+ shouldRefresh = true;
+ }
+
+ currentMetadata = null;
+ currentMetadataLocation = null;
+ version = -1;
+ throw e;
+ }
+ return current();
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ // if the metadata is already out of date, reject it
+ if (base != current()) {
+ if (base != null) {
+ throw new CommitFailedException("Cannot commit: stale table
metadata");
+ } else {
+ // when current is non-null, the table exists. but when base is
null, the commit is trying
+ // to create the table
+ throw new AlreadyExistsException("Table already exists: %s",
fullTableName);
+ }
+ }
+ // if the metadata is not changed, return early
+ if (base == metadata) {
+ LOGGER.info("Nothing to commit.");
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+ doCommit(base, metadata);
+ CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
+ requestRefresh();
+
+ LOGGER.info(
+ "Successfully committed to table {} in {} ms",
+ fullTableName,
+ System.currentTimeMillis() - start);
+ }
+
+ @Override
+ public FileIO io() {
+ return tableFileIO;
+ }
+
+ @Override
+ public String metadataFileLocation(String filename) {
+ return metadataFileLocation(current(), filename);
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return LocationProviders.locationsFor(current().location(),
current().properties());
+ }
+
public void doRefresh() {
LOGGER.debug("doRefresh for tableIdentifier {}", tableIdentifier);
// While doing refresh/commit protocols, we must fetch the fresh
"passthrough" resolved
@@ -1250,7 +1339,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
}
- @Override
public void doCommit(TableMetadata base, TableMetadata metadata) {
LOGGER.debug(
"doCommit for table {} with base {}, metadata {}", tableIdentifier,
base, metadata);
@@ -1376,11 +1464,11 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
if (!Objects.equal(existingLocation, oldLocation)) {
if (null == base) {
- throw new AlreadyExistsException("Table already exists: %s",
tableName());
+ throw new AlreadyExistsException("Table already exists: %s",
fullTableName);
}
if (null == existingLocation) {
- throw new NoSuchTableException("Table does not exist: %s",
tableName());
+ throw new NoSuchTableException("Table does not exist: %s",
fullTableName);
}
throw new CommitFailedException(
@@ -1396,45 +1484,101 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
@Override
- public FileIO io() {
- return tableFileIO;
+ public TableOperations temp(TableMetadata uncommittedMetadata) {
+ return new TableOperations() {
+ @Override
+ public TableMetadata current() {
+ return uncommittedMetadata;
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ throw new UnsupportedOperationException(
+ "Cannot call refresh on temporary table operations");
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ throw new UnsupportedOperationException(
+ "Cannot call commit on temporary table operations");
+ }
+
+ @Override
+ public String metadataFileLocation(String fileName) {
+ return BasePolarisTableOperations.this.metadataFileLocation(
+ uncommittedMetadata, fileName);
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return LocationProviders.locationsFor(
+ uncommittedMetadata.location(),
uncommittedMetadata.properties());
+ }
+
+ @Override
+ public FileIO io() {
+ return BasePolarisTableOperations.this.io();
+ }
+
+ @Override
+ public EncryptionManager encryption() {
+ return BasePolarisTableOperations.this.encryption();
+ }
+
+ @Override
+ public long newSnapshotId() {
+ return BasePolarisTableOperations.this.newSnapshotId();
+ }
+ };
}
- @Override
- protected String tableName() {
- return fullTableName;
+ protected String writeNewMetadataIfRequired(boolean newTable,
TableMetadata metadata) {
+ return newTable && metadata.metadataFileLocation() != null
+ ? metadata.metadataFileLocation()
+ : writeNewMetadata(metadata, version + 1);
}
- }
- private void validateMetadataFileInTableDir(
- TableIdentifier identifier, TableMetadata metadata, CatalogEntity
catalog) {
- PolarisCallContext polarisCallContext =
callContext.getPolarisCallContext();
- boolean allowEscape =
- polarisCallContext
- .getConfigurationStore()
- .getConfiguration(
- polarisCallContext,
FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION);
- if (!allowEscape
- && !polarisCallContext
- .getConfigurationStore()
- .getConfiguration(
- polarisCallContext,
FeatureConfiguration.ALLOW_EXTERNAL_METADATA_FILE_LOCATION)) {
- LOGGER.debug(
- "Validating base location {} for table {} in metadata file {}",
- metadata.location(),
- identifier,
- metadata.metadataFileLocation());
- StorageLocation metadataFileLocation =
StorageLocation.of(metadata.metadataFileLocation());
- StorageLocation baseLocation = StorageLocation.of(metadata.location());
- if (!metadataFileLocation.isChildOf(baseLocation)) {
- throw new BadRequestException(
- "Metadata location %s is not allowed outside of table location %s",
- metadata.metadataFileLocation(), metadata.location());
+ protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
+ String newTableMetadataFilePath = newTableMetadataFilePath(metadata,
newVersion);
+ OutputFile newMetadataLocation =
io().newOutputFile(newTableMetadataFilePath);
+
+ // write the new metadata
+ // use overwrite to avoid negative caching in S3. this is safe because the metadata location
+ // is
+ // always unique because it includes a UUID.
+ TableMetadataParser.overwrite(metadata, newMetadataLocation);
+
+ return newMetadataLocation.location();
+ }
+
+ private String metadataFileLocation(TableMetadata metadata, String
filename) {
+ String metadataLocation =
metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
+
+ if (metadataLocation != null) {
+ return String.format("%s/%s",
LocationUtil.stripTrailingSlash(metadataLocation), filename);
+ } else {
+ return String.format("%s/%s/%s", metadata.location(),
METADATA_FOLDER_NAME, filename);
}
}
+
+ private String newTableMetadataFilePath(TableMetadata meta, int
newVersion) {
+ String codecName =
+ meta.property(
+ TableProperties.METADATA_COMPRESSION,
TableProperties.METADATA_COMPRESSION_DEFAULT);
+ String fileExtension = TableMetadataParser.getFileExtension(codecName);
+ return metadataFileLocation(
+ meta,
+ String.format(Locale.ROOT, "%05d-%s%s", newVersion,
UUID.randomUUID(), fileExtension));
+ }
}
- private class BasePolarisViewOperations extends BaseViewOperations {
+ /**
+ * An implementation of {@link ViewOperations} that integrates with {@link IcebergCatalog}. Much
+ * of this code was originally copied from {@link org.apache.iceberg.view.BaseViewOperations}.
+ * CODE_COPIED_TO_POLARIS From Apache Iceberg Version: 1.8
+ */
+ private class BasePolarisViewOperations extends
PolarisOperationsBase<ViewMetadata>
+ implements ViewOperations {
private final TableIdentifier identifier;
private final String fullViewName;
private FileIO viewFileIO;
@@ -1446,6 +1590,65 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
@Override
+ public ViewMetadata current() {
+ if (shouldRefresh) {
+ return refresh();
+ }
+
+ return currentMetadata;
+ }
+
+ @Override
+ public ViewMetadata refresh() {
+ boolean currentMetadataWasAvailable = currentMetadata != null;
+ try {
+ doRefresh();
+ } catch (NoSuchViewException e) {
+ if (currentMetadataWasAvailable) {
+ LOGGER.warn(
+ "Could not find the view during refresh, setting current
metadata to null", e);
+ shouldRefresh = true;
+ }
+
+ currentMetadata = null;
+ currentMetadataLocation = null;
+ version = -1;
+ throw e;
+ }
+
+ return current();
+ }
+
+ @Override
+ @SuppressWarnings("ImmutablesReferenceEquality")
+ public void commit(ViewMetadata base, ViewMetadata metadata) {
+ // if the metadata is already out of date, reject it
+ if (base != current()) {
+ if (base != null) {
+ throw new CommitFailedException("Cannot commit: stale view
metadata");
+ } else {
+ // when current is non-null, the view exists. but when base is null,
the commit is trying
+ // to create the view
+ throw new AlreadyExistsException("View already exists: %s",
viewName());
+ }
+ }
+
+ // if the metadata is not changed, return early
+ if (base == metadata) {
+ LOGGER.info("Nothing to commit.");
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+ doCommit(base, metadata);
+ requestRefresh();
+
+ LOGGER.info(
+ "Successfully committed to view {} in {} ms",
+ viewName(),
+ System.currentTimeMillis() - start);
+ }
+
public void doRefresh() {
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(
@@ -1492,7 +1695,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
}
- @Override
public void doCommit(ViewMetadata base, ViewMetadata metadata) {
// TODO: Maybe avoid writing metadata if there's definitely a
transaction conflict
LOGGER.debug("doCommit for view {} with base {}, metadata {}",
identifier, base, metadata);
@@ -1548,7 +1750,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));
String newLocation = writeNewMetadataIfRequired(metadata);
- String oldLocation = base == null ? null : currentMetadataLocation();
+ String oldLocation = base == null ? null : currentMetadataLocation;
IcebergTableLikeEntity entity =
IcebergTableLikeEntity.of(
@@ -1589,17 +1791,170 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
}
- @Override
+ protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
+ return null != metadata.metadataFileLocation()
+ ? metadata.metadataFileLocation()
+ : writeNewMetadata(metadata, version + 1);
+ }
+
+ private String writeNewMetadata(ViewMetadata metadata, int newVersion) {
+ String newMetadataFilePath = newMetadataFilePath(metadata, newVersion);
+ OutputFile newMetadataLocation = io().newOutputFile(newMetadataFilePath);
+
+ // write the new metadata
+ // use overwrite to avoid negative caching in S3. this is safe because the metadata location
+ // is
+ // always unique because it includes a UUID.
+ ViewMetadataParser.overwrite(metadata, newMetadataLocation);
+
+ return newMetadataLocation.location();
+ }
+
+ private String newMetadataFilePath(ViewMetadata metadata, int newVersion) {
+ String codecName =
+ metadata
+ .properties()
+ .getOrDefault(
+ ViewProperties.METADATA_COMPRESSION,
ViewProperties.METADATA_COMPRESSION_DEFAULT);
+ String fileExtension = TableMetadataParser.getFileExtension(codecName);
+ return metadataFileLocation(
+ metadata,
+ String.format(Locale.ROOT, "%05d-%s%s", newVersion,
UUID.randomUUID(), fileExtension));
+ }
+
+ private String metadataFileLocation(ViewMetadata metadata, String
filename) {
+ String metadataLocation =
metadata.properties().get(ViewProperties.WRITE_METADATA_LOCATION);
+ if (metadataLocation != null) {
+ return String.format("%s/%s",
LocationUtil.stripTrailingSlash(metadataLocation), filename);
+ } else {
+ return String.format(
+ "%s/%s/%s",
+ LocationUtil.stripTrailingSlash(metadata.location()),
METADATA_FOLDER_NAME, filename);
+ }
+ }
+
public FileIO io() {
return viewFileIO;
}
- @Override
protected String viewName() {
return fullViewName;
}
}
+ /**
+ * An ABC for {@link BasePolarisTableOperations} and {@link BasePolarisViewOperations}. Much of
+ * this code was originally copied from {@link org.apache.iceberg.BaseMetastoreTableOperations}.
+ * CODE_COPIED_TO_POLARIS From Apache Iceberg Version: 1.8
+ */
+ private abstract static class PolarisOperationsBase<T> {
+
+ protected static final String METADATA_FOLDER_NAME = "metadata";
+
+ protected T currentMetadata = null;
+ protected String currentMetadataLocation = null;
+ protected boolean shouldRefresh = true;
+ protected int version = -1;
+
+ protected void requestRefresh() {
+ this.shouldRefresh = true;
+ }
+
+ protected void disableRefresh() {
+ this.shouldRefresh = false;
+ }
+
+ /**
+ * Parse the version from table/view metadata file name.
+ *
+ * @param metadataLocation table/view metadata file location
+ * @return version of the table/view metadata file in success case and -1
if the version is not
+ * parsable (as a sign that the metadata is not part of this catalog)
+ */
+ protected int parseVersion(String metadataLocation) {
+ int versionStart =
+ metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this
will be 0
+ int versionEnd = metadataLocation.indexOf('-', versionStart);
+ if (versionEnd < 0) {
+ // found filesystem object's metadata
+ return -1;
+ }
+
+ try {
+ return Integer.parseInt(metadataLocation.substring(versionStart,
versionEnd));
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Unable to parse version from metadata location: {}",
metadataLocation, e);
+ return -1;
+ }
+ }
+
+ protected void refreshFromMetadataLocation(
+ String newLocation,
+ Predicate<Exception> shouldRetry,
+ int numRetries,
+ Function<String, T> metadataLoader) {
+ // use null-safe equality check because new tables have a null metadata
location
+ if (!Objects.equal(currentMetadataLocation, newLocation)) {
+ LOGGER.info("Refreshing table metadata from new version: {}",
newLocation);
+
+ AtomicReference<T> newMetadata = new AtomicReference<>();
+ Tasks.foreach(newLocation)
+ .retry(numRetries)
+ .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ...
*/)
+ .throwFailureWhenFinished()
+ .stopRetryOn(NotFoundException.class) // overridden if shouldRetry
is non-null
+ .shouldRetryTest(shouldRetry)
+ .run(metadataLocation ->
newMetadata.set(metadataLoader.apply(metadataLocation)));
+
+ if (newMetadata.get() instanceof TableMetadata tableMetadata) {
+ if (currentMetadata instanceof TableMetadata currentTableMetadata) {
+ String newUUID = tableMetadata.uuid();
+ if (currentMetadata != null && currentTableMetadata.uuid() != null
&& newUUID != null) {
+ Preconditions.checkState(
+ newUUID.equals(currentTableMetadata.uuid()),
+ "Table UUID does not match: current=%s != refreshed=%s",
+ currentTableMetadata.uuid(),
+ newUUID);
+ }
+ }
+ }
+
+ this.currentMetadata = newMetadata.get();
+ this.currentMetadataLocation = newLocation;
+ this.version = parseVersion(newLocation);
+ }
+ this.shouldRefresh = false;
+ }
+ }
+
+ private void validateMetadataFileInTableDir(
+ TableIdentifier identifier, TableMetadata metadata, CatalogEntity
catalog) {
+ PolarisCallContext polarisCallContext =
callContext.getPolarisCallContext();
+ boolean allowEscape =
+ polarisCallContext
+ .getConfigurationStore()
+ .getConfiguration(
+ polarisCallContext,
FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION);
+ if (!allowEscape
+ && !polarisCallContext
+ .getConfigurationStore()
+ .getConfiguration(
+ polarisCallContext,
FeatureConfiguration.ALLOW_EXTERNAL_METADATA_FILE_LOCATION)) {
+ LOGGER.debug(
+ "Validating base location {} for table {} in metadata file {}",
+ metadata.location(),
+ identifier,
+ metadata.metadataFileLocation());
+ StorageLocation metadataFileLocation =
StorageLocation.of(metadata.metadataFileLocation());
+ StorageLocation baseLocation = StorageLocation.of(metadata.location());
+ if (!metadataFileLocation.isChildOf(baseLocation)) {
+ throw new BadRequestException(
+ "Metadata location %s is not allowed outside of table location %s",
+ metadata.metadataFileLocation(), metadata.location());
+ }
+ }
+ }
+
private FileIO loadFileIOForTableLike(
TableIdentifier identifier,
Set<String> readLocations,