adam-christian-software commented on code in PR #3256:
URL: https://github.com/apache/polaris/pull/3256#discussion_r2614589178


##########
storage/files/README.md:
##########
@@ -0,0 +1,70 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Polaris object store operations
+
+API and implementations to perform long-running operations against object stores, mostly to purge files.

Review Comment:
   Nit: I wouldn't limit to purging files. I could see other object store operations which might be helpful in the future.
   
   So, I'd remove "mostly to purge files."



##########
storage/files/README.md:
##########
@@ -0,0 +1,70 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Polaris object store operations
+
+API and implementations to perform long-running operations against object stores, mostly to purge files.
+
+Functionalities to scan an object store and to purge files are separated. Filter mechanisms are used to
+select the files to be deleted (purged).
+
+There are implementations to identify the files referenced by a particular Iceberg table or view metadata, including
+statistics files, manifest lists of all snapshots, the manifest files and the data/delete files.
+
+The file operations make no effort to identify duplicates during the identification of files referenced by
+a table or view metadata.
+This means that, for example, a data file referenced in multiple manifest files will be returned twice.
+
+Purge operations are performed in one or multiple bulk delete operations.
+The implementation takes care of not including the same file more than once within a single bulk delete operation.
+
+One alternative implementation purges all files within the base location of a table or view metadata.

Review Comment:
   Is this alternative implementation inside of the implementation here?
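   
   For reference, the two entry points I'm comparing (sketch; both methods and `PurgeSpec.DEFAULT_INSTANCE` are from this PR, `fileOperations`/`metadataLocation` are placeholders):
   
   ```java
   // metadata-driven purge: deletes only the files referenced by the table metadata
   var byMetadata = fileOperations.purgeIcebergTable(metadataLocation, PurgeSpec.DEFAULT_INSTANCE);
   // base-location purge: lists and deletes everything under the metadata's base location
   var byPrefix = fileOperations.purgeIcebergTableBaseLocation(metadataLocation, PurgeSpec.DEFAULT_INSTANCE);
   ```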



##########
storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.RateLimiter;
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.apache.polaris.storage.files.api.FileFilter;
+import org.apache.polaris.storage.files.api.FileOperations;
+import org.apache.polaris.storage.files.api.FileSpec;
+import org.apache.polaris.storage.files.api.FileType;
+import org.apache.polaris.storage.files.api.ImmutablePurgeStats;
+import org.apache.polaris.storage.files.api.PurgeSpec;
+import org.apache.polaris.storage.files.api.PurgeStats;
+import org.projectnessie.storage.uri.StorageUri;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link
+ *     org.apache.iceberg.io.SupportsBulkOperations} and {@link
+ *     org.apache.iceberg.io.SupportsPrefixOperations}.
+ */
+record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class);
+
+  @Override
+  public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) {
+    var prefixUri = StorageUri.of(prefix).resolve("/");
+    if (fileIO instanceof SupportsPrefixOperations prefixOps) {
+      return Streams.stream(prefixOps.listPrefix(prefix).iterator())
+          .filter(Objects::nonNull)
+          .map(
+              fileInfo -> {
+                var location = StorageUri.of(fileInfo.location());
+                if (!location.isAbsolute()) {
+                  // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do.
+                  location = prefixUri.resolve(location);
+                }
+                return FileSpec.builder()
+                    .location(location.toString())
+                    .size(fileInfo.size())
+                    .createdAtMillis(fileInfo.createdAtMillis())
+                    .build();
+              })
+          .filter(filter);
+    }
+
+    throw new IllegalStateException(
+        format(
+            "An Iceberg FileIO supporting prefix operations is required, but the given %s does not",
+            fileIO.getClass().getName()));
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergTableFiles(
+      @Nonnull String tableMetadataLocation, boolean deduplicate) {
+    var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation);
+    if (metadataOpt.isEmpty()) {
+      return Stream.empty();
+    }
+    var metadata = metadataOpt.get();
+
+    var metadataFileSpec =
+        FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build();
+
+    var metadataFiles = Stream.of(metadataFileSpec);
+
+    var statisticsFiles = metadata.statisticsFiles();
+    if (statisticsFiles != null) {
+      var statisticsFileSpecs =
+          statisticsFiles.stream()
+              .map(
+                  statisticsFile ->
+                      FileSpec.fromLocationAndSize(
+                              statisticsFile.path(), statisticsFile.fileSizeInBytes())
+                          .fileType(FileType.ICEBERG_STATISTICS)
+                          .build());
+      metadataFiles = Stream.concat(statisticsFileSpecs, metadataFiles);
+    }
+
+    var previousFiles = metadata.previousFiles();
+    if (previousFiles != null) {
+      metadataFiles =
+          Stream.concat(
+              metadataFiles,
+              previousFiles.stream()
+                  .filter(
+                      metadataLogEntry ->
+                          metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty())
+                  .map(
+                      metadataLogEntry ->
+                          FileSpec.fromLocation(metadataLogEntry.file())
+                              .fileType(FileType.ICEBERG_METADATA)
+                              .build()));
+    }
+
+    var specsById = metadata.specsById();
+
+    var addPredicate = deduplicator(deduplicate);
+
+    var manifestsAndDataFiles =
+        metadata.snapshots().stream()
+            // Newest snapshots first
+            .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), s1.timestampMillis()))
+            .flatMap(
+                snapshot -> identifyIcebergTableSnapshotFiles(snapshot, specsById, addPredicate));
+
+    // Return "dependencies" before the "metadata" itself, so the probability of being able to
+    // resume a failed/aborted purge is higher.
+    return Stream.concat(manifestsAndDataFiles, metadataFiles);
+  }
+
+  static Predicate<String> deduplicator(boolean deduplicate) {
+    if (!deduplicate) {
+      return x -> true;
+    }
+    var set = new LinkedHashSet<String>();
+    return location -> {
+      synchronized (set) {
+        if (set.size() > 100_000) {
+          // limit the heap pressure of the deduplication set to 100,000 elements
+          set.removeFirst();
+        }
+        return set.add(location);
+      }
+    };
+  }
+
+  Stream<FileSpec> identifyIcebergTableSnapshotFiles(
+      @Nonnull Snapshot snapshot,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+    var manifestListLocation = snapshot.manifestListLocation();
+    if (manifestListLocation != null && !addPredicate.test(manifestListLocation)) {
+      return Stream.empty();
+    }
+
+    return identifyIcebergManifests(manifestListLocation, snapshot, specsById, addPredicate);
+  }
+
+  Stream<FileSpec> identifyIcebergManifests(
+      String manifestListLocation,
+      Snapshot snapshot,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+
+    var manifestListFileSpecStream = Stream.<FileSpec>empty();
+
+    if (manifestListLocation != null && !manifestListLocation.isEmpty()) {
+      var manifestListFileSpec =
+          FileSpec.fromLocation(manifestListLocation)
+              .fileType(FileType.ICEBERG_MANIFEST_LIST)
+              .build();
+      manifestListFileSpecStream = Stream.of(manifestListFileSpec);
+    }
+
+    try {
+      var allManifestsFiles =
+          snapshot.allManifests(fileIO).stream()
+              .filter(manifestFile -> addPredicate.test(manifestFile.path()))
+              .flatMap(
+                  manifestFile ->
+                      identifyIcebergManifestDataFiles(manifestFile, specsById, addPredicate));
+
+      // Return "dependencies" before the "metadata" itself, so a failed/aborted purge can be
+      // resumed.
+      return Stream.concat(allManifestsFiles, manifestListFileSpecStream);
+    } catch (Exception e) {
+      LOGGER.warn("Failure reading manifest list file {}: {}", manifestListLocation, e.toString());
+      LOGGER.debug("Failure reading manifest list file {}", manifestListLocation, e);
+      return manifestListFileSpecStream;
+    }
+  }
+
+  @SuppressWarnings("UnnecessaryDefault")
+  private Stream<FileSpec> identifyIcebergManifestDataFiles(
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+
+    var manifestFileSpec =
+        FileSpec.fromLocationAndSize(manifestFile.path(), manifestFile.length())
+            .fileType(FileType.ICEBERG_MANIFEST_FILE)
+            .build();
+
+    try (var contentFilesIter =
+        switch (manifestFile.content()) {
+          case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator();
+          case DELETES ->
+              ManifestFiles.readDeleteManifest(manifestFile, fileIO, specsById).iterator();
+          default -> {
+            LOGGER.warn(
+                "Unsupported content type {} in manifest {}",
+                manifestFile.content(),
+                manifestFile.path());
+            yield CloseableIterator.<ContentFile<? extends ContentFile<?>>>empty();
+          }
+        }) {
+
+      // Cannot leverage streaming here, so eagerly build a list, as the manifest-file reader needs
+      // to be closed.
+      var files = new ArrayList<FileSpec>();
+      while (contentFilesIter.hasNext()) {
+        var contentFile = contentFilesIter.next();
+        if (addPredicate.test(contentFile.location())) {
+          files.add(
+              FileSpec.fromLocationAndSize(contentFile.location(), contentFile.fileSizeInBytes())
+                  .fileType(FileType.fromContentFile(contentFile))
+                  .build());
+        }
+      }
+      // Return "dependencies" before the "metadata" itself, so the probability of being able to
+      // resume a failed/aborted purge is higher.
+      files.add(manifestFileSpec);
+
+      return files.stream();
+    } catch (IOException e) {
+      LOGGER.warn("Failure reading manifest file {}: {}", manifestFile.path(), e.toString());
+      LOGGER.debug("Failure reading manifest file {}", manifestFile.path(), e);
+      return Stream.of(manifestFileSpec);
+    }
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergViewFiles(
+      @Nonnull String viewMetadataLocation, boolean deduplicate) {

Review Comment:
   We aren't using deduplicate here. Should we be?
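   
   If it should be, one way to wire it through using the `deduplicator` helper from this class (sketch; note a per-call set only has an effect if the predicate were shared across invocations):
   
   ```java
   var addPredicate = deduplicator(deduplicate);
   return addPredicate.test(viewMetadataLocation)
       ? Stream.of(metadataFileSpec)
       : Stream.empty();
   ```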



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import java.time.Duration;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+@PolarisImmutable
+public interface PurgeStats {
+  Duration duration();
+
+  /**
+   * The number of purged files.
+   *
+   * <p>The returned value may be wrong and include non-existing files.

Review Comment:
   Same comment for failedPurges. Is there a way that users can understand this number better? If it is wrong, then why is it helpful to return?
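   
   If the intent is for callers to reconcile the counts themselves, maybe that could be documented with something like this (sketch; assumes the generated `PurgeSpec` builder exposes a setter for `purgeIssuedCallback`, which I haven't verified):
   
   ```java
   // record every location actually submitted for deletion
   var issued = java.util.concurrent.ConcurrentHashMap.<String>newKeySet();
   var spec = PurgeSpec.builder().purgeIssuedCallback(issued::add).build();
   var stats = fileOperations.purgeIcebergTable(metadataLocation, spec);
   // stats.purgedFiles() may over-count non-existing files; 'issued' is the exact submitted set
   ```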



##########
storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsFactoryImpl.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.storage.files.api.FileOperations;
+import org.apache.polaris.storage.files.api.FileOperationsFactory;
+
+/** CDI application-scoped implementation of {@link FileOperationsFactory}. */
+@ApplicationScoped
+class FileOperationsFactoryImpl implements FileOperationsFactory {
+
+  private final Clock clock;

Review Comment:
   Do we use this anywhere?



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.stream.Stream;
+
+/**
+ * Object storage file operations, used to find files below a given prefix, to purge files, to
+ * identify referenced files, etc.
+ *
+ * <p>All functions of this interface prefer yielding incomplete results and continuing over throwing
+ * exceptions.
+ */
+public interface FileOperations {
+  /**
+   * Find files that match the given prefix and filter.
+   *
+   * <p>Whether existing but inaccessible files are included in the result depends on the object
+   * store.
+   *
+   * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's
+   * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; })} step on
+   * the returned stream.
+   *
+   * @param prefix full object storage URI prefix, including scheme and bucket.
+   * @param filter file filter
+   * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link
+   *     FileSpec#size()} attributes populated with the information provided by the object store.
+   *     The {@link FileSpec#fileType() file type} attribute is not populated; it may be {@link
+   *     FileSpec#guessTypeFromName() guessed}.
+   */
+  Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter);
+
+  /**
+   * Identifies all files referenced by the given table-metadata.
+   *
+   * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not
+   * readable, the returned stream will just not include those.
+   *
+   * <p>Rate-limiting the returned stream is recommended when identifying multiple tables and/or
+   * views. Rate-limiting on a single invocation may not be effective as expected.
+   *
+   * @param tableMetadataLocation Iceberg table-metadata location
+   * @param deduplicate if true, attempt to deduplicate files by their location, adding additional
+   *     heap pressure to the operation. Implementations may ignore this parameter or may not

Review Comment:
   Is there a reason to have the implementation ignore this parameter? Like, it seems as if it would be helpful because, if it is ignored sometimes, the caller will always have to assume that it is not deduplicated. So, the caller might be doing unnecessary work.
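   
   Concretely, this is the defensive pattern callers end up with if the flag isn't guaranteed (sketch):
   
   ```java
   // caller-side dedup; redundant work whenever the implementation already honored the flag
   var seen = java.util.concurrent.ConcurrentHashMap.<String>newKeySet();
   var files = fileOperations
       .identifyIcebergTableFiles(tableMetadataLocation, true)
       .filter(f -> seen.add(f.location()));
   ```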



##########
storage/files/README.md:
##########
@@ -0,0 +1,70 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Polaris object store operations
+
+API and implementations to perform long-running operations against object stores, mostly to purge files.
+
+Functionalities to scan an object store and to purge files are separated. Filter mechanisms are used to
+select the files to be deleted (purged).
+
+There are implementations to identify the files referenced by a particular Iceberg table or view metadata, including
+statistics files, manifest lists of all snapshots, the manifest files and the data/delete files.
+
+The file operations make no effort to identify duplicates during the identification of files referenced by
+a table or view metadata.
+This means that, for example, a data file referenced in multiple manifest files will be returned twice.
+
+Purge operations are performed in one or multiple bulk delete operations.
+The implementation takes care of not including the same file more than once within a single bulk delete operation.
+
+One alternative implementation purges all files within the base location of a table or view metadata.
+
+All implemented operations are designed to be resilient against failures as those are expected to be run as
+maintenance operations or as part of such.
+The operations are implemented to continue in case of errors and eventually succeed instead of failing eagerly.
+Maintenance operations are usually not actively observed, and manually fixing consistency issues in object
+stores is not a straightforward task for users.
+
+# Potential future enhancements
+
+The operations provided by `FileOperations` are meant for maintenance operations, which are not
+time- or performance-critical.
+It is more important that the operations are resilient against failures, do not add unnecessary CPU or heap pressure,
+and eventually succeed.
+Further, maintenance operations should not eat up too much I/O bandwidth, so as not to interfere with other user-facing
+operations.
+
+Depending on the overall load of the system, it might be worth running some operations in parallel.
+
+# Code architecture
+
+The code is split into two modules: one for the (Polaris internal) API interfaces and one for the implementations.
+
+Tests against various object store implementations are included as unit tests using an on-heap object-store-mock
+and as integration tests against test containers for S3, GCS and ADLS.
+The object-store-mock used in unit tests is also used to validate the low heap-pressure required by the
+implementations.
+
+The actual object store interaction of the current implementation is delegated to Iceberg `FileIO` implementations.
+Only `FileIO` implementations that support prefix-operations (`SupportsPrefixOperations` interface) and

Review Comment:
   What sort of FileIO implementations are not supported then? From what I'm seeing, OSSFileIO, InMemoryFileIO, & EcsFileIO?
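   
   For reference, the runtime check boils down to (sketch, `io` being any `FileIO` instance):
   
   ```java
   // both capabilities are required by this module
   boolean supported =
       io instanceof org.apache.iceberg.io.SupportsPrefixOperations
           && io instanceof org.apache.iceberg.io.SupportsBulkOperations;
   ```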



##########
storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.RateLimiter;
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.apache.polaris.storage.files.api.FileFilter;
+import org.apache.polaris.storage.files.api.FileOperations;
+import org.apache.polaris.storage.files.api.FileSpec;
+import org.apache.polaris.storage.files.api.FileType;
+import org.apache.polaris.storage.files.api.ImmutablePurgeStats;
+import org.apache.polaris.storage.files.api.PurgeSpec;
+import org.apache.polaris.storage.files.api.PurgeStats;
+import org.projectnessie.storage.uri.StorageUri;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link
+ *     org.apache.iceberg.io.SupportsBulkOperations} and {@link
+ *     org.apache.iceberg.io.SupportsPrefixOperations}.
+ */
+record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations {

Review Comment:
   Should we do a validation check here to ensure that the FileIO implements both SupportsBulkOperations & SupportsPrefixOperations?
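   
   E.g., a compact constructor would fail fast at construction time instead of on the first `findFiles()`/`purgeFiles()` call (sketch, using Guava's `Preconditions`, which is already on the classpath here):
   
   ```java
   record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations {
     FileOperationsImpl {
       com.google.common.base.Preconditions.checkArgument(
           fileIO instanceof SupportsPrefixOperations && fileIO instanceof SupportsBulkOperations,
           "FileIO %s must implement both SupportsPrefixOperations and SupportsBulkOperations",
           fileIO.getClass().getName());
     }
     // ...
   }
   ```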



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.stream.Stream;
+
+/**
+ * Object storage file operations, used to find files below a given prefix, to purge files, to
+ * identify referenced files, etc.
+ *
+ * <p>All functions of this interface prefer yielding incomplete results and continuing over throwing
+ * exceptions.
+ */
+public interface FileOperations {
+  /**
+   * Find files that match the given prefix and filter.
+   *
+   * <p>Whether existing but inaccessible files are included in the result depends on the object
+   * store.
+   *
+   * <p>Call sites should consider rate-limiting the scan operations, for example, by using Guava's
+   * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); return x; })} step on
+   * the returned stream.
+   *
+   * @param prefix full object storage URI prefix, including scheme and bucket.
+   * @param filter file filter
+   * @return a stream of file specs with the {@link FileSpec#createdAtMillis()} and {@link
+   *     FileSpec#size()} attributes populated with the information provided by the object store.
+   *     The {@link FileSpec#fileType() file type} attribute is not populated; it may be {@link
+   *     FileSpec#guessTypeFromName() guessed}.
+   */
+  Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter);
+
+  /**
+   * Identifies all files referenced by the given table-metadata.
+   *
+   * <p>In case "container" files, like the metadata, manifest-list or manifest files, are not

Review Comment:
   When would these files not be readable? Did you have cases in mind when that would happen?



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.OptionalDouble;
+import java.util.function.Consumer;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+@SuppressWarnings("unused")
+@PolarisImmutable
+public interface PurgeSpec {
+  PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build();
+
+  @Value.Default
+  default FileFilter fileFilter() {
+    return FileFilter.alwaysTrue();
+  }
+
+  PurgeSpec withFileFilter(FileFilter fileFilter);
+
+  /**
+   * Delete batch size for purge/batch-deletion operations. Implementations may opt to ignore this
+   * parameter and enforce a reasonable or required different limit.
+   */
+  @Value.Default
+  default int deleteBatchSize() {
+    return 250;

Review Comment:
   For my understanding, what was your thought process when choosing 250?
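   
   For context on what I'm asking: I see the effective size is clamped in `PurgeBatcher`:
   
   ```java
   // configured value is bounded by the FileIO-specific hard limit, and is at least 1
   this.deleteBatchSize = Math.min(implSpecificLimit, Math.max(purgeSpec.deleteBatchSize(), 1));
   ```
   
   So I'm curious whether 250 is about staying well below, e.g., S3's 1000-keys-per-bulk-delete limit, or something else.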



##########
storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.RateLimiter;
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.apache.polaris.storage.files.api.FileFilter;
+import org.apache.polaris.storage.files.api.FileOperations;
+import org.apache.polaris.storage.files.api.FileSpec;
+import org.apache.polaris.storage.files.api.FileType;
+import org.apache.polaris.storage.files.api.ImmutablePurgeStats;
+import org.apache.polaris.storage.files.api.PurgeSpec;
+import org.apache.polaris.storage.files.api.PurgeStats;
+import org.projectnessie.storage.uri.StorageUri;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param fileIO the {@link FileIO} instance to use. The given instance must implement both {@link
+ *     org.apache.iceberg.io.SupportsBulkOperations} and {@link
+ *     org.apache.iceberg.io.SupportsPrefixOperations}.
+ */
+record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileOperationsImpl.class);
+
+  @Override
+  public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter) {
+    var prefixUri = StorageUri.of(prefix).resolve("/");
+    if (fileIO instanceof SupportsPrefixOperations prefixOps) {
+      return Streams.stream(prefixOps.listPrefix(prefix).iterator())
+          .filter(Objects::nonNull)
+          .map(
+              fileInfo -> {
+                var location = StorageUri.of(fileInfo.location());
+                if (!location.isAbsolute()) {
+                  // ADLSFileIO does _not_ include the prefix, but GCSFileIO and S3FileIO do.
+                  location = prefixUri.resolve(location);
+                }
+                return FileSpec.builder()
+                    .location(location.toString())
+                    .size(fileInfo.size())
+                    .createdAtMillis(fileInfo.createdAtMillis())
+                    .build();
+              })
+          .filter(filter);
+    }
+
+    throw new IllegalStateException(
+        format(
+            "An Iceberg FileIO supporting prefix operations is required, but the given %s does not",
+            fileIO.getClass().getName()));
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergTableFiles(
+      @Nonnull String tableMetadataLocation, boolean deduplicate) {
+    var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation);
+    if (metadataOpt.isEmpty()) {
+      return Stream.empty();
+    }
+    var metadata = metadataOpt.get();
+
+    var metadataFileSpec =
+        FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build();
+
+    var metadataFiles = Stream.of(metadataFileSpec);
+
+    var statisticsFiles = metadata.statisticsFiles();
+    if (statisticsFiles != null) {
+      var statisticsFileSpecs =
+          statisticsFiles.stream()
+              .map(
+                  statisticsFile ->
+                      FileSpec.fromLocationAndSize(
+                              statisticsFile.path(), statisticsFile.fileSizeInBytes())
+                          .fileType(FileType.ICEBERG_STATISTICS)
+                          .build());
+      metadataFiles = Stream.concat(statisticsFileSpecs, metadataFiles);
+    }
+
+    var previousFiles = metadata.previousFiles();
+    if (previousFiles != null) {
+      metadataFiles =
+          Stream.concat(
+              metadataFiles,
+              previousFiles.stream()
+                  .filter(
+                      metadataLogEntry ->
+                          metadataLogEntry.file() != null && !metadataLogEntry.file().isEmpty())
+                  .map(
+                      metadataLogEntry ->
+                          FileSpec.fromLocation(metadataLogEntry.file())
+                              .fileType(FileType.ICEBERG_METADATA)
+                              .build()));
+    }
+
+    var specsById = metadata.specsById();
+
+    var addPredicate = deduplicator(deduplicate);
+
+    var manifestsAndDataFiles =
+        metadata.snapshots().stream()
+            // Newest snapshots first
+            .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), s1.timestampMillis()))
+            .flatMap(
+                snapshot -> identifyIcebergTableSnapshotFiles(snapshot, specsById, addPredicate));
+
+    // Return "dependencies" before the "metadata" itself, so the probability of being able to
+    // resume a failed/aborted purge is higher.
+    return Stream.concat(manifestsAndDataFiles, metadataFiles);
+  }
+
+  static Predicate<String> deduplicator(boolean deduplicate) {
+    if (!deduplicate) {
+      return x -> true;
+    }
+    var set = new LinkedHashSet<String>();
+    return location -> {
+      synchronized (set) {
+        if (set.size() > 100_000) {
+          // limit the heap pressure of the deduplication set to 100,000 elements
+          set.removeFirst();
+        }
+        return set.add(location);
+      }
+    };
+  }
+
+  Stream<FileSpec> identifyIcebergTableSnapshotFiles(
+      @Nonnull Snapshot snapshot,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+    var manifestListLocation = snapshot.manifestListLocation();
+    if (manifestListLocation != null && !addPredicate.test(manifestListLocation)) {
+      return Stream.empty();
+    }
+
+    return identifyIcebergManifests(manifestListLocation, snapshot, specsById, addPredicate);
+  }
+
+  Stream<FileSpec> identifyIcebergManifests(
+      String manifestListLocation,
+      Snapshot snapshot,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+
+    var manifestListFileSpecStream = Stream.<FileSpec>empty();
+
+    if (manifestListLocation != null && !manifestListLocation.isEmpty()) {
+      var manifestListFileSpec =
+          FileSpec.fromLocation(manifestListLocation)
+              .fileType(FileType.ICEBERG_MANIFEST_LIST)
+              .build();
+      manifestListFileSpecStream = Stream.of(manifestListFileSpec);
+    }
+
+    try {
+      var allManifestsFiles =
+          snapshot.allManifests(fileIO).stream()
+              .filter(manifestFile -> addPredicate.test(manifestFile.path()))
+              .flatMap(
+                  manifestFile ->
+                      identifyIcebergManifestDataFiles(manifestFile, specsById, addPredicate));
+
+      // Return "dependencies" before the "metadata" itself, so a failed/aborted purge can be
+      // resumed.
+      return Stream.concat(allManifestsFiles, manifestListFileSpecStream);
+    } catch (Exception e) {
+      LOGGER.warn("Failure reading manifest list file {}: {}", manifestListLocation, e.toString());
+      LOGGER.debug("Failure reading manifest list file {}", manifestListLocation, e);
+      return manifestListFileSpecStream;
+    }
+  }
+
+  @SuppressWarnings("UnnecessaryDefault")
+  private Stream<FileSpec> identifyIcebergManifestDataFiles(
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+
+    var manifestFileSpec =
+        FileSpec.fromLocationAndSize(manifestFile.path(), manifestFile.length())
+            .fileType(FileType.ICEBERG_MANIFEST_FILE)
+            .build();
+
+    try (var contentFilesIter =
+        switch (manifestFile.content()) {
+          case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator();
+          case DELETES ->
+              ManifestFiles.readDeleteManifest(manifestFile, fileIO, specsById).iterator();
+          default -> {
+            LOGGER.warn(
+                "Unsupported content type {} in manifest {}",
+                manifestFile.content(),
+                manifestFile.path());
+            yield CloseableIterator.<ContentFile<? extends ContentFile<?>>>empty();
+          }
+        }) {
+
+      // Cannot leverage streaming here, so eagerly build a list, as the manifest-file reader needs
+      // to be closed.
+      var files = new ArrayList<FileSpec>();
+      while (contentFilesIter.hasNext()) {
+        var contentFile = contentFilesIter.next();
+        if (addPredicate.test(contentFile.location())) {
+          files.add(
+              FileSpec.fromLocationAndSize(contentFile.location(), contentFile.fileSizeInBytes())
+                  .fileType(FileType.fromContentFile(contentFile))
+                  .build());
+        }
+      }
+      // Return "dependencies" before the "metadata" itself, so the probability of being able to
+      // resume a failed/aborted purge is higher.
+      files.add(manifestFileSpec);
+
+      return files.stream();
+    } catch (IOException e) {
+      LOGGER.warn("Failure reading manifest file {}: {}", manifestFile.path(), e.toString());
+      LOGGER.debug("Failure reading manifest file {}", manifestFile.path(), e);
+      return Stream.of(manifestFileSpec);
+    }
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergViewFiles(
+      @Nonnull String viewMetadataLocation, boolean deduplicate) {
+    var metadataOpt = readViewMetadataFailsafe(viewMetadataLocation);
+    if (metadataOpt.isEmpty()) {
+      return Stream.empty();
+    }
+
+    var metadataFileSpec =
+        FileSpec.fromLocation(viewMetadataLocation).fileType(FileType.ICEBERG_METADATA).build();
+
+    return Stream.of(metadataFileSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) {
+    var files =
+        identifyIcebergTableFiles(tableMetadataLocation, true).filter(purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergTableBaseLocation(
+      @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) {
+    var metadata = readTableMetadataFailsafe(tableMetadataLocation);
+    if (metadata.isEmpty()) {
+      return ImmutablePurgeStats.builder()
+          .duration(Duration.ZERO)
+          .purgedFiles(0L)
+          .failedPurges(1)
+          .build();
+    }
+
+    var baseLocation = metadata.get().location();
+    var files = findFiles(baseLocation, purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) {
+    var files =
+        identifyIcebergViewFiles(viewMetadataLocation, false).filter(purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergViewBaseLocation(
+      @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) {
+    var metadata = readViewMetadataFailsafe(viewMetadataLocation);
+    if (metadata.isEmpty()) {
+      return ImmutablePurgeStats.builder()
+          .duration(Duration.ZERO)
+          .purgedFiles(0L)
+          .failedPurges(1)
+          .build();
+    }
+
+    var baseLocation = metadata.get().location();
+    var files = findFiles(baseLocation, purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec purgeSpec) {
+    return purgeFiles(locationStream.map(FileSpec::location), purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, PurgeSpec purgeSpec) {
+    if (fileIO instanceof SupportsBulkOperations bulkOps) {
+      var startedNanos = System.nanoTime();
+
+      var iter = locationStream.iterator();
+
+      var batcher = new PurgeBatcher(purgeSpec, bulkOps);
+      while (iter.hasNext()) {
+        batcher.add(iter.next());
+      }
+      batcher.flush();
+
+      return ImmutablePurgeStats.builder()
+          .purgedFiles(batcher.purged)
+          .failedPurges(batcher.failed)
+          .duration(Duration.ofNanos(System.nanoTime() - startedNanos))
+          .build();
+    }
+
+    throw new IllegalStateException(
+        format(
+            "An Iceberg FileIO supporting bulk operations is required, but the given %s does not",
+            fileIO.getClass().getName()));
+  }
+
+  @SuppressWarnings("UnstableApiUsage")
+  static final class PurgeBatcher {
+    private final PurgeSpec purgeSpec;
+    private final SupportsBulkOperations bulkOps;
+
+    private final int deleteBatchSize;
+    // Using a `Set` prevents duplicate paths in a single bulk-deletion.
+
+    private final Set<String> batch = new HashSet<>();
+
+    private final Runnable fileDeleteRateLimiter;
+    private final Runnable batchDeleteRateLimiter;
+
+    long purged = 0L;
+
+    long failed = 0L;
+
+    PurgeBatcher(PurgeSpec purgeSpec, SupportsBulkOperations bulkOps) {
+      var implSpecificLimit = implSpecificDeleteBatchLimit(bulkOps);
+
+      this.deleteBatchSize = Math.min(implSpecificLimit, Math.max(purgeSpec.deleteBatchSize(), 1));
+
+      this.purgeSpec = purgeSpec;
+      this.bulkOps = bulkOps;
+
+      fileDeleteRateLimiter = createLimiter(purgeSpec.fileDeletesPerSecond());
+      batchDeleteRateLimiter = createLimiter(purgeSpec.batchDeletesPerSecond());
+    }
+
+    private static Runnable createLimiter(OptionalDouble optionalDouble) {
+      if (optionalDouble.isEmpty()) {
+        // unlimited
+        return () -> {};
+      }
+      var limiter = RateLimiter.create(optionalDouble.getAsDouble());
+      return limiter::acquire;
+    }
+
+    void add(String location) {
+      fileDeleteRateLimiter.run();
+      batch.add(location);
+
+      if (batch.size() >= deleteBatchSize) {
+        flush();
+      }
+    }
+
+    void flush() {
+      int size = batch.size();
+      if (size > 0) {
+        batch.forEach(purgeSpec.purgeIssuedCallback());
+        try {
+          batchDeleteRateLimiter.run();
+          bulkOps.deleteFiles(batch);
+          purged += size;
+        } catch (BulkDeletionFailureException e) {
+          // Object stores do delete the files that exist, but a BulkDeletionFailureException is
+          // still being thrown.
+          // However, not all FileIO implementations behave the same way as some don't throw in the
+          // non-existent-case.
+          var batchFailed = e.numberFailedObjects();
+          purged += size - batchFailed;
+          failed += batchFailed;
+        } finally {
+          batch.clear();
+        }
+      }
+    }
+  }
+
+  /** Figure out the hard coded max batch size limit for a particular FileIO implementation. */
+  static int implSpecificDeleteBatchLimit(SupportsBulkOperations bulkOps) {
+    var className = bulkOps.getClass().getName();

Review Comment:
   Does this work? Like, I think we are trying to use `getClass().getSimpleName()` rather than `getClass().getName()`.
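   
   To illustrate (standard `java.lang.Class` behavior):
   
   ```java
   // fully-qualified name, e.g. "org.apache.iceberg.aws.s3.S3FileIO"
   String fqcn = bulkOps.getClass().getName();
   // simple name, e.g. "S3FileIO"
   String simple = bulkOps.getClass().getSimpleName();
   // whatever form is matched below has to agree with the accessor used here
   ```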



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.stream.Stream;
+
+/**
+ * Object storage file operations, used to find files below a given prefix, to purge files, to
+ * identify referenced files, etc.
+ *
+ * <p>All functions of this interface prefer yielding incomplete results and continuing over throwing
+ * exceptions.
+ */
+public interface FileOperations {

Review Comment:
   Nit: For readability, I would introduce a FilePurgeOperations interface which could contain all of the purge operations and have this interface extend that one.
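   
   Roughly like this (sketch; `FilePurgeOperations` is the hypothetical name, signatures taken from this PR):
   
   ```java
   public interface FilePurgeOperations {
     PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, PurgeSpec purgeSpec);
     PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, PurgeSpec purgeSpec);
     PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, PurgeSpec purgeSpec);
     // ... remaining purge* methods
   }
   
   public interface FileOperations extends FilePurgeOperations {
     Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter filter);
     // ... identify* methods
   }
   ```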



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileSpec.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Describes a single file/object in an object storage.
+ *
+ * <p>Not all attributes are populated by every {@link FileOperations} function.
+ */
+@PolarisImmutable
+public interface FileSpec {
+
+  /** The full object storage URI. */
+  String location();
+
+  /**
+   * The type of the file, if known.
+   *
+   * @see #guessTypeFromName()
+   */
+  Optional<FileType> fileType();
+
+  /** The size of the file in bytes, if available. */
+  OptionalLong size();
+
+  /** The creation timestamp in milliseconds since the epoch, if available. */
+  OptionalLong createdAtMillis();
+
+  static Builder builder() {
+    return ImmutableFileSpec.builder();
+  }
+
+  static Builder fromLocation(String location) {
+    return builder().location(location);
+  }
+
+  static Builder fromLocationAndSize(String location, long size) {
+    var b = fromLocation(location);
+    if (size > 0L) {
+      b.size(size);
+    }
+    return b;
+  }
+
+  default FileType guessTypeFromName() {
+    var location = location();
+    var lastSlash = location.lastIndexOf('/');
+    var fileName = lastSlash > 0 ? location.substring(lastSlash + 1) : location;
+
+    if (fileName.contains(".metadata.json")) {
+      return FileType.ICEBERG_METADATA;
+    } else if (fileName.startsWith("snap-")) {
+      return FileType.ICEBERG_MANIFEST_LIST;
+    } else if (fileName.contains("-m")) {

Review Comment:
   I'm wondering if this could lead to false positives. Like, could we accidentally purge a file with this name?
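   
   A concrete case I'm thinking of (hypothetical file name; Hadoop-style data files contain `-m-`):
   
   ```java
   var spec = FileSpec.fromLocation("s3://bucket/tbl/data/part-m-00000.parquet").build();
   // falls into the `fileName.contains("-m")` branch and would be guessed as a manifest file
   var guessed = spec.guessTypeFromName();
   ```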



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.OptionalDouble;
+import java.util.function.Consumer;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+@SuppressWarnings("unused")
+@PolarisImmutable
+public interface PurgeSpec {
+  PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build();
+
+  @Value.Default
+  default FileFilter fileFilter() {
+    return FileFilter.alwaysTrue();
+  }
+
+  PurgeSpec withFileFilter(FileFilter fileFilter);
+
+  /**
+   * Delete batch size for purge/batch-deletion operations. Implementations may opt to ignore this
+   * parameter and enforce a reasonable or required different limit.
+   */
+  @Value.Default
+  default int deleteBatchSize() {
+    return 250;
+  }
+
+  PurgeSpec withDeleteBatchSize(int deleteBatchSize);
+
+  /**
+   * Callback being invoked right before a file location is being submitted to be purged.
+   *
+   * <p>Due to API constraints of {@link
+   * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable)} it's barely possible to

Review Comment:
   Nit: I'd rephrase to "This callback allows users to identify files that failed a deletion."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
