aokolnychyi commented on code in PR #4629:
URL: https://github.com/apache/iceberg/pull/4629#discussion_r866006688


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java:
##########
@@ -243,23 +246,30 @@ private BaseExpireSnapshotsActionResult 
deleteFiles(Iterator<Row> expired) {
           String file = fileInfo.getString(0);
           String type = fileInfo.getString(1);
           deleteFunc.accept(file);
-          switch (type) {
-            case CONTENT_FILE:
-              dataFileCount.incrementAndGet();
-              LOG.trace("Deleted Content File: {}", file);
-              break;
-            case MANIFEST:
-              manifestCount.incrementAndGet();
-              LOG.debug("Deleted Manifest: {}", file);
-              break;
-            case MANIFEST_LIST:
-              manifestListCount.incrementAndGet();
-              LOG.debug("Deleted Manifest List: {}", file);
-              break;
+
+          if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+            dataFileCount.incrementAndGet();
+            LOG.trace("Deleted Data File: {}", file);
+          } else if 
(FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+            posDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Positional Delete File: {}", file);
+          } else if 
(FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+            eqDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Equality Delete File: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST)) {
+            manifestCount.incrementAndGet();
+            LOG.debug("Deleted Manifest: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST_LIST)) {
+            manifestListCount.incrementAndGet();
+            LOG.debug("Deleted Manifest List: {}", file);
+          } else {
+            throw new ValidationException("Illegal file type: %s", type);
           }
         });
 
-    LOG.info("Deleted {} total files", dataFileCount.get() + 
manifestCount.get() + manifestListCount.get());
-    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), 
manifestCount.get(), manifestListCount.get());
+    long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + 
eqDeleteFileCount.get();
+    LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() 
+ manifestListCount.get());
+    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), 
posDeleteFileCount.get(),

Review Comment:
   nit: I think an empty line before `return` would help separate the two blocks.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -122,18 +125,25 @@ protected Table newStaticTable(TableMetadata metadata, 
FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileTypeDF(Table table) {
     JavaSparkContext context = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
-    Broadcast<FileIO> ioBroadcast = 
context.broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<Table> tableBroadcast = 
context.broadcast(SerializableTable.copyOf(table));
 
     Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, 
ALL_MANIFESTS)
         .selectExpr("path", "length", "partition_spec_id as partitionSpecId", 
"added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // 
avoid adaptive execution combining tasks
         .as(Encoders.bean(ManifestFileBean.class));
 
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), 
Encoders.STRING()).toDF(FILE_PATH);
+    return allManifests.flatMap(
+        new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), 
Encoders.STRING()))

Review Comment:
   nit: I think the formatting is a bit off. I'd do this:
   
   ```
   return allManifests
       .flatMap(new ReadManifest(tableBroadcast), 
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
       .toDF(FILE_PATH, FILE_TYPE);
   ```
   
   



##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -101,6 +101,16 @@ interface Result {
      */
     long deletedDataFilesCount();
 
+    /**
+     * Returns the number of deleted equality delete files.
+     */
+    long deletedEqualityDeleteFilesCount();
+
+    /**
+     * Returns the number of deleted positional delete files.
+     */
+    long deletedPositionalDeleteFilesCount();

Review Comment:
   I think we use `positionDeleteFiles` elsewhere, not `positionalDeleteFiles`. It's better to be consistent.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean 
manifest) {
+      switch (manifest.content()) {
+        case DATA:

Review Comment:
   It would be nice to use `all_files`, but we can't for performance reasons. If I remember correctly, we had to migrate to the current implementation to avoid reading all manifest list files on the driver, which was slow for tables with a lot of snapshots.
   
   I think we should fix `ManifestFileBean` by adding a `content` field and handling it everywhere. Then we can project it while reading.
   
   
   ```
   private static final String[] MANIFEST_FILE_BEAN_PROJECTION = {
       "path", "length", "partition_spec_id as partitionSpecId", "content", 
"added_snapshot_id as addedSnapshotId"
   };
   
   ...
   
   Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, 
ALL_MANIFESTS)
       .selectExpr(MANIFEST_FILE_BEAN_PROJECTION)
       .dropDuplicates("path")
       .repartition(spark.sessionState().conf().numShufflePartitions()) // 
avoid adaptive execution combining tasks
       .as(Encoders.bean(ManifestFileBean.class));
   ```
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -52,6 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
 
   private static final StructType OUTPUT_TYPE = new StructType(new 
StructField[]{
       new StructField("deleted_data_files_count", DataTypes.LongType, true, 
Metadata.empty()),
+      new StructField("deleted_positional_delete_files_count", 
DataTypes.LongType, true, Metadata.empty()),

Review Comment:
   This should also use `position` instead of `positional`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean 
manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), 
table.getValue().specs()).iterator(),

Review Comment:
   If I understand correctly, we can't use `readPaths` because we need to fetch the content file type. That said, I think we have to add a projection: right now we read all columns, whereas before we read only the paths.
   
   Will something like this work?
   
   ```
       public CloseableIterator<Tuple2<String, String>> 
entries(ManifestFileBean manifest) {
         FileIO io = table.getValue().io();
         Map<Integer, PartitionSpec> specs = table.getValue().specs();
         ImmutableList<String> projection = 
ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
   
         switch (manifest.content()) {
           case DATA:
             return CloseableIterator.transform(
                 ManifestFiles.read(manifest, io, 
specs).select(projection).iterator(),
                 ReadManifest::contentFileWithType);
           case DELETES:
             return CloseableIterator.transform(
                 ManifestFiles.readDeleteManifest(manifest, io, 
specs).select(projection).iterator(),
                 ReadManifest::contentFileWithType);
           default:
             throw new IllegalArgumentException("Unsupported manifest content 
type:" + manifest.content());
         }
       }
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean 
manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), 
table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, 
table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content 
type:" + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileType(ContentFile file) {

Review Comment:
   nit: `contentFileWithType`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean 
manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), 
table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, 
table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content 
type:" + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileType(ContentFile file) {
+      return new Tuple2(file.path().toString(), file.content().toString());

Review Comment:
   nit: `Tuple2<>`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -122,18 +125,25 @@ protected Table newStaticTable(TableMetadata metadata, 
FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileTypeDF(Table table) {

Review Comment:
   nit: what about `buildValidContentFileWithTypeDF`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));

Review Comment:
   nit: raw type, let's use `ClosingIterator<>(...)`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean 
manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), 
table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, 
table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content 
type:" + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileType(ContentFile file) {

Review Comment:
   nit: `ContentFile<?>`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean 
manifest) {
+      switch (manifest.content()) {
+        case DATA:

Review Comment:
   I'm afraid we don't project `content` in the manifest-related metadata tables, but we should probably fix that. I discovered this in another use case, and I can submit a PR for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to