aokolnychyi commented on a change in pull request #1211:
URL: https://github.com/apache/iceberg/pull/1211#discussion_r457527925



##########
File path: core/src/main/java/org/apache/iceberg/ManifestFiles.java
##########
@@ -138,8 +138,8 @@ private ManifestFiles() {
     return open(manifest, io, null);
   }
 
-  static ManifestReader<?> open(ManifestFile manifest, FileIO io,
-                                Map<Integer, PartitionSpec> specsById) {
+  public static ManifestReader<?> open(

Review comment:
       nit: this can actually fit on one line

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotResults.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class ExpireSnapshotResults {

Review comment:
       Let's call it `ExpireSnapshotsActionResult` to be consistent with other 
actions.

##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -51,6 +51,7 @@
 import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
 class RemoveSnapshots implements ExpireSnapshots {
+

Review comment:
       nit: unrelated cosmetic change (an extra blank line); let's revert it to keep the diff focused.

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()
+          .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
+      if (sourceSnapshotId != null) {
+        // protect any snapshot that was cherry-picked into the current table 
state
+        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
+      }
+    }
+
+    return pickedAncestorSnapshotIds;
+  }
+
+  /**
+   * Given a list of currently valid snapshots, extract all the manifests from 
those snapshots. If
+   * there is an error while reading manifest lists an incomplete list of 
manifests will be
+   * produced.
+   *
+   * @param currentSnapshots a list of currently valid non-expired snapshots
+   * @return all of the manifests of those snapshots
+   */
+  private static Set<ManifestFile> getValidManifests(List<Snapshot> 
currentSnapshots, FileIO io) {
+
+    Set<ManifestFile> validManifests = Sets.newHashSet();
+    Tasks.foreach(currentSnapshots).retry(3).suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) ->
+            LOG.warn("Failed on snapshot {} while reading manifest list: {}", 
snapshot.snapshotId(),
+                snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot, io)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest);
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(
+                    String.format("Failed to close manifest list: %s",

Review comment:
       nit: the args can be on one line

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()
+          .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
+      if (sourceSnapshotId != null) {
+        // protect any snapshot that was cherry-picked into the current table 
state
+        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
+      }
+    }
+
+    return pickedAncestorSnapshotIds;
+  }
+
+  /**
+   * Given a list of currently valid snapshots, extract all the manifests from 
those snapshots. If
+   * there is an error while reading manifest lists an incomplete list of 
manifests will be
+   * produced.
+   *
+   * @param currentSnapshots a list of currently valid non-expired snapshots
+   * @return all of the manifests of those snapshots
+   */
+  private static Set<ManifestFile> getValidManifests(List<Snapshot> 
currentSnapshots, FileIO io) {
+
+    Set<ManifestFile> validManifests = Sets.newHashSet();
+    Tasks.foreach(currentSnapshots).retry(3).suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) ->
+            LOG.warn("Failed on snapshot {} while reading manifest list: {}", 
snapshot.snapshotId(),
+                snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot, io)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest);
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(
+                    String.format("Failed to close manifest list: %s",
+                        snapshot.manifestListLocation()),
+                    e);
+              }
+            });
+    return validManifests;
+  }
+
+  /**
+   * Find manifests to clean up that are still referenced by a valid snapshot, 
but written by an
+   * expired snapshot.
+   *
+   * @param validSnapshotIds     A list of the snapshots which are not expired
+   * @param originalMeta A reference to the table before expiration
+   * @return MetadataFiles which must be scanned to look for files to delete
+   */
+  private static Set<ManifestFile> validManifestsInExpiredSnapshots(
+      Set<ManifestFile> validManfiests, TableMetadata originalMeta, Set<Long> 
validSnapshotIds) {
+
+    Set<Long> ancestorIds = 
SnapshotUtil.ancestorIds(originalMeta.currentSnapshot(), originalMeta::snapshot)
+        .stream().collect(Collectors.toSet());
+    Set<Long> pickedAncestorSnapshotIds = getPickedAncestorIds(originalMeta, 
ancestorIds);
+
+    Set<ManifestFile> manifestsToScan = Sets.newHashSet();
+    validManfiests.forEach(manifest -> {
+      long snapshotId = manifest.snapshotId();
+      // whether the manifest was created by a valid snapshot (true) or an 
expired snapshot (false)
+      boolean fromValidSnapshots = validSnapshotIds.contains(snapshotId);
+      // whether the snapshot that created the manifest was an ancestor of the 
table state
+      boolean isFromAncestor = ancestorIds.contains(snapshotId);
+      // whether the changes in this snapshot have been picked into the 
current table state
+      boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId);
+      // if the snapshot that wrote this manifest is no longer valid (has 
expired),
+      // then delete its deleted files. note that this is only for expired 
snapshots that are in the
+      // current table state
+      if (!fromValidSnapshots && (isFromAncestor || isPicked) && 
manifest.hasDeletedFiles()) {
+        manifestsToScan.add(manifest.copy());
+      }
+    });
+    return manifestsToScan;
+  }
+
+  /**
+   * Removes snapshots whose changes impact the current table state leaving 
only those which may
+   * have files that could potentially need to be deleted.
+   *
+   * @param originalMeta TableMetadata for the table we are expiring from
+   * @param validSnapshotIds Snapshots which are not expired
+   * @return A list of those snapshots which may have files that need to be 
deleted
+   */
+  private static List<Snapshot> snapshotsNotInTableState(Set<Long> 
validSnapshotIds, TableMetadata originalMeta) {
+
+    Set<Long> ancestorIds = 
SnapshotUtil.ancestorIds(originalMeta.currentSnapshot(), originalMeta::snapshot)

Review comment:
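       Can we avoid computing ancestor ids twice? They are already computed in `validManifestsInExpiredSnapshots`; consider computing them once in the caller and passing the set to both helpers.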

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;

Review comment:
       We tend to put Spark-related vars at the beginning of the list, make 
them final, and name the session `spark`. For example, we have the following in 
`RewriteManifestsAction`:
   
   ```
     private final SparkSession spark;
     private final JavaSparkContext sparkContext;
   ```
   
   And init that in the constructor:
   
   ```
       this.spark = spark;
       this.sparkContext = new JavaSparkContext(spark.sparkContext());
   ```

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;
+
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+  private Consumer<String> deleteFunc = defaultDelete;
+
+
+  ExpireSnapshotsAction(SparkSession session, Table table) {
+    this.session = session;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+    this.base = ops.current();
+    this.localExpireSnapshots = 
table.expireSnapshots().deleteExpiredFiles(false);
+  }
+
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    localExpireSnapshots.expireSnapshotId(expireSnapshotId);
+    return this;
+  }
+
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    localExpireSnapshots.expireOlderThan(timestampMillis);
+    return this;
+  }
+
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    localExpireSnapshots.retainLast(numSnapshots);
+    return this;
+  }
+
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Execute is a synonym for commit in this implementation. Calling either 
commit or execute will
+   * launch the Spark equivalent of RemoveSnapshots.
+   *
+   * @return nothing
+   */
+  @Override
+  public ExpireSnapshotResults execute() {
+    localExpireSnapshots.commit();
+
+    TableMetadata currentMetadata = ops.refresh();
+
+    //Locally determine which snapshots have been expired
+    SnapshotExpirationChanges expiredSnapshotChanges =
+        ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
+
+    //Locally determine which manifests will need to be scanned, reverted, 
deleted
+    ManifestExpirationChanges manifestExpirationChanges =
+        ExpireSnapshotUtil.determineManifestChangesFromSnapshotExpiration(
+            expiredSnapshotChanges.validSnapshotIds(), 
expiredSnapshotChanges.expiredSnapshotIds(),
+            currentMetadata, base, ops.io());
+
+    FileIO io = SparkUtil.serializableFileIO(table);
+
+    //Going the RDD Route because our reader functions all work with full 
Manifest Files
+    JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(session.sparkContext());
+
+    JavaRDD<ManifestFile> manifestsToScan =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToScan()));
+
+    JavaRDD<ManifestFile> manifestsToRevert =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToRevert()));
+
+    FileIO serializableIO = SparkUtil.serializableFileIO(table);
+
+    Broadcast<Map<Integer, PartitionSpec>> broadcastedSpecLookup =

Review comment:
       I think this broadcast is optional, as there will be only a couple of specs 
at most; the map could simply be captured in the closure.

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return

Review comment:
       nit: empty `@return` tag in the Javadoc; either describe the returned value or remove the tag.

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return

Review comment:
       nit: empty `@return` Javadoc tag (either describe the returned value or drop the tag)

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}

Review comment:
       I'd put the static `LOG` field and the private constructor at the beginning of the 
class, followed by the public static methods.

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);

Review comment:
       I cannot comment on the 
[old](https://github.com/apache/iceberg/pull/1211#discussion_r456635324) thread 
so I'll do it here.
   
   What I meant is simply removing this per-snapshot log line and instead logging the 
set of expired snapshot IDs once in the caller. 

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots

Review comment:
       nit: typo `Metada` → `Metadata`

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()
+          .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
+      if (sourceSnapshotId != null) {
+        // protect any snapshot that was cherry-picked into the current table 
state
+        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
+      }
+    }
+
+    return pickedAncestorSnapshotIds;
+  }
+
+  /**
+   * Given a list of currently valid snapshots, extract all the manifests from 
those snapshots. If
+   * there is an error while reading manifest lists an incomplete list of 
manifests will be
+   * produced.
+   *
+   * @param currentSnapshots a list of currently valid non-expired snapshots
+   * @return all of the manifests of those snapshots
+   */
+  private static Set<ManifestFile> getValidManifests(List<Snapshot> 
currentSnapshots, FileIO io) {
+
+    Set<ManifestFile> validManifests = Sets.newHashSet();
+    Tasks.foreach(currentSnapshots).retry(3).suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) ->
+            LOG.warn("Failed on snapshot {} while reading manifest list: {}", 
snapshot.snapshotId(),
+                snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot, io)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest);
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(
+                    String.format("Failed to close manifest list: %s",
+                        snapshot.manifestListLocation()),
+                    e);
+              }
+            });
+    return validManifests;
+  }
+
+  /**
+   * Find manifests to clean up that are still referenced by a valid snapshot, 
but written by an
+   * expired snapshot.
+   *
+   * @param validSnapshotIds     A list of the snapshots which are not expired
+   * @param originalMeta A reference to the table before expiration
+   * @return MetadataFiles which must be scanned to look for files to delete
+   */
+  private static Set<ManifestFile> validManifestsInExpiredSnapshots(
+      Set<ManifestFile> validManfiests, TableMetadata originalMeta, Set<Long> 
validSnapshotIds) {
+
+    Set<Long> ancestorIds = 
SnapshotUtil.ancestorIds(originalMeta.currentSnapshot(), originalMeta::snapshot)

Review comment:
       Renaming `originalMetadata` to `base` and `currentMetadata` to `current` 
could shorten lines and make the code in the utility class more compact. These 
names are used in a couple of places, so it is worth exploring, but it is not 
required.
   
   Then these lines can be replaced with one:
   
   ```
   Set<Long> ancestorIds = 
Sets.newHashSet(SnapshotUtil.ancestorIds(base.currentSnapshot(), 
base::snapshot));
   ```

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()
+          .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
+      if (sourceSnapshotId != null) {
+        // protect any snapshot that was cherry-picked into the current table 
state
+        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
+      }
+    }
+
+    return pickedAncestorSnapshotIds;
+  }
+
+  /**
+   * Given a list of currently valid snapshots, extract all the manifests from 
those snapshots. If
+   * there is an error while reading manifest lists an incomplete list of 
manifests will be
+   * produced.
+   *
+   * @param currentSnapshots a list of currently valid non-expired snapshots
+   * @return all of the manifests of those snapshots
+   */
+  private static Set<ManifestFile> getValidManifests(List<Snapshot> 
currentSnapshots, FileIO io) {
+
+    Set<ManifestFile> validManifests = Sets.newHashSet();
+    Tasks.foreach(currentSnapshots).retry(3).suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) ->
+            LOG.warn("Failed on snapshot {} while reading manifest list: {}", 
snapshot.snapshotId(),
+                snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot, io)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest);
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(
+                    String.format("Failed to close manifest list: %s",
+                        snapshot.manifestListLocation()),
+                    e);
+              }
+            });
+    return validManifests;
+  }
+
+  /**
+   * Find manifests to clean up that are still referenced by a valid snapshot, 
but written by an
+   * expired snapshot.
+   *
+   * @param validSnapshotIds     A list of the snapshots which are not expired
+   * @param originalMeta A reference to the table before expiration
+   * @return MetadataFiles which must be scanned to look for files to delete
+   */
+  private static Set<ManifestFile> validManifestsInExpiredSnapshots(
+      Set<ManifestFile> validManfiests, TableMetadata originalMeta, Set<Long> 
validSnapshotIds) {

Review comment:
       nit: typo in `validManfiests` (should be `validManifests`)

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;
+
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+  private Consumer<String> deleteFunc = defaultDelete;
+
+
+  ExpireSnapshotsAction(SparkSession session, Table table) {
+    this.session = session;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+    this.base = ops.current();
+    this.localExpireSnapshots = 
table.expireSnapshots().deleteExpiredFiles(false);
+  }
+
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    localExpireSnapshots.expireSnapshotId(expireSnapshotId);
+    return this;
+  }
+
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    localExpireSnapshots.expireOlderThan(timestampMillis);
+    return this;
+  }
+
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    localExpireSnapshots.retainLast(numSnapshots);
+    return this;
+  }
+
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Execute is a synonym for commit in this implementation. Calling either 
commit or execute will
+   * launch the Spark equivalent of RemoveSnapshots.
+   *
+   * @return nothing
+   */
+  @Override
+  public ExpireSnapshotResults execute() {
+    localExpireSnapshots.commit();
+
+    TableMetadata currentMetadata = ops.refresh();
+
+    //Locally determine which snapshots have been expired
+    SnapshotExpirationChanges expiredSnapshotChanges =
+        ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
+
+    //Locally determine which manifests will need to be scanned, reverted, 
deleted
+    ManifestExpirationChanges manifestExpirationChanges =
+        ExpireSnapshotUtil.determineManifestChangesFromSnapshotExpiration(
+            expiredSnapshotChanges.validSnapshotIds(), 
expiredSnapshotChanges.expiredSnapshotIds(),
+            currentMetadata, base, ops.io());
+
+    FileIO io = SparkUtil.serializableFileIO(table);
+
+    //Going the RDD Route because our reader functions all work with full 
Manifest Files
+    JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(session.sparkContext());
+
+    JavaRDD<ManifestFile> manifestsToScan =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToScan()));
+
+    JavaRDD<ManifestFile> manifestsToRevert =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToRevert()));
+
+    FileIO serializableIO = SparkUtil.serializableFileIO(table);

Review comment:
       It is important to broadcast `FileIO` as it is the most expensive part.

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(

Review comment:
       nit: If we renamed the args to `current` and `base`, the signature would also fit on one line.
   
   ```
   ... (TableMetadata current, TableMetadata base) {
   
   }
   ```

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;

Review comment:
       We need to verify that capturing the base metadata in the constructor is safe, i.e. that it doesn't cause any issues if a concurrent operation commits after we read the base and before we commit.

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()
+          .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
+      if (sourceSnapshotId != null) {
+        // protect any snapshot that was cherry-picked into the current table 
state
+        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
+      }
+    }
+
+    return pickedAncestorSnapshotIds;
+  }
+
+  /**
+   * Given a list of currently valid snapshots, extract all the manifests from 
those snapshots. If
+   * there is an error while reading manifest lists an incomplete list of 
manifests will be
+   * produced.
+   *
+   * @param currentSnapshots a list of currently valid non-expired snapshots
+   * @return all of the manifests of those snapshots
+   */
+  private static Set<ManifestFile> getValidManifests(List<Snapshot> 
currentSnapshots, FileIO io) {
+
+    Set<ManifestFile> validManifests = Sets.newHashSet();
+    Tasks.foreach(currentSnapshots).retry(3).suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) ->
+            LOG.warn("Failed on snapshot {} while reading manifest list: {}", 
snapshot.snapshotId(),
+                snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot, io)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest);
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(
+                    String.format("Failed to close manifest list: %s",
+                        snapshot.manifestListLocation()),
+                    e);
+              }
+            });
+    return validManifests;
+  }
+
+  /**
+   * Find manifests to clean up that are still referenced by a valid snapshot, 
but written by an
+   * expired snapshot.
+   *
+   * @param validSnapshotIds     A list of the snapshots which are not expired
+   * @param originalMeta A reference to the table before expiration
+   * @return MetadataFiles which must be scanned to look for files to delete
+   */
+  private static Set<ManifestFile> validManifestsInExpiredSnapshots(
+      Set<ManifestFile> validManfiests, TableMetadata originalMeta, Set<Long> 
validSnapshotIds) {
+
+    Set<Long> ancestorIds = 
SnapshotUtil.ancestorIds(originalMeta.currentSnapshot(), originalMeta::snapshot)
+        .stream().collect(Collectors.toSet());
+    Set<Long> pickedAncestorSnapshotIds = getPickedAncestorIds(originalMeta, 
ancestorIds);
+
+    Set<ManifestFile> manifestsToScan = Sets.newHashSet();
+    validManfiests.forEach(manifest -> {
+      long snapshotId = manifest.snapshotId();
+      // whether the manifest was created by a valid snapshot (true) or an 
expired snapshot (false)
+      boolean fromValidSnapshots = validSnapshotIds.contains(snapshotId);
+      // whether the snapshot that created the manifest was an ancestor of the 
table state
+      boolean isFromAncestor = ancestorIds.contains(snapshotId);
+      // whether the changes in this snapshot have been picked into the 
current table state
+      boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId);
+      // if the snapshot that wrote this manifest is no longer valid (has 
expired),
+      // then delete its deleted files. note that this is only for expired 
snapshots that are in the
+      // current table state
+      if (!fromValidSnapshots && (isFromAncestor || isPicked) && 
manifest.hasDeletedFiles()) {
+        manifestsToScan.add(manifest.copy());
+      }
+    });
+    return manifestsToScan;
+  }
+
+  /**
+   * Removes snapshots whose changes impact the current table state leaving 
only those which may
+   * have files that could potentially need to be deleted.
+   *
+   * @param originalMeta TableMetadata for the table we are expiring from
+   * @param validSnapshotIds Snapshots which are not expired
+   * @return A list of those snapshots which may have files that need to be 
deleted
+   */
+  private static List<Snapshot> snapshotsNotInTableState(Set<Long> 
validSnapshotIds, TableMetadata originalMeta) {

Review comment:
       What about adding a verb to the name here too?

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()
+          .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP);
+      if (sourceSnapshotId != null) {
+        // protect any snapshot that was cherry-picked into the current table 
state
+        pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId));
+      }
+    }
+
+    return pickedAncestorSnapshotIds;
+  }
+
+  /**
+   * Given a list of currently valid snapshots, extract all the manifests from 
those snapshots. If
+   * there is an error while reading manifest lists an incomplete list of 
manifests will be
+   * produced.
+   *
+   * @param currentSnapshots a list of currently valid non-expired snapshots
+   * @return all of the manifests of those snapshots
+   */
+  private static Set<ManifestFile> getValidManifests(List<Snapshot> 
currentSnapshots, FileIO io) {
+
+    Set<ManifestFile> validManifests = Sets.newHashSet();
+    Tasks.foreach(currentSnapshots).retry(3).suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) ->
+            LOG.warn("Failed on snapshot {} while reading manifest list: {}", 
snapshot.snapshotId(),
+                snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot, io)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest);
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(
+                    String.format("Failed to close manifest list: %s",
+                        snapshot.manifestListLocation()),
+                    e);
+              }
+            });
+    return validManifests;
+  }
+
+  /**
+   * Find manifests to clean up that are still referenced by a valid snapshot, 
but written by an
+   * expired snapshot.
+   *
+   * @param validSnapshotIds     A list of the snapshots which are not expired
+   * @param originalMeta A reference to the table before expiration
+   * @return MetadataFiles which must be scanned to look for files to delete
+   */
+  private static Set<ManifestFile> validManifestsInExpiredSnapshots(

Review comment:
       Would it make sense to add a verb to the method name, like `findValidManifests...`?

##########
File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExpireSnapshotUtil {
+
+  /**
+   * Determines the manifest files which need to be inspected because they 
refer to data files which
+   * can be removed after a Snapshot Expiration.
+   *
+   * Our goal is to determine which manifest files we actually need to read 
through because they
+   * may refer to files which are no longer accessible from any valid snapshot 
and do not effect
+   * the current table.
+   *
+   * For this we need to look through
+   *   1. Snapshots which have not expired but contain manifests from expired 
snapshots
+   *   2. Snapshots which have expired and contain manifests referring to now 
orphaned files
+   *
+   * @param validIds              The Ids of the Snapshots which have not been 
expired
+   * @param expiredIds            The Ids of the Snapshots which have been 
expired
+   * @param currentMetadata       The table metadata from after the snapshot 
expiration
+   * @param originalMetadata      The table metadata from before the snapshot 
expiration
+   * @param io                    FileIO for reading manifest info
+   * @return
+   */
+  public static ManifestExpirationChanges 
determineManifestChangesFromSnapshotExpiration(Set<Long> validIds,
+      Set<Long> expiredIds, TableMetadata currentMetadata, TableMetadata 
originalMetadata, FileIO io) {
+
+    List<Snapshot> currentSnapshots = currentMetadata.snapshots();
+
+    //Snapshots which are not expired but refer to manifests from expired 
snapshots
+    Set<ManifestFile> validManifests = getValidManifests(currentSnapshots, io);
+    Set<ManifestFile> manifestsToScan = 
validManifestsInExpiredSnapshots(validManifests,
+        originalMetadata, validIds);
+
+    //Snapshots which are expired and do not effect the current table
+    List<Snapshot> snapshotsNotChangingTableState = 
snapshotsNotInTableState(validIds, originalMetadata);
+    ManifestExpirationChanges manifestExpirationChanges =
+        findExpiredManifestsInUnusedSnapshots(snapshotsNotChangingTableState, 
validManifests,
+            originalMetadata, expiredIds, io);
+
+    manifestExpirationChanges.manifestsToScan().addAll(manifestsToScan);
+    return manifestExpirationChanges;
+  }
+
+  /**
+   * Compares the Snapshots from the two TableMetadata objects and identifies 
the snapshots
+   * still in use and those no longer in use
+   * @param currentMetadata Metadata from a table after an expiration of 
snapshots
+   * @param originalMetadata Metada from the table before expiration of 
snapshots
+   * @return
+   */
+  public static SnapshotExpirationChanges getExpiredSnapshots(
+      TableMetadata currentMetadata, TableMetadata originalMetadata) {
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : currentMetadata.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : originalMetadata.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      if (!validIds.contains(snapshotId)) {
+        // This snapshot is no longer in the updated metadata
+        LOG.info("Expired snapshot: {}", snapshot);
+        expiredIds.add(snapshotId);
+      }
+    }
+
+    return new SnapshotExpirationChanges(validIds, expiredIds);
+  }
+
+  //Utility Class No Instantiation Allowed
+  private ExpireSnapshotUtil() {}
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotUtil.class);
+
+  private static Set<Long> getPickedAncestorIds(TableMetadata currentMetadata, 
Set<Long> ancestorIds) {
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+
+    Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet();
+    for (long snapshotId : ancestorIds) {
+      String sourceSnapshotId = currentMetadata.snapshot(snapshotId).summary()

Review comment:
       I think this should be `base`, not `current`. Also, if we rename 
`currentMetadata` to `base`, this would fit on one line, as it did before.

##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -152,168 +165,23 @@ private void cleanExpiredSnapshots() {
     // 2. Delete any data files that were deleted by those snapshots and are 
not in the table
     // 3. Delete any manifests that are no longer used by current snapshots
     // 4. Delete the manifest lists
+    TableMetadata currentMetadata = ops.refresh();
+    SnapshotExpirationChanges snapshotChanges = 
ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
 
-    TableMetadata current = ops.refresh();
-
-    Set<Long> validIds = Sets.newHashSet();
-    for (Snapshot snapshot : current.snapshots()) {
-      validIds.add(snapshot.snapshotId());
-    }
-
-    Set<Long> expiredIds = Sets.newHashSet();
-    for (Snapshot snapshot : base.snapshots()) {
-      long snapshotId = snapshot.snapshotId();
-      if (!validIds.contains(snapshotId)) {
-        // the snapshot was expired
-        LOG.info("Expired snapshot: {}", snapshot);
-        expiredIds.add(snapshotId);
-      }
-    }
-
-    if (expiredIds.isEmpty()) {
+    if (snapshotChanges.expiredSnapshotIds().isEmpty()) {
       // if no snapshots were expired, skip cleanup
       return;
     }
 
-    LOG.info("Committed snapshot changes; cleaning up expired manifests and 
data files.");
+    LOG.info("Cleaning up expired manifests and data files locally.");
 
-    cleanExpiredFiles(current.snapshots(), validIds, expiredIds);
-  }
-
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  private void cleanExpiredFiles(List<Snapshot> snapshots, Set<Long> validIds, 
Set<Long> expiredIds) {
     // Reads and deletes are done using 
Tasks.foreach(...).suppressFailureWhenFinished to complete

Review comment:
       Now that `cleanExpiredFiles` has been removed, should this comment move to wherever the reads and deletes happen now?

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;
+
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+  private Consumer<String> deleteFunc = defaultDelete;
+
+
+  ExpireSnapshotsAction(SparkSession session, Table table) {
+    this.session = session;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+    this.base = ops.current();
+    this.localExpireSnapshots = 
table.expireSnapshots().deleteExpiredFiles(false);
+  }
+
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    localExpireSnapshots.expireSnapshotId(expireSnapshotId);
+    return this;
+  }
+
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    localExpireSnapshots.expireOlderThan(timestampMillis);
+    return this;
+  }
+
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    localExpireSnapshots.retainLast(numSnapshots);
+    return this;
+  }
+
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Execute is a synonym for commit in this implementation. Calling either 
commit or execute will
+   * launch the Spark equivalent of RemoveSnapshots.
+   *
+   * @return nothing
+   */
+  @Override
+  public ExpireSnapshotResults execute() {
+    localExpireSnapshots.commit();
+
+    TableMetadata currentMetadata = ops.refresh();
+
+    //Locally determine which snapshots have been expired
+    SnapshotExpirationChanges expiredSnapshotChanges =
+        ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
+
+    //Locally determine which manifests will need to be scanned, reverted, 
deleted
+    ManifestExpirationChanges manifestExpirationChanges =
+        ExpireSnapshotUtil.determineManifestChangesFromSnapshotExpiration(
+            expiredSnapshotChanges.validSnapshotIds(), 
expiredSnapshotChanges.expiredSnapshotIds(),
+            currentMetadata, base, ops.io());
+
+    FileIO io = SparkUtil.serializableFileIO(table);
+
+    //Going the RDD Route because our reader functions all work with full 
Manifest Files
+    JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(session.sparkContext());
+
+    JavaRDD<ManifestFile> manifestsToScan =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToScan()));
+
+    JavaRDD<ManifestFile> manifestsToRevert =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToRevert()));
+
+    FileIO serializableIO = SparkUtil.serializableFileIO(table);
+
+    Broadcast<Map<Integer, PartitionSpec>> broadcastedSpecLookup =
+        javaSparkContext.broadcast(ops.current().specsById());
+
+    Broadcast<Set<Long>> broadcastValidIDs =
+        javaSparkContext.broadcast(expiredSnapshotChanges.validSnapshotIds());
+
+    JavaRDD<String> filesToDeleteFromScan = 
manifestsToScan.mapPartitions(manifests -> {
+      Map<Integer, PartitionSpec> specLookup = 
broadcastedSpecLookup.getValue();
+      Set<Long> validIds = broadcastValidIDs.getValue();
+      Set<String> filesToDelete = new HashSet<>();
+      Tasks.foreach(ImmutableList.copyOf(manifests))
+          .retry(3).suppressFailureWhenFinished()
+          .executeWith(ThreadPools.getWorkerPool())
+          .onFailure((item, exc) -> LOG
+              .warn("Failed to get deleted files: this may cause orphaned data 
files", exc))
+          .run(manifest -> {
+            // the manifest has deletes, scan it to find files to delete
+            try (ManifestReader<?> reader = ManifestFiles
+                .open(manifest, serializableIO, specLookup)) {
+              for (ManifestEntry<?> entry : reader.entries()) {
+                // if the snapshot ID of the DELETE entry is no longer valid, 
the data can be deleted
+                if (entry.status() == ManifestEntry.Status.DELETED &&
+                    !validIds.contains(entry.snapshotId())) {
+                  // use toString to ensure the path will not change (Utf8 is 
reused)
+                  filesToDelete.add(entry.file().path().toString());
+                }
+              }
+            } catch (IOException e) {
+              throw new UncheckedIOException(
+                  String.format("Failed to read manifest file: %s", manifest), 
e);
+            }
+          });
+      return filesToDelete.iterator();
+    });
+
+    JavaRDD<String> filesToDeleteFromRevert = 
manifestsToRevert.mapPartitions(manifests -> {
+      Map<Integer, PartitionSpec> specLookup = 
broadcastedSpecLookup.getValue();
+      Set<String> filesToDelete = new HashSet<>();
+      Tasks.foreach(ImmutableList.copyOf(manifests))
+          .retry(3).suppressFailureWhenFinished()
+          .executeWith(ThreadPools.getWorkerPool())

Review comment:
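       Same here: I don't think we need `ThreadPools.getWorkerPool()` on executors, since Spark already parallelizes this work.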

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;
+
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+  private Consumer<String> deleteFunc = defaultDelete;
+
+
+  ExpireSnapshotsAction(SparkSession session, Table table) {
+    this.session = session;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+    this.base = ops.current();
+    this.localExpireSnapshots = 
table.expireSnapshots().deleteExpiredFiles(false);
+  }
+
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    localExpireSnapshots.expireSnapshotId(expireSnapshotId);
+    return this;
+  }
+
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    localExpireSnapshots.expireOlderThan(timestampMillis);
+    return this;
+  }
+
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    localExpireSnapshots.retainLast(numSnapshots);
+    return this;
+  }
+
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Execute is a synonym for commit in this implementation. Calling either 
commit or execute will
+   * launch the Spark equivalent of RemoveSnapshots.
+   *
+   * @return nothing
+   */
+  @Override
+  public ExpireSnapshotResults execute() {
+    localExpireSnapshots.commit();
+
+    TableMetadata currentMetadata = ops.refresh();
+
+    //Locally determine which snapshots have been expired
+    SnapshotExpirationChanges expiredSnapshotChanges =
+        ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
+
+    //Locally determine which manifests will need to be scanned, reverted, 
deleted
+    ManifestExpirationChanges manifestExpirationChanges =
+        ExpireSnapshotUtil.determineManifestChangesFromSnapshotExpiration(
+            expiredSnapshotChanges.validSnapshotIds(), 
expiredSnapshotChanges.expiredSnapshotIds(),
+            currentMetadata, base, ops.io());
+
+    FileIO io = SparkUtil.serializableFileIO(table);
+
+    //Going the RDD Route because our reader functions all work with full 
Manifest Files
+    JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(session.sparkContext());
+
+    JavaRDD<ManifestFile> manifestsToScan =

Review comment:
       Will we always rely on `spark.default.parallelism` here, or should the number of partitions passed to `parallelize` be configurable?

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;
+
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+  private Consumer<String> deleteFunc = defaultDelete;
+
+
+  ExpireSnapshotsAction(SparkSession session, Table table) {
+    this.session = session;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+    this.base = ops.current();
+    this.localExpireSnapshots = 
table.expireSnapshots().deleteExpiredFiles(false);
+  }
+
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    localExpireSnapshots.expireSnapshotId(expireSnapshotId);
+    return this;
+  }
+
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    localExpireSnapshots.expireOlderThan(timestampMillis);
+    return this;
+  }
+
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    localExpireSnapshots.retainLast(numSnapshots);
+    return this;
+  }
+
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Execute is a synonym for commit in this implementation. Calling either 
commit or execute will
+   * launch the Spark equivalent of RemoveSnapshots.
+   *
+   * @return nothing
+   */
+  @Override
+  public ExpireSnapshotResults execute() {
+    localExpireSnapshots.commit();
+
+    TableMetadata currentMetadata = ops.refresh();
+
+    //Locally determine which snapshots have been expired
+    SnapshotExpirationChanges expiredSnapshotChanges =
+        ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
+
+    //Locally determine which manifests will need to be scanned, reverted, 
deleted
+    ManifestExpirationChanges manifestExpirationChanges =
+        ExpireSnapshotUtil.determineManifestChangesFromSnapshotExpiration(
+            expiredSnapshotChanges.validSnapshotIds(), 
expiredSnapshotChanges.expiredSnapshotIds(),
+            currentMetadata, base, ops.io());
+
+    FileIO io = SparkUtil.serializableFileIO(table);
+
+    //Going the RDD Route because our reader functions all work with full 
Manifest Files
+    JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(session.sparkContext());
+
+    JavaRDD<ManifestFile> manifestsToScan =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToScan()));
+
+    JavaRDD<ManifestFile> manifestsToRevert =
+        javaSparkContext
+            .parallelize(new 
LinkedList<>(manifestExpirationChanges.manifestsToRevert()));
+
+    FileIO serializableIO = SparkUtil.serializableFileIO(table);
+
+    Broadcast<Map<Integer, PartitionSpec>> broadcastedSpecLookup =
+        javaSparkContext.broadcast(ops.current().specsById());
+
+    Broadcast<Set<Long>> broadcastValidIDs =
+        javaSparkContext.broadcast(expiredSnapshotChanges.validSnapshotIds());
+
+    JavaRDD<String> filesToDeleteFromScan = 
manifestsToScan.mapPartitions(manifests -> {
+      Map<Integer, PartitionSpec> specLookup = 
broadcastedSpecLookup.getValue();
+      Set<Long> validIds = broadcastValidIDs.getValue();
+      Set<String> filesToDelete = new HashSet<>();
+      Tasks.foreach(ImmutableList.copyOf(manifests))
+          .retry(3).suppressFailureWhenFinished()
+          .executeWith(ThreadPools.getWorkerPool())

Review comment:
       I am not sure we need to use `ThreadPools` on executors. First, 
`ThreadPools` in Iceberg was meant to parallelize work on the driver. 
Second, in most cases each task runs on a single executor core. Since Spark 
already parallelizes this work, I don't think we need the thread 
pool here.

##########
File path: 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestEntry;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil;
+import org.apache.iceberg.util.ExpireSnapshotUtil.ManifestExpirationChanges;
+import org.apache.iceberg.util.ExpireSnapshotUtil.SnapshotExpirationChanges;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotResults> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private final Table table;
+  private final TableOperations ops;
+  private final ExpireSnapshots localExpireSnapshots;
+  private final TableMetadata base;
+  private SparkSession session;
+
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+  private Consumer<String> deleteFunc = defaultDelete;
+
+
+  ExpireSnapshotsAction(SparkSession session, Table table) {
+    this.session = session;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+    this.base = ops.current();
+    this.localExpireSnapshots = 
table.expireSnapshots().deleteExpiredFiles(false);
+  }
+
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    localExpireSnapshots.expireSnapshotId(expireSnapshotId);
+    return this;
+  }
+
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    localExpireSnapshots.expireOlderThan(timestampMillis);
+    return this;
+  }
+
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    localExpireSnapshots.retainLast(numSnapshots);
+    return this;
+  }
+
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Execute is a synonym for commit in this implementation. Calling either 
commit or execute will
+   * launch the Spark equivalent of RemoveSnapshots.
+   *
+   * @return nothing
+   */
+  @Override
+  public ExpireSnapshotResults execute() {
+    localExpireSnapshots.commit();
+
+    TableMetadata currentMetadata = ops.refresh();
+
+    //Locally determine which snapshots have been expired
+    SnapshotExpirationChanges expiredSnapshotChanges =
+        ExpireSnapshotUtil.getExpiredSnapshots(currentMetadata, base);
+
+    //Locally determine which manifests will need to be scanned, reverted, 
deleted
+    ManifestExpirationChanges manifestExpirationChanges =
+        ExpireSnapshotUtil.determineManifestChangesFromSnapshotExpiration(
+            expiredSnapshotChanges.validSnapshotIds(), 
expiredSnapshotChanges.expiredSnapshotIds(),
+            currentMetadata, base, ops.io());
+
+    FileIO io = SparkUtil.serializableFileIO(table);

Review comment:
       This `io` variable seems to be unused (a separate `serializableIO` is created later). Can it be removed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to