This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ebef05034 [flink] change env.fromData to fromElements for 
compatibility with flink 1.18 and lower versions (#4541)
ebef05034 is described below

commit ebef050348de90f91193314079a6fb08e785c985
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Nov 19 20:19:36 2024 +0800

    [flink] change env.fromData to fromElements for compatibility with flink 
1.18 and lower versions (#4541)
---
 .../procedure/RemoveOrphanFilesProcedure.java      |  62 +++-
 .../flink}/RemoveOrphanFilesActionITCase.java      | 108 +++----
 .../flink/RemoveOrphanFilesActionITCase.java       |  25 ++
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java |   2 +-
 .../action/RemoveOrphanFilesActionITCase.java      | 323 +--------------------
 ...java => RemoveOrphanFilesActionITCaseBase.java} |   4 +-
 6 files changed, 114 insertions(+), 410 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index c5fa7b7ba..7695c510b 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,9 +20,12 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
+import java.util.Locale;
+
 import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
@@ -66,20 +69,57 @@ public class RemoveOrphanFilesProcedure extends 
ProcedureBase {
             boolean dryRun,
             Integer parallelism)
             throws Exception {
+        return call(procedureContext, tableId, olderThan, dryRun, parallelism, 
null);
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String olderThan,
+            boolean dryRun,
+            Integer parallelism,
+            String mode)
+            throws Exception {
         Identifier identifier = Identifier.fromString(tableId);
         String databaseName = identifier.getDatabaseName();
         String tableName = identifier.getObjectName();
-
-        long deleted =
-                FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
-                        procedureContext.getExecutionEnvironment(),
-                        catalog,
-                        olderThanMillis(olderThan),
-                        createFileCleaner(catalog, dryRun),
-                        parallelism,
-                        databaseName,
-                        tableName);
-        return new String[] {String.valueOf(deleted)};
+        if (mode == null) {
+            mode = "DISTRIBUTED";
+        }
+        long deletedFiles;
+        try {
+            switch (mode.toUpperCase(Locale.ROOT)) {
+                case "DISTRIBUTED":
+                    deletedFiles =
+                            FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
+                                    procedureContext.getExecutionEnvironment(),
+                                    catalog,
+                                    olderThanMillis(olderThan),
+                                    createFileCleaner(catalog, dryRun),
+                                    parallelism,
+                                    databaseName,
+                                    tableName);
+                    break;
+                case "LOCAL":
+                    deletedFiles =
+                            LocalOrphanFilesClean.executeDatabaseOrphanFiles(
+                                    catalog,
+                                    databaseName,
+                                    tableName,
+                                    olderThanMillis(olderThan),
+                                    createFileCleaner(catalog, dryRun),
+                                    parallelism);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            "Unknown mode: "
+                                    + mode
+                                    + ". Only 'DISTRIBUTED' and 'LOCAL' are 
supported.");
+            }
+            return new String[] {String.valueOf(deletedFiles)};
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
similarity index 71%
copy from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
copy to 
paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
index 938a8ce1b..46b62b6bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -16,11 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action;
+package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.RemoveOrphanFilesAction;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
@@ -40,8 +42,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,9 +55,8 @@ import static 
org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
-/** IT cases for {@link RemoveOrphanFilesAction}. */
+/** IT cases base for {@link RemoveOrphanFilesAction} in Flink 1.18. */
 public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
-
     private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
     private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";
 
@@ -96,9 +96,8 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         return new Path(table.location(), orphanFile);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testRunWithoutException(boolean isNamedArgument) throws 
Exception {
+    @Test
+    public void testRunWithoutException() throws Exception {
         createTableAndWriteData(tableName);
 
         List<String> args =
@@ -120,40 +119,29 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         assertThatCode(action2::run).doesNotThrowAnyException();
 
         String withoutOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s')"
-                                : "CALL sys.remove_orphan_files('%s.%s')",
-                        database,
-                        tableName);
+                String.format("CALL sys.remove_orphan_files('%s.%s')", 
database, tableName);
+
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
         String withDryRun =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true)",
