vaultah commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2287892202


##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,77 +478,128 @@ public RewriteContentFileResult 
appendDeleteFile(RewriteResult<DeleteFile> r1) {
     }
   }
 
-  /** Rewrite manifest files in a distributed manner and return rewritten data 
files path pairs. */
-  private RewriteContentFileResult rewriteManifests(
+  private static class ManifestsRewriteResult {
+    private final RewriteContentFileResult contentFileResult;
+    private final Map<String, Long> rewrittenManifests;
+
+    ManifestsRewriteResult(
+        RewriteContentFileResult contentFileResult, Map<String, Long> 
rewrittenManifests) {
+      this.contentFileResult = contentFileResult;
+      this.rewrittenManifests = rewrittenManifests;
+    }
+
+    public RewriteContentFileResult getContentFileResult() {
+      return contentFileResult;
+    }
+
+    public Map<String, Long> getRewrittenManifests() {
+      return rewrittenManifests;
+    }
+  }
+
+  /**
+   * Rewrite manifest files in a distributed manner and return the resulting 
manifests and content
+   * files selected for rewriting.
+   */
+  private ManifestsRewriteResult rewriteManifests(
       Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, 
Set<ManifestFile> toRewrite) {
     if (toRewrite.isEmpty()) {
-      return new RewriteContentFileResult();
+      return new ManifestsRewriteResult(new RewriteContentFileResult(), 
Maps.newHashMap());
     }
 
     Encoder<ManifestFile> manifestFileEncoder = 
Encoders.javaSerialization(ManifestFile.class);
+    Encoder<RewriteContentFileResult> contentResultEncoder =
+        Encoders.javaSerialization(RewriteContentFileResult.class);
+    Encoder<Tuple3<String, Long, RewriteContentFileResult>> tupleEncoder =
+        Encoders.tuple(Encoders.STRING(), Encoders.LONG(), 
contentResultEncoder);
+
     Dataset<ManifestFile> manifestDS =
         spark().createDataset(Lists.newArrayList(toRewrite), 
manifestFileEncoder);
     Set<Long> deltaSnapshotIds =
         
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
 
-    return manifestDS
-        .repartition(toRewrite.size())
-        .map(
-            toManifests(
-                tableBroadcast(),
-                sparkContext().broadcast(deltaSnapshotIds),
-                stagingDir,
-                tableMetadata.formatVersion(),
-                sourcePrefix,
-                targetPrefix),
-            Encoders.bean(RewriteContentFileResult.class))
-        // duplicates are expected here as the same data file can have 
different statuses
-        // (e.g. added and deleted)
-        .reduce((ReduceFunction<RewriteContentFileResult>) 
RewriteContentFileResult::append);
-  }
-
-  private static MapFunction<ManifestFile, RewriteContentFileResult> 
toManifests(
-      Broadcast<Table> table,
-      Broadcast<Set<Long>> deltaSnapshotIds,
-      String stagingLocation,
-      int format,
-      String sourcePrefix,
-      String targetPrefix) {
+    RewriteContentFileResult finalContentResult = new 
RewriteContentFileResult();
+    Iterator<Tuple3<String, Long, RewriteContentFileResult>> resultIterator =
+        manifestDS
+            .repartition(toRewrite.size())
+            .map(
+                toManifests(
+                    tableBroadcast(),
+                    sparkContext().broadcast(deltaSnapshotIds),
+                    stagingDir,
+                    tableMetadata.formatVersion(),
+                    sourcePrefix,
+                    targetPrefix),
+                tupleEncoder)
+            .toLocalIterator();
+
+    Map<String, Long> rewrittenManifests = Maps.newHashMap();
+
+    while (resultIterator.hasNext()) {
+      Tuple3<String, Long, RewriteContentFileResult> resultTuple = 
resultIterator.next();
+      String originalManifestPath = resultTuple._1();
+      Long rewrittenManifestLength = resultTuple._2();
+      RewriteContentFileResult contentFileResult = resultTuple._3();
+      String stagingManifestPath =
+          RewriteTablePathUtil.stagingPath(originalManifestPath, sourcePrefix, 
stagingDir);
+      String targetManifestPath =
+          RewriteTablePathUtil.newPath(originalManifestPath, sourcePrefix, 
targetPrefix);
+
+      finalContentResult.append(contentFileResult);
+      finalContentResult.copyPlan().add(Pair.of(stagingManifestPath, 
targetManifestPath));
+      rewrittenManifests.put(originalManifestPath, rewrittenManifestLength);
+    }
+
+    return new ManifestsRewriteResult(finalContentResult, rewrittenManifests);
+  }
+
+  private static MapFunction<ManifestFile, Tuple3<String, Long, 
RewriteContentFileResult>>
+      toManifests(
+          Broadcast<Table> table,
+          Broadcast<Set<Long>> deltaSnapshotIds,
+          String stagingLocation,
+          int format,
+          String sourcePrefix,
+          String targetPrefix) {
 
     return manifestFile -> {
-      RewriteContentFileResult result = new RewriteContentFileResult();
       switch (manifestFile.content()) {
         case DATA:
-          result.appendDataFile(
+          Pair<Long, RewriteResult<DataFile>> dataManifestResult =
               writeDataManifest(
                   manifestFile,
                   table,
                   deltaSnapshotIds,
                   stagingLocation,
                   format,
                   sourcePrefix,
-                  targetPrefix));
-          break;
+                  targetPrefix);
+          return Tuple3.apply(
+              manifestFile.path(),

Review Comment:
   It's possible, but it would be a poor fit: a `RewriteContentFileResult` may
store the results for more than one manifest. While everything in `resultTuple`
relates to a single manifest, `contentFileResult` is later merged into
`finalContentResult`, which accumulates the results of rewriting all manifests.
   
   ```java
   RewriteContentFileResult finalContentResult = new RewriteContentFileResult();
   
   // while (...) {
   String originalManifestPath = resultTuple._1();
   Long rewrittenManifestLength = resultTuple._2();
   RewriteContentFileResult contentFileResult = resultTuple._3();
   
   finalContentResult.append(contentFileResult);
   // }
   ```
   
   The class name would no longer match its intended purpose, and we would also
have to leave the path and length fields of `finalContentResult` unset, which
could be confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to