sreejasahithi commented on code in PR #10145:
URL: https://github.com/apache/ozone/pull/10145#discussion_r3159905459


##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -251,4 +271,109 @@ private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, Str
 
     return result;
   }
+
+  private Set<String> manifestsToRewrite(Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+    Table endStaticTable = RewriteTablePathOzoneUtils.newStaticTable(endVersionName, table.io());
+
+    final Set<Long> deltaSnapshotIds;
+    if (startMetadata != null) {
+      deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+    } else {
+      deltaSnapshotIds = null;
+    }
+
+    Set<String> manifestPaths = ConcurrentHashMap.newKeySet();
+    int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+    Semaphore semaphore = new Semaphore(maxInFlight);
+
+    ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
+
+    int submittedTasks = 0;
+    int completedTasks = 0;
+
+    try {
+      for (Snapshot snapshot : endStaticTable.snapshots()) {
+        semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight
+
+        final long snapshotId = snapshot.snapshotId();
+        final String manifestListLocation = snapshot.manifestListLocation();
+
+        boolean taskSubmitted = false;
+        try {
+          completionService.submit(() -> {
+            try (CloseableIterable<ManifestFile> manifests =
+                     InternalData.read(
+                             FileFormat.AVRO,
+                             table.io().newInputFile(manifestListLocation))
+                         .setRootType(GenericManifestFile.class)
+                         .setCustomType(
+                             ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+                             GenericPartitionFieldSummary.class)
+                         .project(ManifestFile.schema())
+                         .build()) {
+
+              for (ManifestFile manifest : manifests) {
+                if (deltaSnapshotIds == null) {
+                  manifestPaths.add(manifest.path());
+                } else if (manifest.snapshotId() != null

Review Comment:
   For any correctly written manifest list file, `snapshotId()` is never null.
   The null check here is purely defensive.
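Here is a minimal sketch of the filtering step this comment refers to. It is written in plain Java so it runs without Iceberg on the classpath. `ManifestInfo` is a hypothetical stand-in for Iceberg's `ManifestFile`, exposing only `path()` and a nullable `Long snapshotId()`. The membership test against `deltaSnapshotIds` is an assumption about how the truncated condition continues.

The sketch shows why the guard costs nothing. A manifest with no snapshot ID can never belong to a delta snapshot, so skipping it changes nothing for well-formed metadata. It also avoids depending on how a particular `Set` implementation treats `contains(null)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class DeltaManifestFilter {

  // Hypothetical stand-in for Iceberg's ManifestFile: only the accessors used here.
  record ManifestInfo(String path, Long snapshotId) {}

  // deltaSnapshotIds == null means "no start version": keep every manifest.
  // Otherwise keep only manifests added by a delta snapshot. A manifest whose
  // snapshot ID is missing is skipped by the defensive null check.
  static List<String> selectManifests(List<ManifestInfo> manifests, Set<Long> deltaSnapshotIds) {
    List<String> paths = new ArrayList<>();
    for (ManifestInfo manifest : manifests) {
      if (deltaSnapshotIds == null) {
        paths.add(manifest.path());
      } else if (manifest.snapshotId() != null
          && deltaSnapshotIds.contains(manifest.snapshotId())) {
        paths.add(manifest.path());
      }
    }
    return paths;
  }

  public static void main(String[] args) {
    List<ManifestInfo> manifests = List.of(
        new ManifestInfo("m1.avro", 1L),
        new ManifestInfo("m2.avro", 2L),
        new ManifestInfo("m3.avro", null)); // malformed entry: no snapshot ID
    System.out.println(selectManifests(manifests, Set.of(2L))); // prints [m2.avro]
    System.out.println(selectManifests(manifests, null)); // prints [m1.avro, m2.avro, m3.avro]
  }
}
```

Passing `null` for the ID set mirrors the `startMetadata == null` branch in the diff, where every manifest is selected.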


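The loop around this check uses a bounded fan-out. A `Semaphore` caps the number of in-flight tasks, an `ExecutorCompletionService` collects their results, and a `taskSubmitted` flag ensures a failed `submit()` does not leak a permit. This can be sketched as below. It is an illustration under stated assumptions, not the PR's code: `BoundedFanOut` and `process` are hypothetical names, and uppercasing a string stands in for reading a manifest list. The diff excerpt stops before the permit is released, so releasing it in the task's `finally` is also an assumption.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

class BoundedFanOut {

  // Hypothetical helper: runs one task per input, with at most maxInFlight
  // tasks submitted but not yet finished at any time.
  static Set<String> process(List<String> inputs, int parallelism, int maxInFlight)
      throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(parallelism);
    ExecutorCompletionService<Void> completion = new ExecutorCompletionService<>(executor);
    Semaphore permits = new Semaphore(maxInFlight);
    Set<String> results = ConcurrentHashMap.newKeySet();
    int submitted = 0;
    try {
      for (String input : inputs) {
        permits.acquire(); // blocks while maxInFlight tasks are outstanding
        boolean taskSubmitted = false;
        try {
          completion.submit(() -> {
            try {
              results.add(input.toUpperCase()); // stand-in for reading a manifest list
              return null;
            } finally {
              permits.release(); // free the slot when the task finishes
            }
          });
          taskSubmitted = true;
          submitted++;
        } finally {
          if (!taskSubmitted) {
            permits.release(); // submit() threw, so return the permit here
          }
        }
      }
      // Wait for every submitted task; get() rethrows a task failure as ExecutionException.
      for (int i = 0; i < submitted; i++) {
        completion.take().get();
      }
    } finally {
      executor.shutdownNow();
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(process(List.of("a", "b", "c"), 2, 4).size()); // prints 3
  }
}
```

Bounding submissions with the semaphore keeps memory flat when a table has many snapshots. The completion service then only has to drain finished futures.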

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
