This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8d57d3d90a [core] add deletedFileTotalSizeInBytes in result of
OrphanFilesClean (#4545)
8d57d3d90a is described below
commit 8d57d3d90a945c8705d3acda3e7bf0ef2cb157ef
Author: wangwj <[email protected]>
AuthorDate: Tue Nov 26 13:02:01 2024 +0800
[core] add deletedFileTotalSizeInBytes in result of OrphanFilesClean (#4545)
---
.../paimon/operation/CleanOrphanFilesResult.java | 54 ++++++++++++
.../paimon/operation/LocalOrphanFilesClean.java | 64 +++++++++-----
.../apache/paimon/operation/OrphanFilesClean.java | 38 +++++++--
.../org/apache/paimon/utils/SnapshotManager.java | 10 +--
.../operation/LocalOrphanFilesCleanTest.java | 18 ++--
.../procedure/RemoveOrphanFilesProcedure.java | 13 ++-
.../flink/RemoveOrphanFilesActionITCase.java | 2 +-
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 98 +++++++++++++++-------
.../procedure/RemoveOrphanFilesProcedure.java | 12 ++-
.../action/RemoveOrphanFilesActionITCaseBase.java | 2 +-
.../procedure/RemoveOrphanFilesProcedure.java | 18 ++--
.../spark/orphan/SparkOrphanFilesClean.scala | 64 ++++++++------
.../procedure/RemoveOrphanFilesProcedureTest.scala | 30 +++----
13 files changed, 291 insertions(+), 132 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
new file mode 100644
index 0000000000..5a3bc67f9c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.fs.Path;
+
+import java.util.List;
+
+/** The result of OrphanFilesClean. */
+public class CleanOrphanFilesResult {
+
+ private List<Path> deletedFilesPath;
+ private final long deletedFileCount;
+ private final long deletedFileTotalLenInBytes;
+
+ public CleanOrphanFilesResult(long deletedFileCount, long
deletedFileTotalLenInBytes) {
+ this.deletedFileCount = deletedFileCount;
+ this.deletedFileTotalLenInBytes = deletedFileTotalLenInBytes;
+ }
+
+ public CleanOrphanFilesResult(
+ List<Path> deletedFilesPath, long deletedFileCount, long
deletedFileTotalLenInBytes) {
+ this(deletedFileCount, deletedFileTotalLenInBytes);
+ this.deletedFilesPath = deletedFilesPath;
+ }
+
+ public long getDeletedFileCount() {
+ return deletedFileCount;
+ }
+
+ public long getDeletedFileTotalLenInBytes() {
+ return deletedFileTotalLenInBytes;
+ }
+
+ public List<Path> getDeletedFilesPath() {
+ return deletedFilesPath;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index a5eea6d650..511c5fc7fb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -21,12 +21,12 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableConsumer;
import javax.annotation.Nullable;
@@ -47,6 +47,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -68,6 +69,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
private final List<Path> deleteFiles;
+ private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
+
private Set<String> candidateDeletes;
public LocalOrphanFilesClean(FileStoreTable table) {
@@ -87,16 +90,18 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
table.coreOptions().deleteFileThreadNum(),
"ORPHAN_FILES_CLEAN");
}
- public List<Path> clean() throws IOException, ExecutionException,
InterruptedException {
+ public CleanOrphanFilesResult clean()
+ throws IOException, ExecutionException, InterruptedException {
List<String> branches = validBranches();
// specially handle to clear snapshot dir
- cleanSnapshotDir(branches, deleteFiles::add);
+ cleanSnapshotDir(branches, deleteFiles::add,
deletedFilesLenInBytes::addAndGet);
// delete candidate files
- Map<String, Path> candidates = getCandidateDeletingFiles();
+ Map<String, Pair<Path, Long>> candidates = getCandidateDeletingFiles();
if (candidates.isEmpty()) {
- return deleteFiles;
+ return new CleanOrphanFilesResult(
+ deleteFiles, deleteFiles.size(),
deletedFilesLenInBytes.get());
}
candidateDeletes = new HashSet<>(candidates.keySet());
@@ -108,12 +113,22 @@ public class LocalOrphanFilesClean extends
OrphanFilesClean {
// delete unused files
candidateDeletes.removeAll(usedFiles);
- candidateDeletes.stream().map(candidates::get).forEach(fileCleaner);
+ candidateDeletes.stream()
+ .map(candidates::get)
+ .forEach(
+ deleteFileInfo -> {
+
deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
+ fileCleaner.accept(deleteFileInfo.getLeft());
+ });
deleteFiles.addAll(
-
candidateDeletes.stream().map(candidates::get).collect(Collectors.toList()));
+ candidateDeletes.stream()
+ .map(candidates::get)
+ .map(Pair::getLeft)
+ .collect(Collectors.toList()));
candidateDeletes.clear();
- return deleteFiles;
+ return new CleanOrphanFilesResult(
+ deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
}
private void collectWithoutDataFile(
@@ -172,19 +187,20 @@ public class LocalOrphanFilesClean extends
OrphanFilesClean {
* Get all the candidate deleting files in the specified directories and
filter them by
* olderThanMillis.
*/
- private Map<String, Path> getCandidateDeletingFiles() {
+ private Map<String, Pair<Path, Long>> getCandidateDeletingFiles() {
List<Path> fileDirs = listPaimonFileDirs();
- Function<Path, List<Path>> processor =
+ Function<Path, List<Pair<Path, Long>>> processor =
path ->
tryBestListingDirs(path).stream()
.filter(this::oldEnough)
- .map(FileStatus::getPath)
+ .map(status -> Pair.of(status.getPath(),
status.getLen()))
.collect(Collectors.toList());
- Iterator<Path> allPaths = randomlyExecuteSequentialReturn(executor,
processor, fileDirs);
- Map<String, Path> result = new HashMap<>();
- while (allPaths.hasNext()) {
- Path next = allPaths.next();
- result.put(next.getName(), next);
+ Iterator<Pair<Path, Long>> allFilesInfo =
+ randomlyExecuteSequentialReturn(executor, processor, fileDirs);
+ Map<String, Pair<Path, Long>> result = new HashMap<>();
+ while (allFilesInfo.hasNext()) {
+ Pair<Path, Long> fileInfo = allFilesInfo.next();
+ result.put(fileInfo.getLeft().getName(), fileInfo);
}
return result;
}
@@ -197,7 +213,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism)
throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
- List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>();
List<String> tableNames = Collections.singletonList(tableName);
if (tableName == null || "*".equals(tableName)) {
tableNames = catalog.listTables(databaseName);
@@ -214,6 +229,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
}
};
+ List<LocalOrphanFilesClean> orphanFilesCleans = new
ArrayList<>(tableNames.size());
for (String t : tableNames) {
Identifier identifier = new Identifier(databaseName, t);
Table table = catalog.getTable(identifier).copy(dynamicOptions);
@@ -230,7 +246,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
return orphanFilesCleans;
}
- public static long executeDatabaseOrphanFiles(
+ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
Catalog catalog,
String databaseName,
@Nullable String tableName,
@@ -249,15 +265,17 @@ public class LocalOrphanFilesClean extends
OrphanFilesClean {
ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- List<Future<List<Path>>> tasks = new ArrayList<>();
+ List<Future<CleanOrphanFilesResult>> tasks = new
ArrayList<>(tableCleans.size());
for (LocalOrphanFilesClean clean : tableCleans) {
tasks.add(executorService.submit(clean::clean));
}
- List<Path> cleanOrphanFiles = new ArrayList<>();
- for (Future<List<Path>> task : tasks) {
+ long deletedFileCount = 0;
+ long deletedFileTotalLenInBytes = 0;
+ for (Future<CleanOrphanFilesResult> task : tasks) {
try {
- cleanOrphanFiles.addAll(task.get());
+ deletedFileCount += task.get().getDeletedFileCount();
+ deletedFileTotalLenInBytes +=
task.get().getDeletedFileTotalLenInBytes();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
@@ -267,6 +285,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
}
executorService.shutdownNow();
- return cleanOrphanFiles.size();
+ return new CleanOrphanFilesResult(deletedFileCount,
deletedFileTotalLenInBytes);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 869100d9cf..274cdd52fe 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -120,23 +120,47 @@ public abstract class OrphanFilesClean implements
Serializable {
return branches;
}
- protected void cleanSnapshotDir(List<String> branches, Consumer<Path>
deletedFileConsumer) {
+ protected void cleanSnapshotDir(
+ List<String> branches,
+ Consumer<Path> deletedFilesConsumer,
+ Consumer<Long> deletedFilesLenInBytesConsumer) {
for (String branch : branches) {
FileStoreTable branchTable = table.switchToBranch(branch);
SnapshotManager snapshotManager = branchTable.snapshotManager();
// specially handle the snapshot directory
- List<Path> nonSnapshotFiles =
snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
- nonSnapshotFiles.forEach(fileCleaner);
- nonSnapshotFiles.forEach(deletedFileConsumer);
+ List<Pair<Path, Long>> nonSnapshotFiles =
+ snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+ nonSnapshotFiles.forEach(
+ nonSnapshotFile ->
+ cleanFile(
+ nonSnapshotFile,
+ deletedFilesConsumer,
+ deletedFilesLenInBytesConsumer));
// specially handle the changelog directory
- List<Path> nonChangelogFiles =
snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
- nonChangelogFiles.forEach(fileCleaner);
- nonChangelogFiles.forEach(deletedFileConsumer);
+ List<Pair<Path, Long>> nonChangelogFiles =
+ snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+ nonChangelogFiles.forEach(
+ nonChangelogFile ->
+ cleanFile(
+ nonChangelogFile,
+ deletedFilesConsumer,
+ deletedFilesLenInBytesConsumer));
}
}
+ private void cleanFile(
+ Pair<Path, Long> deleteFileInfo,
+ Consumer<Path> deletedFilesConsumer,
+ Consumer<Long> deletedFilesLenInBytesConsumer) {
+ Path filePath = deleteFileInfo.getLeft();
+ Long fileSize = deleteFileInfo.getRight();
+ deletedFilesConsumer.accept(filePath);
+ deletedFilesLenInBytesConsumer.accept(fileSize);
+ fileCleaner.accept(filePath);
+ }
+
protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws
IOException {
FileStoreTable branchTable = table.switchToBranch(branch);
SnapshotManager snapshotManager = branchTable.snapshotManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9a120042ea..cbe33ffaf4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -563,15 +563,15 @@ public class SnapshotManager implements Serializable {
* Try to get non snapshot files. If any error occurred, just ignore it
and return an empty
* result.
*/
- public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter) {
+ public List<Pair<Path, Long>> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter) {
return listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
}
- public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus>
fileStatusFilter) {
+ public List<Pair<Path, Long>>
tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
return listPathWithFilter(changelogDirectory(), fileStatusFilter,
nonChangelogFileFilter());
}
- private List<Path> listPathWithFilter(
+ private List<Pair<Path, Long>> listPathWithFilter(
Path directory, Predicate<FileStatus> fileStatusFilter,
Predicate<Path> fileFilter) {
try {
FileStatus[] statuses = fileIO.listStatus(directory);
@@ -581,8 +581,8 @@ public class SnapshotManager implements Serializable {
return Arrays.stream(statuses)
.filter(fileStatusFilter)
- .map(FileStatus::getPath)
- .filter(fileFilter)
+ .filter(status -> fileFilter.test(status.getPath()))
+ .map(status -> Pair.of(status.getPath(), status.getLen()))
.collect(Collectors.toList());
} catch (IOException ignored) {
return Collections.emptyList();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index fdc68b34ab..5139dd4495 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -165,22 +165,20 @@ public class LocalOrphanFilesCleanTest {
// randomly delete tags
List<String> deleteTags = Collections.emptyList();
- if (!allTags.isEmpty()) {
- deleteTags = randomlyPick(allTags);
- for (String tagName : deleteTags) {
- table.deleteTag(tagName);
- }
+ deleteTags = randomlyPick(allTags);
+ for (String tagName : deleteTags) {
+ table.deleteTag(tagName);
}
// first check, nothing will be deleted because the default olderThan
interval is 1 day
LocalOrphanFilesClean orphanFilesClean = new
LocalOrphanFilesClean(table);
- assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
// second check
orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(2));
- List<Path> deleted = orphanFilesClean.clean();
+ List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
try {
validate(deleted, snapshotData, new HashMap<>());
} catch (Throwable t) {
@@ -363,13 +361,13 @@ public class LocalOrphanFilesCleanTest {
// first check, nothing will be deleted because the default olderThan
interval is 1 day
LocalOrphanFilesClean orphanFilesClean = new
LocalOrphanFilesClean(table);
- assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
// second check
orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(2));
- List<Path> deleted = orphanFilesClean.clean();
+ List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
validate(deleted, snapshotData, changelogData);
}
@@ -399,7 +397,7 @@ public class LocalOrphanFilesCleanTest {
LocalOrphanFilesClean orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(2));
- assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
+
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
}
private void writeData(
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 7695c510b1..b4a3a6b359 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -86,11 +87,12 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
if (mode == null) {
mode = "DISTRIBUTED";
}
- long deletedFiles;
+
+ CleanOrphanFilesResult cleanOrphanFilesResult;
try {
switch (mode.toUpperCase(Locale.ROOT)) {
case "DISTRIBUTED":
- deletedFiles =
+ cleanOrphanFilesResult =
FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
procedureContext.getExecutionEnvironment(),
catalog,
@@ -101,7 +103,7 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
tableName);
break;
case "LOCAL":
- deletedFiles =
+ cleanOrphanFilesResult =
LocalOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
databaseName,
@@ -116,7 +118,10 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
+ mode
+ ". Only 'DISTRIBUTED' and 'LOCAL' are
supported.");
}
- return new String[] {String.valueOf(deletedFiles)};
+ return new String[] {
+ String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+
String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+ };
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
index 46b62b6bf3..a168c3785c 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -137,7 +137,7 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
database, tableName);
ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
- assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+ assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"),
Row.of("2"));
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 61bebca24a..23bbbc9b60 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -27,12 +27,15 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableConsumer;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -61,7 +64,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -81,7 +83,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
}
@Nullable
- public DataStream<Long> doOrphanClean(StreamExecutionEnvironment env) {
+ public DataStream<CleanOrphanFilesResult>
doOrphanClean(StreamExecutionEnvironment env) {
Configuration flinkConf = new Configuration();
flinkConf.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
@@ -97,8 +99,12 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
// snapshot and changelog files are the root of everything, so they
are handled specially
// here, and subsequently, we will not count their orphan files.
- AtomicLong deletedInLocal = new AtomicLong(0);
- cleanSnapshotDir(branches, p -> deletedInLocal.incrementAndGet());
+ AtomicLong deletedFilesCountInLocal = new AtomicLong(0);
+ AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0);
+ cleanSnapshotDir(
+ branches,
+ path -> deletedFilesCountInLocal.incrementAndGet(),
+ deletedFilesLenInBytesInLocal::addAndGet);
// branch and manifest file
final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -203,36 +209,45 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
.map(Path::toUri)
.map(Object::toString)
.collect(Collectors.toList());
- DataStream<String> candidates =
+ DataStream<Pair<String, Long>> candidates =
env.fromCollection(fileDirs)
.process(
- new ProcessFunction<String, String>() {
+ new ProcessFunction<String, Pair<String,
Long>>() {
@Override
public void processElement(
String dir,
- ProcessFunction<String,
String>.Context ctx,
- Collector<String> out) {
+ ProcessFunction<String,
Pair<String, Long>>.Context ctx,
+ Collector<Pair<String, Long>> out)
{
for (FileStatus fileStatus :
tryBestListingDirs(new
Path(dir))) {
if (oldEnough(fileStatus)) {
out.collect(
-
fileStatus.getPath().toUri().toString());
+ Pair.of(
+ fileStatus
+
.getPath()
+
.toUri()
+
.toString(),
+
fileStatus.getLen()));
}
}
}
});
- DataStream<Long> deleted =
+ DataStream<CleanOrphanFilesResult> deleted =
usedFiles
.keyBy(f -> f)
- .connect(candidates.keyBy(path -> new
Path(path).getName()))
+ .connect(
+ candidates.keyBy(
+ pathAndSize -> new
Path(pathAndSize.getKey()).getName()))
.transform(
"files_join",
- LONG_TYPE_INFO,
- new BoundedTwoInputOperator<String, String,
Long>() {
+
TypeInformation.of(CleanOrphanFilesResult.class),
+ new BoundedTwoInputOperator<
+ String, Pair<String, Long>,
CleanOrphanFilesResult>() {
private boolean buildEnd;
- private long emitted;
+ private long emittedFilesCount;
+ private long emittedFilesLen;
private final Set<String> used = new
HashSet<>();
@@ -254,8 +269,15 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
case 2:
checkState(buildEnd, "Should
build ended.");
LOG.info("Finish probe
phase.");
- LOG.info("Clean files: {}",
emitted);
- output.collect(new
StreamRecord<>(emitted));
+ LOG.info(
+ "Clean files count :
{}",
+ emittedFilesCount);
+ LOG.info("Clean files size :
{}", emittedFilesLen);
+ output.collect(
+ new StreamRecord<>(
+ new
CleanOrphanFilesResult(
+
emittedFilesCount,
+
emittedFilesLen)));
break;
}
}
@@ -266,25 +288,34 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
}
@Override
- public void
processElement2(StreamRecord<String> element) {
+ public void processElement2(
+ StreamRecord<Pair<String, Long>>
element) {
checkState(buildEnd, "Should build
ended.");
- String value = element.getValue();
+ Pair<String, Long> fileInfo =
element.getValue();
+ String value = fileInfo.getLeft();
Path path = new Path(value);
if (!used.contains(path.getName())) {
+ emittedFilesCount++;
+ emittedFilesLen +=
fileInfo.getRight();
fileCleaner.accept(path);
LOG.info("Dry clean: {}", path);
- emitted++;
}
}
});
- if (deletedInLocal.get() != 0) {
- deleted = deleted.union(env.fromElements(deletedInLocal.get()));
+ if (deletedFilesCountInLocal.get() != 0 ||
deletedFilesLenInBytesInLocal.get() != 0) {
+ deleted =
+ deleted.union(
+ env.fromElements(
+ new CleanOrphanFilesResult(
+ deletedFilesCountInLocal.get(),
+
deletedFilesLenInBytesInLocal.get())));
}
+
return deleted;
}
- public static long executeDatabaseOrphanFiles(
+ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
StreamExecutionEnvironment env,
Catalog catalog,
long olderThanMillis,
@@ -293,12 +324,13 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
String databaseName,
@Nullable String tableName)
throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
- List<DataStream<Long>> orphanFilesCleans = new ArrayList<>();
List<String> tableNames = Collections.singletonList(tableName);
if (tableName == null || "*".equals(tableName)) {
tableNames = catalog.listTables(databaseName);
}
+ List<DataStream<CleanOrphanFilesResult>> orphanFilesCleans =
+ new ArrayList<>(tableNames.size());
for (String t : tableNames) {
Identifier identifier = new Identifier(databaseName, t);
Table table = catalog.getTable(identifier);
@@ -307,7 +339,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean
{
"Only FileStoreTable supports remove-orphan-files action.
The table type is '%s'.",
table.getClass().getName());
- DataStream<Long> clean =
+ DataStream<CleanOrphanFilesResult> clean =
new FlinkOrphanFilesClean(
(FileStoreTable) table,
olderThanMillis,
@@ -319,8 +351,8 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean
{
}
}
- DataStream<Long> result = null;
- for (DataStream<Long> clean : orphanFilesCleans) {
+ DataStream<CleanOrphanFilesResult> result = null;
+ for (DataStream<CleanOrphanFilesResult> clean : orphanFilesCleans) {
if (result == null) {
result = clean;
} else {
@@ -331,20 +363,24 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
return sum(result);
}
- private static long sum(DataStream<Long> deleted) {
- long deleteCount = 0;
+ private static CleanOrphanFilesResult
sum(DataStream<CleanOrphanFilesResult> deleted) {
+ long deletedFilesCount = 0;
+ long deletedFilesLenInBytes = 0;
if (deleted != null) {
try {
- CloseableIterator<Long> iterator =
+ CloseableIterator<CleanOrphanFilesResult> iterator =
deleted.global().executeAndCollect("OrphanFilesClean");
while (iterator.hasNext()) {
- deleteCount += iterator.next();
+ CleanOrphanFilesResult cleanOrphanFilesResult =
iterator.next();
+ deletedFilesCount +=
cleanOrphanFilesResult.getDeletedFileCount();
+ deletedFilesLenInBytes +=
+
cleanOrphanFilesResult.getDeletedFileTotalLenInBytes();
}
iterator.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- return deleteCount;
+ return new CleanOrphanFilesResult(deletedFilesCount,
deletedFilesLenInBytes);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 10ad878e0c..4cd1b3e003 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.flink.table.annotation.ArgumentHint;
@@ -75,11 +76,11 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
if (mode == null) {
mode = "DISTRIBUTED";
}
- long deletedFiles;
+ CleanOrphanFilesResult cleanOrphanFilesResult;
try {
switch (mode.toUpperCase(Locale.ROOT)) {
case "DISTRIBUTED":
- deletedFiles =
+ cleanOrphanFilesResult =
FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
procedureContext.getExecutionEnvironment(),
catalog,
@@ -90,7 +91,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase
{
tableName);
break;
case "LOCAL":
- deletedFiles =
+ cleanOrphanFilesResult =
LocalOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
databaseName,
@@ -105,7 +106,10 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
+ mode
+ ". Only 'DISTRIBUTED' and 'LOCAL' are
supported.");
}
- return new String[] {String.valueOf(deletedFiles)};
+ return new String[] {
+ String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+
String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+ };
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 5f874a5a7f..77f3be2f0c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -148,7 +148,7 @@ public abstract class RemoveOrphanFilesActionITCaseBase
extends ActionITCaseBase
tableName);
ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
- assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+ assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"),
Row.of("2"));
}
@ParameterizedTest
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 293e84ca14..a929641106 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
@@ -66,7 +67,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure
{
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", LongType, true,
Metadata.empty())
+ new StructField("deletedFileCount", LongType, true,
Metadata.empty()),
+ new StructField(
+ "deletedFileTotalLenInBytes", LongType, true,
Metadata.empty())
});
private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
@@ -104,11 +107,11 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
String mode = args.isNullAt(4) ? "DISTRIBUTED" : args.getString(4);
- long deletedFiles;
+ CleanOrphanFilesResult cleanOrphanFilesResult;
try {
switch (mode.toUpperCase(Locale.ROOT)) {
case "LOCAL":
- deletedFiles =
+ cleanOrphanFilesResult =
LocalOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
@@ -120,7 +123,7 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
args.isNullAt(3) ? null : args.getInt(3));
break;
case "DISTRIBUTED":
- deletedFiles =
+ cleanOrphanFilesResult =
SparkOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
@@ -137,7 +140,12 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
+ mode
+ ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
}
- return new InternalRow[] {newInternalRow(deletedFiles)};
+
+ return new InternalRow[] {
+ newInternalRow(
+ cleanOrphanFilesResult.getDeletedFileCount(),
+ cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+ };
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
index 488d70e349..fca0493ede 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -22,15 +22,14 @@ import org.apache.paimon.{utils, Snapshot}
import org.apache.paimon.catalog.{Catalog, Identifier}
import org.apache.paimon.fs.Path
import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
-import org.apache.paimon.operation.OrphanFilesClean
+import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.utils.SerializableConsumer
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{functions, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.functions.sum
import java.util
import java.util.Collections
@@ -50,14 +49,18 @@ case class SparkOrphanFilesClean(
with SQLConfHelper
with Logging {
- def doOrphanClean(): (Dataset[Long], Dataset[BranchAndManifestFile]) = {
+ def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile])
= {
import spark.implicits._
val branches = validBranches()
- val deletedInLocal = new AtomicLong(0)
+ val deletedFilesCountInLocal = new AtomicLong(0)
+ val deletedFilesLenInBytesInLocal = new AtomicLong(0)
// snapshot and changelog files are the root of everything, so they are
handled specially
// here, and subsequently, we will not count their orphan files.
- cleanSnapshotDir(branches, (_: Path) => deletedInLocal.incrementAndGet)
+ cleanSnapshotDir(
+ branches,
+ (_: Path) => deletedFilesCountInLocal.incrementAndGet,
+ size => deletedFilesLenInBytesInLocal.addAndGet(size))
val maxBranchParallelism = Math.min(branches.size(), parallelism)
// find snapshots using branch and find manifests(manifest, index,
statistics) using snapshot
@@ -121,10 +124,10 @@ case class SparkOrphanFilesClean(
.flatMap {
dir =>
tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
- file => (file.getPath.getName, file.getPath.toUri.toString)
+ file => (file.getPath.getName, file.getPath.toUri.toString,
file.getLen)
}
}
- .toDF("name", "path")
+ .toDF("name", "path", "len")
.repartition(parallelism)
// use left anti to filter files which is not used
@@ -132,21 +135,30 @@ case class SparkOrphanFilesClean(
.join(usedFiles, $"name" === $"used_name", "left_anti")
.mapPartitions {
it =>
- var deleted = 0L
+ var deletedFilesCount = 0L
+ var deletedFilesLenInBytes = 0L
+
while (it.hasNext) {
- val pathToClean = it.next().getString(1)
- specifiedFileCleaner.accept(new Path(pathToClean))
+ val fileInfo = it.next();
+ val pathToClean = fileInfo.getString(1)
+ val deletedPath = new Path(pathToClean)
+ deletedFilesLenInBytes += fileInfo.getLong(2)
+ specifiedFileCleaner.accept(deletedPath)
logInfo(s"Cleaned file: $pathToClean")
- deleted += 1
+ deletedFilesCount += 1
}
- logInfo(s"Total cleaned files: $deleted");
- Iterator.single(deleted)
+ logInfo(
+ s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes")
+ Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
+ }
+ val finalDeletedDataset =
+ if (deletedFilesCountInLocal.get() != 0 ||
deletedFilesLenInBytesInLocal.get() != 0) {
+ deleted.union(
+ spark.createDataset(
+ Seq((deletedFilesCountInLocal.get(),
deletedFilesLenInBytesInLocal.get()))))
+ } else {
+ deleted
}
- val finalDeletedDataset = if (deletedInLocal.get() != 0) {
- deleted.union(spark.createDataset(Seq(deletedInLocal.get())))
- } else {
- deleted
- }
(finalDeletedDataset, usedManifestFiles)
}
@@ -169,7 +181,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
tableName: String,
olderThanMillis: Long,
fileCleaner: SerializableConsumer[Path],
- parallelismOpt: Integer): Long = {
+ parallelismOpt: Integer): CleanOrphanFilesResult = {
val spark = SparkSession.active
val parallelism = if (parallelismOpt == null) {
Math.max(spark.sparkContext.defaultParallelism,
conf.numShufflePartitions)
@@ -192,7 +204,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
table.asInstanceOf[FileStoreTable]
}
if (tables.isEmpty) {
- return 0
+ return new CleanOrphanFilesResult(0, 0)
}
val (deleted, waitToRelease) = tables.map {
table =>
@@ -207,15 +219,15 @@ object SparkOrphanFilesClean extends SQLConfHelper {
try {
val result = deleted
.reduce((l, r) => l.union(r))
- .toDF("deleted")
- .agg(sum("deleted"))
+ .toDF("deletedFilesCount", "deletedFilesLenInBytes")
+ .agg(functions.sum("deletedFilesCount"),
functions.sum("deletedFilesLenInBytes"))
.head()
- assert(result.schema.size == 1, result.schema)
+ assert(result.schema.size == 2, result.schema)
if (result.isNullAt(0)) {
// no files can be deleted
- 0
+ new CleanOrphanFilesResult(0, 0)
} else {
- result.getLong(0)
+ new CleanOrphanFilesResult(result.getLong(0), result.getLong(1))
}
} finally {
waitToRelease.foreach(_.unpersist())
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index d9d7381126..3ffe7fba26 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -52,7 +52,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
fileIO.tryToWriteAtomic(orphanFile2, "b")
// by default, no file deleted
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
val orphanFile2ModTime =
fileIO.getFileStatus(orphanFile2).getModificationTime
val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -63,7 +63,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
- Row(1) :: Nil)
+ Row(1, 1) :: Nil)
val older_than2 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -71,9 +71,9 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"),
- Row(1) :: Nil)
+ Row(1, 1) :: Nil)
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
}
test("Paimon procedure: dry run remove orphan files") {
@@ -97,7 +97,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
fileIO.writeFile(orphanFile2, "b", true)
// by default, no file deleted
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
val older_than = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -106,10 +106,10 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than', dry_run => true)"),
- Row(2) :: Nil
+ Row(2, 2) :: Nil
)
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
}
test("Paimon procedure: remove database orphan files") {
@@ -146,7 +146,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
fileIO2.tryToWriteAtomic(orphanFile22, "b")
// by default, no file deleted
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"),
Row(0, 0) :: Nil)
val orphanFile12ModTime =
fileIO1.getFileStatus(orphanFile12).getModificationTime
val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -157,7 +157,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than1')"),
- Row(2) :: Nil
+ Row(2, 2) :: Nil
)
val older_than2 = DateTimeUtils.formatLocalDateTime(
@@ -166,10 +166,10 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than2')"),
- Row(2) :: Nil
+ Row(2, 2) :: Nil
)
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"),
Row(0, 0) :: Nil)
}
test("Paimon procedure: remove orphan files with mode") {
@@ -193,7 +193,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
fileIO.tryToWriteAtomic(orphanFile2, "b")
// by default, no file deleted
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
val orphanFile2ModTime =
fileIO.getFileStatus(orphanFile2).getModificationTime
val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -205,7 +205,7 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'diSTributed')"),
- Row(1) :: Nil)
+ Row(1, 1) :: Nil)
val older_than2 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -214,9 +214,9 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"),
- Row(1) :: Nil)
+ Row(1, 1) :: Nil)
- checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0) :: Nil)
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
Row(0, 0) :: Nil)
}
}