amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r995269029
##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,62 @@ public void cleanFiles(TableMetadata beforeExpiration,
TableMetadata afterExpira
}
private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
- Set<ManifestFile> manifestFiles = Sets.newHashSet();
- for (Snapshot snapshot : snapshots) {
- try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
readManifestFiles(snapshot)) {
- for (ManifestFile manifestFile : manifestFilesForSnapshot) {
- manifestFiles.add(manifestFile.copy());
- }
- } catch (IOException e) {
- throw new RuntimeIOException(
- e, "Failed to close manifest list: %s",
snapshot.manifestListLocation());
- }
- }
+ Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+ Tasks.foreach(snapshots)
+ .retry(3)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(planExecutorService)
+ .onFailure(
+ (snapshot, exc) ->
+ LOG.warn(
+ "Failed to determine manifests for snapshot {}",
+ snapshot.snapshotId(),
+ exc))
+ .run(
+ snapshot -> {
+ try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
readManifestFiles(snapshot)) {
+ for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+ manifests.add(manifestFile.copy());
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(
+ e, "Failed to close manifest list: %s",
snapshot.manifestListLocation());
+ }
+ });
+
+ return manifests;
+ }
+
+ private Set<ManifestFile> manifestFilesToDelete(
+ Set<ManifestFile> currentManifests, Set<Snapshot> expiredSnapshots) {
+ Set<ManifestFile> manifestFilesToDelete = ConcurrentHashMap.newKeySet();
- return manifestFiles;
+ Tasks.foreach(expiredSnapshots)
+ .retry(3)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(planExecutorService)
+ .onFailure(
+ (snapshot, exc) ->
+ LOG.warn(
+ "Failed to determine manifests for snapshot {}",
+ snapshot.snapshotId(),
+ exc))
+ .run(
+ snapshot -> {
+ try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
readManifestFiles(snapshot)) {
+ for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+ if (!currentManifests.contains(manifestFile)) {
+ manifestFilesToDelete.add(manifestFile.copy());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(
+ e, "Failed to close manifest list: %s",
snapshot.manifestListLocation());
+ }
+ });
+ return manifestFilesToDelete;
Review Comment:
@rdblue regarding
https://github.com/apache/iceberg/pull/5669#discussion_r993986400
I don't think we can exactly follow the pattern that we do for determining
reachable data file. After determining the current reachable manifest set, that
should be reused when determining the reachable data files. So first the set of
currently reachable manifests is built in parallel. Then the candidate set is
built from parallelizing across the expired snapshots.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]