aokolnychyi commented on a change in pull request #1211: URL: https://github.com/apache/iceberg/pull/1211#discussion_r456073706
########## File path: core/src/main/java/org/apache/iceberg/util/ExpireSnapshotUtil.java ########## @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.GenericManifestFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ExpireSnapshotUtil { + + //Utility Class No Instantiation Allowed + private ExpireSnapshotUtil() {} + + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotUtil.class); + + private static AncestorIds getAncestorIds(TableMetadata currentTableMetadata) { + // this is the set of ancestors of the current table state. 
when removing snapshots, this must + // only remove files that were deleted in an ancestor of the current table state to avoid + // physically deleting files that were logically deleted in a commit that was rolled back. + Set<Long> ancestorIds = Sets.newHashSet(SnapshotUtil + .ancestorIds(currentTableMetadata.currentSnapshot(), currentTableMetadata::snapshot)); + + Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet(); + for (long snapshotId : ancestorIds) { + String sourceSnapshotId = currentTableMetadata.snapshot(snapshotId).summary() + .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP); + if (sourceSnapshotId != null) { + // protect any snapshot that was cherry-picked into the current table state + pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId)); + } + } + + return new AncestorIds(ancestorIds, pickedAncestorSnapshotIds); + } + + /** + * Given a list of currently valid snapshots, extract all the manifests from those snapshots if + * there is an error while reading manifest lists an incomplete list of manifests will be + * produced. 
+ * + * @param currentTableSnapshots a list of currently valid non-expired snapshots + * @return all of the manifests of those snapshots + */ + private static Set<ManifestFile> getValidManifests( + List<Snapshot> currentTableSnapshots, TableOperations ops) { + + Set<ManifestFile> validManifests = Sets.newHashSet(); + Tasks.foreach(currentTableSnapshots).retry(3).suppressFailureWhenFinished() + .onFailure((snapshot, exc) -> + LOG.warn("Failed on snapshot {} while reading manifest list: {}", snapshot.snapshotId(), + snapshot.manifestListLocation(), exc)) + .run( + snapshot -> { + try (CloseableIterable<ManifestFile> manifests = readManifestFiles(snapshot, ops)) { + for (ManifestFile manifest : manifests) { + validManifests.add(manifest); + } + } catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to close manifest list: %s", + snapshot.manifestListLocation()), + e); + } + }); + return validManifests; + } + + /** + * Find manifests to clean up that are still referenced by a valid snapshot, but written by an + * expired snapshot. + * + * @param validSnapshotIds A list of the snapshots which are not expired + * @param currentTableMetadata A reference to the table containing the snapshots + * @return MetadataFiles which must be scanned to look for files to delete + */ + private static Set<ManifestFile> validManifestsInExpiredSnapshots( + Set<Long> validSnapshotIds, TableMetadata currentTableMetadata) { + + AncestorIds ids = getAncestorIds(currentTableMetadata); + Set<Long> ancestorIds = ids.getAncestorIds(); + Set<Long> pickedAncestorSnapshotIds = ids.getPickedAncestorIds(); + + Set<ManifestFile> manifestsToScan = Sets.newHashSet(); + manifestsToScan.forEach(manifest -> { Review comment: Definitely +1. Can you submit a separate PR for that? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
