kbendick commented on a change in pull request #4307:
URL: https://github.com/apache/iceberg/pull/4307#discussion_r831639108



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
##########
@@ -273,4 +284,35 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  @VisibleForTesting
+  static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
+
+    private final Set<String> hiddenPathPartitionNames;
+
+    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
+      this.hiddenPathPartitionNames = hiddenPathPartitionNames;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
+      return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
+    }
+
+    static PathFilter build(Map<Integer, PartitionSpec> specs) {
+      Set<String> partitionNames = new HashSet<>();

Review comment:
       Nit: We don't use `new HashSet<>()` directly; generally speaking, it leads to having to grow the underlying collection multiple times.
   
   We prefer the factory that directly takes the elements, or the expected size, when possible. Otherwise, we use `Sets.newHashSet()`.
   
   We also shade and relocate Guava so that we don't conflict with the user's existing Guava version on their classpath. The correct import is `org.apache.iceberg.relocated.com.google.common.collect.Sets`.
   
   All that said, I think you might be able to apply a `filter` and then use `Collectors.toSet()` below instead.
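
   Not the PR author's actual change, but a minimal, self-contained sketch of the `filter` + `Collectors.toSet()` idea. The `Map<Integer, List<String>>` of field names is a hypothetical stand-in for what `table.specs()` and `PartitionSpec.fields()` would provide:

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   class HiddenPartitionNamesSketch {

     // Hypothetical stand-in: spec id -> partition field names, mimicking table.specs().
     static Set<String> hiddenPartitionNames(Map<Integer, List<String>> specFieldNames) {
       return specFieldNames.values().stream()
           .flatMap(List::stream)
           // keep only names a plain hidden-path filter would otherwise reject
           .filter(name -> name.startsWith("_") || name.startsWith("."))
           .collect(Collectors.toSet()); // no manual HashSet construction or sizing
     }
   }
   ```

   With this shape there is no intermediate mutable set to size at all, which sidesteps the `new HashSet<>()` question entirely.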

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
##########
@@ -273,4 +284,35 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  @VisibleForTesting
+  static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
+
+    private final Set<String> hiddenPathPartitionNames;
+
+    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
+      this.hiddenPathPartitionNames = hiddenPathPartitionNames;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
+      return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
+    }
+
+    static PathFilter build(Map<Integer, PartitionSpec> specs) {

Review comment:
       Nit: You might want to add a null check here and return `HiddenPathFilter` immediately.
   
   I don't think the `.specs()` method should ever return `null`, but given how this code is organized, I'd say that's not a bad idea.
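
   A hedged sketch of that guard, using stand-in types since the real method takes `Map<Integer, PartitionSpec>` and falls back to Iceberg's `HiddenPathFilter`:

   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   class NullGuardSketch {

     interface PathFilter {
       boolean accept(String lastSegment);
     }

     // Stand-in for HiddenPathFilter.get(): rejects names starting with '_' or '.'.
     static final PathFilter HIDDEN_PATH_FILTER =
         name -> !(name.startsWith("_") || name.startsWith("."));

     // Hypothetical build(): bail out to the plain hidden-path filter when there
     // are no specs to be partition-aware about.
     static PathFilter build(Map<Integer, Set<String>> specs) {
       if (specs == null || specs.isEmpty()) {
         return HIDDEN_PATH_FILTER;
       }
       Set<String> hiddenNames = specs.values().stream()
           .flatMap(Set::stream)
           .filter(n -> n.startsWith("_") || n.startsWith("."))
           .collect(Collectors.toSet());
       return name -> hiddenNames.stream().anyMatch(name::startsWith)
           || HIDDEN_PATH_FILTER.accept(name);
     }
   }
   ```

   The early return keeps the common "no specs" case trivial and makes the partition-aware path the only place the extra set is built.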

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
##########
@@ -551,6 +554,48 @@ public void testManyLeafPartitions() throws InterruptedException {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testHiddenPartitionPaths() throws InterruptedException {
+    Schema schema = new Schema(
+            optional(1, "c1", Types.IntegerType.get()),
+            optional(2, "_c2", Types.StringType.get()),
+            optional(3, "c3", Types.StringType.get())

Review comment:
       There are a few other places that are 8 spaces over when they should be 4.

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
##########
@@ -551,6 +554,48 @@ public void testManyLeafPartitions() throws InterruptedException {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testHiddenPartitionPaths() throws InterruptedException {
+    Schema schema = new Schema(
+            optional(1, "c1", Types.IntegerType.get()),
+            optional(2, "_c2", Types.StringType.get()),
+            optional(3, "c3", Types.StringType.get())

Review comment:
       Nit: These are all over-indented. Code that continues onto the next line within the same expression should be indented 4 spaces.
   
   There's an autoformatter that plugs into IntelliJ, and I believe Eclipse and some other IDEs too.
   
   Here's a link: https://iceberg.apache.org/contribute/#setting-up-ide-and-code-style
   
   But otherwise, these 3 lines should be 4 spaces over from the start of `Schema`, like so:
   ```java
       Schema schema = new Schema(
           optional(1, "c1", Types.IntegerType.get()),
           optional(2, "_c2", Types.StringType.get()),
           optional(3, "c3", Types.StringType.get()));
   ```

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
##########
@@ -551,6 +554,48 @@ public void testManyLeafPartitions() throws InterruptedException {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testHiddenPartitionPaths() throws InterruptedException {
+    Schema schema = new Schema(
+            optional(1, "c1", Types.IntegerType.get()),
+            optional(2, "_c2", Types.StringType.get()),
+            optional(3, "c3", Types.StringType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+            .truncate("_c2", 2)
+            .identity("c3")
+            .build();
+    Table table = TABLES.create(schema, spec, Maps.newHashMap(), tableLocation);
+
+    StructType structType = new StructType()
+            .add("c1", DataTypes.IntegerType)
+            .add("_c2", DataTypes.StringType)
+            .add("c3", DataTypes.StringType);
+    List<Row> records = Lists.newArrayList(
+        RowFactory.create(1, "AAAAAAAAAA", "AAAA")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, structType).coalesce(1);
+
+    df.select("c1", "_c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA");
+    df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA");
+
+    Thread.sleep(1000);
+
+    SparkActions actions = SparkActions.get();
+
+    DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis())
+        .execute();
+
+    Assert.assertEquals("Should delete 2 files", 2, Iterables.size(result.orphanFileLocations()));
+  }

Review comment:
       Can we add a test that uses a partition spec, writes out data, updates the partition spec, and then writes data again, to verify that we're catching all partition specs properly?
   
   That way we guard against a regression.
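
   A rough, self-contained simulation of the behavior that test should pin down, with sets of field names standing in for real `PartitionSpec`s: files written under the *old* spec's hidden partition directory must still be accepted after the spec evolves.

   ```java
   import java.util.Map;
   import java.util.Set;

   class SpecEvolutionSketch {

     // Hypothetical: a path segment is kept if it starts with a hidden partition
     // name registered from ANY historical spec, not just the current one.
     static boolean accept(String segment, Map<Integer, Set<String>> specs) {
       boolean partitionAware = specs.values().stream()
           .flatMap(Set::stream)
           .anyMatch(segment::startsWith);
       boolean plainHidden = segment.startsWith("_") || segment.startsWith(".");
       return partitionAware || !plainHidden;
     }
   }
   ```

   A regression here would be building the filter from only the latest spec, which would wrongly flag old-spec partition directories like `_c2_trunc=AA` as orphans.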




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