-                        database,
-                        tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true)",
+                        database, tableName);
         ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
         assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));
 
         String withOlderThan =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
-                        database,
-                        tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59')",
+                        database, tableName);
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) 
throws Exception {
+    @Test
+    public void testRemoveDatabaseOrphanFilesITCase() throws Exception {
         createTableAndWriteData("tableName1");
         createTableAndWriteData("tableName2");
 
@@ -181,12 +169,7 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         assertThatCode(action3::run).doesNotThrowAnyException();
 
         String withoutOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s')"
-                                : "CALL sys.remove_orphan_files('%s.%s')",
-                        database,
-                        "*");
+                String.format("CALL sys.remove_orphan_files('%s.%s')", 
database, "*");
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
@@ -197,29 +180,22 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
 
         String withDryRun =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true)",
-                        database,
-                        "*");
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true)",
+                        database, "*");
         ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
         assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));
 
         String withOlderThan =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
-                        database,
-                        "*");
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59')",
+                        database, "*");
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsOnly(Row.of("4"));
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
+    @Test
+    public void testCleanWithBranch() throws Exception {
         // create main branch
         FileStoreTable table = createTableAndWriteData(tableName);
 
@@ -263,18 +239,14 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         }
         String procedure =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
-                        database,
-                        "*");
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59')",
+                        database, "*");
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(procedure));
         assertThat(actualDeleteFile).containsOnly(Row.of("4"));
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testRunWithMode(boolean isNamedArgument) throws Exception {
+    @Test
+    public void testRunWithMode() throws Exception {
         createTableAndWriteData(tableName);
 
         List<String> args =
@@ -296,44 +268,30 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
         assertThatCode(action2::run).doesNotThrowAnyException();
 
         String withoutOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s')"
-                                : "CALL sys.remove_orphan_files('%s.%s')",
-                        database,
-                        tableName);
+                String.format("CALL sys.remove_orphan_files('%s.%s')", 
database, tableName);
         CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
         
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
 
         String withLocalMode =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'local')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'local')",
-                        database,
-                        tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true, 5, 'local')",
+                        database, tableName);
         ImmutableList<Row> actualLocalRunDeleteFile =
                 ImmutableList.copyOf(executeSQL(withLocalMode));
         assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));
 
         String withDistributedMode =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'distributed')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'distributed')",
-                        database,
-                        tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true, 5, 'distributed')",
+                        database, tableName);
         ImmutableList<Row> actualDistributedRunDeleteFile =
                 ImmutableList.copyOf(executeSQL(withDistributedMode));
         assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));
 
         String withInvalidMode =
                 String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'unknown')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'unknown')",
-                        database,
-                        tableName);
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 
23:59:59', true, 5, 'unknown')",
+                        database, tableName);
         assertThatCode(() -> executeSQL(withInvalidMode))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessageContaining("Unknown mode");
diff --git 
a/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
 
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
new file mode 100644
index 000000000..e1be410b8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.action.RemoveOrphanFilesAction;
+import org.apache.paimon.flink.action.RemoveOrphanFilesActionITCaseBase;
+
+/** IT cases base for {@link RemoveOrphanFilesAction} in Flink 1.19. */
+public class RemoveOrphanFilesActionITCase extends 
RemoveOrphanFilesActionITCaseBase {}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index f50414620..61bebca24 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -279,7 +279,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean 
{
                                 });
 
         if (deletedInLocal.get() != 0) {
-            deleted = deleted.union(env.fromData(deletedInLocal.get()));
+            deleted = deleted.union(env.fromElements(deletedInLocal.get()));
         }
         return deleted;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 938a8ce1b..a92e529aa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -18,324 +18,5 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-
