This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 9b3de51d80974aa14b145c2a3f867388b8c361ac
Author: wangwj <[email protected]>
AuthorDate: Thu Dec 26 22:49:40 2024 +0800

    [core] Add check of older_than when RemoveOrphanFiles (#4779)
---
 .../apache/paimon/operation/OrphanFilesClean.java  | 19 ++++--
 .../paimon/operation/OrphanFilesCleanTest.java     | 40 +++++++++++++
 .../flink/RemoveOrphanFilesActionITCase.java       | 44 +++++++++-----
 .../action/RemoveOrphanFilesActionITCaseBase.java  | 70 ++++++++++++++--------
 .../procedure/RemoveOrphanFilesProcedureTest.scala |  5 ++
 5 files changed, 134 insertions(+), 44 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 274cdd52fe..fc2e1200f0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableConsumer;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -377,9 +379,18 @@ public abstract class OrphanFilesClean implements 
Serializable {
     }
 
     public static long olderThanMillis(@Nullable String olderThan) {
-        return isNullOrWhitespaceOnly(olderThan)
-                ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1)
-                : DateTimeUtils.parseTimestampData(olderThan, 3, 
TimeZone.getDefault())
-                        .getMillisecond();
+        if (isNullOrWhitespaceOnly(olderThan)) {
+            return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+        } else {
+            Timestamp parsedTimestampData =
+                    DateTimeUtils.parseTimestampData(olderThan, 3, 
TimeZone.getDefault());
+            Preconditions.checkArgument(
+                    parsedTimestampData.compareTo(
+                                    
Timestamp.fromEpochMillis(System.currentTimeMillis()))
+                            < 0,
+                    "The arg olderThan must be less than now, because 
dataFiles that are currently being written and not referenced by snapshots will 
be mistakenly cleaned up.");
+
+            return parsedTimestampData.getMillisecond();
+        }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
new file mode 100644
index 0000000000..97ba35b2c0
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link OrphanFilesClean}. */
+class OrphanFilesCleanTest {
+
+    @Test
+    void testOlderThanMillis() {
+        // null falls back to the default cutoff; a timestamp in the past is accepted
+        OrphanFilesClean.olderThanMillis(null);
+        OrphanFilesClean.olderThanMillis("2024-12-21 23:00:00");
+
+        // a timestamp in the future must be rejected
+        assertThatThrownBy(() -> OrphanFilesClean.olderThanMillis("3024-12-21 23:00:00"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up.");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
index a168c3785c..5a9cd0fe2d 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
@@ -124,17 +125,20 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String withDryRun =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true)",
-                        database, tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '%s', true)",
+                        database, tableName, olderThan);
         ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
         assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));
 
         String withOlderThan =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59')",
-                        database, tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '%s')",
+                        database, tableName, olderThan);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), 
Row.of("2"));
@@ -178,17 +182,19 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         CloseableIterator<Row> withParallelismCollect = 
executeSQL(withParallelism);
         
assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0"));
 
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String withDryRun =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true)",
-                        database, "*");
+                        "CALL sys.remove_orphan_files('%s.%s', '%s', true)",
+                        database, "*", olderThan);
         ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
         assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));
 
         String withOlderThan =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59')",
-                        database, "*");
+                        "CALL sys.remove_orphan_files('%s.%s', '%s')", 
database, "*", olderThan);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsOnly(Row.of("4"));
@@ -237,10 +243,13 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
                     false,
                     true);
         }
+
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String procedure =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59')",
-                        database, "*");
+                        "CALL sys.remove_orphan_files('%s.%s', '%s')", 
database, "*", olderThan);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(procedure));
         assertThat(actualDeleteFile).containsOnly(Row.of("4"));
     }
@@ -272,26 +281,29 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String withLocalMode =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true, 5, 'local')",
-                        database, tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 
'local')",
+                        database, tableName, olderThan);
         ImmutableList<Row> actualLocalRunDeleteFile =
                 ImmutableList.copyOf(executeSQL(withLocalMode));
         assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));
 
         String withDistributedMode =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true, 5, 'distributed')",
-                        database, tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 
'distributed')",
+                        database, tableName, olderThan);
         ImmutableList<Row> actualDistributedRunDeleteFile =
                 ImmutableList.copyOf(executeSQL(withDistributedMode));
         assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));
 
         String withInvalidMode =
                 String.format(
-                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true, 5, 'unknown')",
-                        database, tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 
'unknown')",
+                        database, tableName, olderThan);
         assertThatCode(() -> executeSQL(withInvalidMode))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessageContaining("Unknown mode");
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 77f3be2f0c..2828101114 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
@@ -129,23 +130,28 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String withDryRun =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true)",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s', dry_run => true)"
+                                : "CALL sys.remove_orphan_files('%s.%s', '%s', 
true)",
                         database,
-                        tableName);
+                        tableName,
+                        olderThan);
         ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
         assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));
 
         String withOlderThan =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s')"
+                                : "CALL sys.remove_orphan_files('%s.%s', 
'%s')",
                         database,
-                        tableName);
+                        tableName,
+                        olderThan);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), 
Row.of("2"));
@@ -195,23 +201,28 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
         CloseableIterator<Row> withParallelismCollect = 
executeSQL(withParallelism);
         
assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0"));
 
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String withDryRun =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true)",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s', dry_run => true)"
+                                : "CALL sys.remove_orphan_files('%s.%s', '%s', 
true)",
                         database,
-                        "*");
+                        "*",
+                        olderThan);
         ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
         assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));
 
         String withOlderThan =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s')"
+                                : "CALL sys.remove_orphan_files('%s.%s', 
'%s')",
                         database,
-                        "*");
+                        "*",
+                        olderThan);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsOnly(Row.of("4"));
@@ -261,13 +272,18 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
                     false,
                     true);
         }
+
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String procedure =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s')"
+                                : "CALL sys.remove_orphan_files('%s.%s', 
'%s')",
                         database,
-                        "*");
+                        "*",
+                        olderThan);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(procedure));
         assertThat(actualDeleteFile).containsOnly(Row.of("4"));
     }
@@ -305,13 +321,17 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
+        String olderThan =
+                DateTimeUtils.formatLocalDateTime(
+                        
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
         String withLocalMode =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'local')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'local')",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 
'local')"
+                                : "CALL sys.remove_orphan_files('%s.%s', '%s', 
true, 5, 'local')",
                         database,
-                        tableName);
+                        tableName,
+                        olderThan);
         ImmutableList<Row> actualLocalRunDeleteFile =
                 ImmutableList.copyOf(executeSQL(withLocalMode));
         assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));
@@ -319,10 +339,11 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
         String withDistributedMode =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'distributed')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'distributed')",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 
'distributed')"
+                                : "CALL sys.remove_orphan_files('%s.%s', '%s', 
true, 5, 'distributed')",
                         database,
-                        tableName);
+                        tableName,
+                        olderThan);
         ImmutableList<Row> actualDistributedRunDeleteFile =
                 ImmutableList.copyOf(executeSQL(withDistributedMode));
         assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));
@@ -330,10 +351,11 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
         String withInvalidMode =
                 String.format(
                         isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'unknown')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'unknown')",
+                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 
'unknown')"
+                                : "CALL sys.remove_orphan_files('%s.%s', '%s', 
true, 5, 'unknown')",
                         database,
-                        tableName);
+                        tableName,
+                        olderThan);
         assertThatCode(() -> executeSQL(withInvalidMode))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessageContaining("Unknown mode");
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index 3ffe7fba26..b1bb3124e3 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.procedure
 
+import org.apache.paimon.data.Timestamp
 import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.utils.DateTimeUtils
@@ -61,6 +62,10 @@ class RemoveOrphanFilesProcedureTest extends 
PaimonSparkTestBase {
           TimeUnit.SECONDS.toMillis(1)),
       3)
 
+    System.out.println("orphanFile2ModTime is : " + orphanFile2ModTime);
+    System.out.println("older_than1 is : " + older_than1)
+    System.out.println("in ut Timestamp.now() is : " + Timestamp.now)
+
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than1')"),
       Row(1, 1) :: Nil)

Reply via email to