-/** IT cases for {@link RemoveOrphanFilesAction}. */
-public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
-
-    private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
-    private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";
-
-    private FileStoreTable createTableAndWriteData(String tableName) throws 
Exception {
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
-                        new String[] {"k", "v"});
-
-        FileStoreTable table =
-                createFileStoreTable(
-                        tableName,
-                        rowType,
-                        Collections.emptyList(),
-                        Collections.singletonList("k"),
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-
-        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = writeBuilder.newWrite();
-        commit = writeBuilder.newCommit();
-
-        writeData(rowData(1L, BinaryString.fromString("Hi")));
-
-        Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
-        Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
-
-        FileIO fileIO = table.fileIO();
-        fileIO.writeFile(orphanFile1, "a", true);
-        Thread.sleep(2000);
-        fileIO.writeFile(orphanFile2, "b", true);
-
-        return table;
-    }
-
-    private Path getOrphanFilePath(FileStoreTable table, String orphanFile) {
-        return new Path(table.location(), orphanFile);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testRunWithoutException(boolean isNamedArgument) throws 
Exception {
-        createTableAndWriteData(tableName);
-
-        List<String> args =
-                new ArrayList<>(
-                        Arrays.asList(
-                                "remove_orphan_files",
-                                "--warehouse",
-                                warehouse,
-                                "--database",
-                                database,
-                                "--table",
-                                tableName));
-        RemoveOrphanFilesAction action1 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action1::run).doesNotThrowAnyException();
-
-        args.add("--older_than");
-        args.add("2023-12-31 23:59:59");
-        RemoveOrphanFilesAction action2 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action2::run).doesNotThrowAnyException();
-
-        String withoutOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s')"
-                                : "CALL sys.remove_orphan_files('%s.%s')",
-                        database,
-                        tableName);
-        CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
-        
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
-
-        String withDryRun =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true)",
-                        database,
-                        tableName);
-        ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
-        assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));
-
-        String withOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
-                        database,
-                        tableName);
-        ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
-
-        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) 
throws Exception {
-        createTableAndWriteData("tableName1");
-        createTableAndWriteData("tableName2");
-
-        List<String> args =
-                new ArrayList<>(
-                        Arrays.asList(
-                                "remove_orphan_files",
-                                "--warehouse",
-                                warehouse,
-                                "--database",
-                                database,
-                                "--table",
-                                "*"));
-        RemoveOrphanFilesAction action1 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action1::run).doesNotThrowAnyException();
-
-        args.add("--older_than");
-        args.add("2023-12-31 23:59:59");
-        RemoveOrphanFilesAction action2 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action2::run).doesNotThrowAnyException();
-
-        args.add("--parallelism");
-        args.add("5");
-        RemoveOrphanFilesAction action3 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action3::run).doesNotThrowAnyException();
-
-        String withoutOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s')"
-                                : "CALL sys.remove_orphan_files('%s.%s')",
-                        database,
-                        "*");
-        CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
-        
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
-
-        String withParallelism =
-                String.format("CALL 
sys.remove_orphan_files('%s.%s','',true,5)", database, "*");
-        CloseableIterator<Row> withParallelismCollect = 
executeSQL(withParallelism);
-        
assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0"));
-
-        String withDryRun =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true)",
-                        database,
-                        "*");
-        ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(executeSQL(withDryRun));
-        assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));
-
-        String withOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
-                        database,
-                        "*");
-        ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
-
-        assertThat(actualDeleteFile).containsOnly(Row.of("4"));
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
-        // create main branch
-        FileStoreTable table = createTableAndWriteData(tableName);
-
-        // create first branch and write some data
-        table.createBranch("br");
-        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location(), "br");
-        TableSchema branchSchema =
-                schemaManager.commitChanges(SchemaChange.addColumn("v2", 
DataTypes.INT()));
-        Options branchOptions = new Options(branchSchema.options());
-        branchOptions.set(CoreOptions.BRANCH, "br");
-        branchSchema = branchSchema.copy(branchOptions.toMap());
-        FileStoreTable branchTable =
-                FileStoreTableFactory.create(table.fileIO(), table.location(), 
branchSchema);
-
-        String commitUser = UUID.randomUUID().toString();
-        StreamTableWrite write = branchTable.newWrite(commitUser);
-        StreamTableCommit commit = branchTable.newCommit(commitUser);
-        write.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20));
-        commit.commit(1, write.prepareCommit(false, 1));
-        write.close();
-        commit.close();
-
-        // create orphan file in snapshot directory of first branch
-        Path orphanFile3 = new Path(table.location(), 
"branch/branch-br/snapshot/orphan_file3");
-        branchTable.fileIO().writeFile(orphanFile3, "x", true);
-
-        // create second branch, which is empty
-        table.createBranch("br2");
-
-        // create orphan file in snapshot directory of second branch
-        Path orphanFile4 = new Path(table.location(), 
"branch/branch-br2/snapshot/orphan_file4");
-        branchTable.fileIO().writeFile(orphanFile4, "y", true);
-
-        if (ThreadLocalRandom.current().nextBoolean()) {
-            executeSQL(
-                    String.format(
-                            "ALTER TABLE `%s`.`%s` SET ('%s' = 'br')",
-                            database, tableName, SCAN_FALLBACK_BRANCH.key()),
-                    false,
-                    true);
-        }
-        String procedure =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59')",
-                        database,
-                        "*");
-        ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(procedure));
-        assertThat(actualDeleteFile).containsOnly(Row.of("4"));
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testRunWithMode(boolean isNamedArgument) throws Exception {
-        createTableAndWriteData(tableName);
-
-        List<String> args =
-                new ArrayList<>(
-                        Arrays.asList(
-                                "remove_orphan_files",
-                                "--warehouse",
-                                warehouse,
-                                "--database",
-                                database,
-                                "--table",
-                                tableName));
-        RemoveOrphanFilesAction action1 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action1::run).doesNotThrowAnyException();
-
-        args.add("--older_than");
-        args.add("2023-12-31 23:59:59");
-        RemoveOrphanFilesAction action2 = 
createAction(RemoveOrphanFilesAction.class, args);
-        assertThatCode(action2::run).doesNotThrowAnyException();
-
-        String withoutOlderThan =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s')"
-                                : "CALL sys.remove_orphan_files('%s.%s')",
-                        database,
-                        tableName);
-        CloseableIterator<Row> withoutOlderThanCollect = 
executeSQL(withoutOlderThan);
-        
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
-
-        String withLocalMode =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'local')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'local')",
-                        database,
-                        tableName);
-        ImmutableList<Row> actualLocalRunDeleteFile =
-                ImmutableList.copyOf(executeSQL(withLocalMode));
-        assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));
-
-        String withDistributedMode =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'distributed')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'distributed')",
-                        database,
-                        tableName);
-        ImmutableList<Row> actualDistributedRunDeleteFile =
-                ImmutableList.copyOf(executeSQL(withDistributedMode));
-        assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));
-
-        String withInvalidMode =
-                String.format(
-                        isNamedArgument
-                                ? "CALL sys.remove_orphan_files(`table` => 
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 
5, mode => 'unknown')"
-                                : "CALL sys.remove_orphan_files('%s.%s', 
'2999-12-31 23:59:59', true, 5, 'unknown')",
-                        database,
-                        tableName);
-        assertThatCode(() -> executeSQL(withInvalidMode))
-                .isInstanceOf(RuntimeException.class)
-                .hasMessageContaining("Unknown mode");
-    }
-}
+/** IT cases base for {@link RemoveOrphanFilesAction} in Flink Common. */
+public class RemoveOrphanFilesActionITCase extends 
RemoveOrphanFilesActionITCaseBase {}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
similarity index 99%
copy from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
copy to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 938a8ce1b..5f874a5a7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -54,8 +54,8 @@ import static 
org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
-/** IT cases for {@link RemoveOrphanFilesAction}. */
-public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
+/** IT cases base for {@link RemoveOrphanFilesAction}. */
+public abstract class RemoveOrphanFilesActionITCaseBase extends 
ActionITCaseBase {
 
     private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
     private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";


Reply via email to