This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 626f78f Revert "[HUDI-781] Introduce HoodieTestTable for test preparation (#1871)" 626f78f is described below commit 626f78f6f639cae2d3d57d29e7ef0642cb0be7ee Author: Balaji Varadarajan <balaji.varadara...@robinhood.com> AuthorDate: Mon Aug 10 22:13:02 2020 -0700 Revert "[HUDI-781] Introduce HoodieTestTable for test preparation (#1871)" This reverts commit b2e703d4427abca02b053fa4444cd5058aa256ef. --- .../org/apache/hudi/io/HoodieAppendHandle.java | 1 - .../org/apache/hudi/io/HoodieCreateHandle.java | 1 - .../java/org/apache/hudi/io/HoodieMergeHandle.java | 1 - .../java/org/apache/hudi/io/HoodieWriteHandle.java | 3 +- .../src/main/java/org/apache/hudi/io}/IOType.java | 15 +- .../java/org/apache/hudi/table/MarkerFiles.java | 15 +- .../rollback/MarkerBasedRollbackStrategy.java | 8 +- .../table/upgrade/ZeroToOneUpgradeHandler.java | 2 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 2 +- .../java/org/apache/hudi/table/TestCleaner.java | 393 +++++++++++++-------- .../apache/hudi/table/TestConsistencyGuard.java | 28 +- .../org/apache/hudi/table/TestMarkerFiles.java | 10 +- .../table/action/commit/TestUpsertPartitioner.java | 8 +- .../table/action/compact/TestHoodieCompactor.java | 7 +- .../rollback/TestMarkerBasedRollbackStrategy.java | 69 ++-- .../hudi/testutils/HoodieClientTestUtils.java | 99 ++++-- .../hudi/common/testutils/FileCreateUtils.java | 113 ------ .../hudi/common/testutils/HoodieTestTable.java | 232 ------------ .../hudi/common/testutils/HoodieTestUtils.java | 102 +++--- 19 files changed, 467 insertions(+), 642 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7996a77..7a8e5ab 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -32,7 +32,6 @@ import 
org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieDataBlock; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 5a76dc7..705e98d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 8d54065..f0ea284 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import 
org.apache.hudi.common.util.Option; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 5ea8c38..d148b1b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -24,7 +24,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -34,13 +33,13 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.MarkerFiles; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.MarkerFiles; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java b/hudi-client/src/main/java/org/apache/hudi/io/IOType.java similarity index 57% rename from hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java rename to hudi-client/src/main/java/org/apache/hudi/io/IOType.java index bd29ff0..aa6660e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/IOType.java @@ -7,17 +7,16 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.apache.hudi.common.model; +package org.apache.hudi.io; /** * Types of lower level I/O operations done on each file slice. diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java index 9577cea..8a310fd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java @@ -18,27 +18,26 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import 
org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.io.IOType; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index 2a137b4..40b81a2 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -18,10 +18,10 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; @@ -29,21 +29,19 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; - -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; import java.io.IOException; import java.util.Collections; 
import java.util.List; import java.util.Map; -import scala.Tuple2; - /** * Performs rollback using marker files generated during the write.. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index e9c9e28..4960ff5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 1c4988a..51d8a6a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; -import 
org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -48,6 +47,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.commit.WriteHelper; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 55fbab2..0376ec3 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -35,7 +35,6 @@ import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -44,8 +43,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; import 
org.apache.hudi.common.util.CollectionUtils; @@ -57,16 +56,13 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -486,97 +482,125 @@ public class TestCleaner extends HoodieClientTestBase { * Test HoodieTable.clean() Cleaning by versions logic. */ @Test - public void testKeepLatestFileVersions() throws Exception { + public void testKeepLatestFileVersions() throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; - String p1 = "2020/01/02"; - // make 1 commit, with 1 file per partition - Map<String, String> partitionAndFileId000 = testTable.addCommit("000").withInserts(p0, p1); + HoodieTestUtils.createCommitFiles(basePath, "000"); + + String file1P0C0 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); + String file1P1C0 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); + metaClient = HoodieTableMetaClient.reload(metaClient); + List<HoodieCleanStat> 
hoodieCleanStatsOne = runCleaner(config); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); - assertTrue(testTable.filesExist(partitionAndFileId000, "000")); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", + file1P1C0)); // make next commit, with 1 insert & 1 update per partition - String file1P0C0 = partitionAndFileId000.get(p0); - String file1P1C0 = partitionAndFileId000.get(p1); - Map<String, String> partitionAndFileId001 = testTable.addCommit("001") - .withUpdates(p0, file1P0C0) - .withUpdates(p1, file1P1C0) - .withInserts(p0, p1); + HoodieTestUtils.createCommitFiles(basePath, "001"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + String file2P0C1 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert + String file2P1C1 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert + HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update + List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config); assertEquals(1, - getCleanStat(hoodieCleanStatsTwo, p0).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size(), "Must clean 1 file"); assertEquals(1, - getCleanStat(hoodieCleanStatsTwo, p1).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() .size(), "Must clean 1 file"); - String file2P0C1 = partitionAndFileId001.get(p0); - String 
file2P1C1 = partitionAndFileId001.get(p1); - assertTrue(testTable.fileExists(p0, "001", file2P0C1)); - assertTrue(testTable.fileExists(p1, "001", file2P1C1)); - assertFalse(testTable.fileExists(p0, "000", file1P0C0)); - assertFalse(testTable.fileExists(p1, "000", file1P1C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", + file2P1C1)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0C0)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, + "000", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - String file3P0C2 = testTable.addCommit("002") - .withUpdates(p0, file1P0C0, file2P0C1) - .withInserts(p0, "002").get(p0); + HoodieTestUtils.createCommitFiles(basePath, "002"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update + String file3P0C2 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); + List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config); assertEquals(2, - getCleanStat(hoodieCleanStatsThree, p0) + getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .getSuccessDeleteFiles().size(), "Must clean two files"); - assertFalse(testTable.fileExists(p0, "001", file1P0C0)); - assertFalse(testTable.fileExists(p0, "001", file2P0C1)); - assertTrue(testTable.fileExists(p0, "002", file3P0C2)); + 
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file1P0C0)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", + file3P0C2)); // No cleaning on partially written file, with no commit. - testTable.forCommit("003").withUpdates(p0, file3P0C2); + HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); - assertTrue(testTable.fileExists(p0, "003", file3P0C2)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", + file3P0C2)); } /** * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files. 
*/ @Test - public void testKeepLatestFileVersionsMOR() throws Exception { + public void testKeepLatestFileVersionsMOR() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); // Make 3 files, one base file and 2 log files associated with base file - String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0); - testTable.forDeltaCommit("000") - .withLogFile(p0, file1P0, 1) - .withLogFile(p0, file1P0, 2); - - // Make 2 files, one base file and 1 log files associated with base file - testTable.addDeltaCommit("001") - .withUpdates(p0, file1P0) - .withLogFile(p0, file1P0, 3); + String file1P0 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); + String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.empty()); + HoodieTestUtils.createNewLogFile(fs, basePath, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.of(2)); + // make 1 compaction commit + HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000"); + + // Make 4 files, one base file and 3 log files associated with base file + HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0); + file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + "001", file1P0, Option.of(3)); + // make 1 compaction 
commit + HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001"); List<HoodieCleanStat> hoodieCleanStats = runCleaner(config); assertEquals(3, - getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size(), "Must clean three files, one parquet and 2 log files"); - assertFalse(testTable.fileExists(p0, "000", file1P0)); - assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); - assertTrue(testTable.fileExists(p0, "001", file1P0)); - assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0)); + assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file2P0L0, Option.empty())); + assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file2P0L0, Option.of(2))); } @Test @@ -628,33 +652,33 @@ public class TestCleaner extends HoodieClientTestBase { ); metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1); - // Now upgrade and check + // NOw upgrade and check CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient); metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion()); - assertCleanMetadataPathEquals(newExpected, metadata); + testCleanMetadataPathEquality(metadata, newExpected); CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient); HoodieCleanMetadata oldMetadata = migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1); assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion()); - assertCleanMetadataEquals(metadata, oldMetadata); - assertCleanMetadataPathEquals(oldExpected, oldMetadata); + testCleanMetadataEquality(metadata, oldMetadata); + 
testCleanMetadataPathEquality(oldMetadata, oldExpected); HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion()); assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion()); - assertCleanMetadataEquals(oldMetadata, newMetadata); - assertCleanMetadataPathEquals(newExpected, newMetadata); - assertCleanMetadataPathEquals(oldExpected, oldMetadata); + testCleanMetadataEquality(oldMetadata, newMetadata); + testCleanMetadataPathEquality(newMetadata, newExpected); + testCleanMetadataPathEquality(oldMetadata, oldExpected); } - private static void assertCleanMetadataEquals(HoodieCleanMetadata expected, HoodieCleanMetadata actual) { - assertEquals(expected.getEarliestCommitToRetain(), actual.getEarliestCommitToRetain()); - assertEquals(expected.getStartCleanTime(), actual.getStartCleanTime()); - assertEquals(expected.getTimeTakenInMillis(), actual.getTimeTakenInMillis()); - assertEquals(expected.getTotalFilesDeleted(), actual.getTotalFilesDeleted()); + public void testCleanMetadataEquality(HoodieCleanMetadata input1, HoodieCleanMetadata input2) { + assertEquals(input1.getEarliestCommitToRetain(), input2.getEarliestCommitToRetain()); + assertEquals(input1.getStartCleanTime(), input2.getStartCleanTime()); + assertEquals(input1.getTimeTakenInMillis(), input2.getTimeTakenInMillis()); + assertEquals(input1.getTotalFilesDeleted(), input2.getTotalFilesDeleted()); - Map<String, HoodieCleanPartitionMetadata> map1 = expected.getPartitionMetadata(); - Map<String, HoodieCleanPartitionMetadata> map2 = actual.getPartitionMetadata(); + Map<String, HoodieCleanPartitionMetadata> map1 = input1.getPartitionMetadata(); + Map<String, HoodieCleanPartitionMetadata> map2 = input2.getPartitionMetadata(); assertEquals(map1.keySet(), map2.keySet()); @@ -669,7 +693,7 @@ public class TestCleaner extends HoodieClientTestBase { assertEquals(policies1, policies2); } - private static void assertCleanMetadataPathEquals(Map<String, Tuple3> 
expected, HoodieCleanMetadata metadata) { + private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) { Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata(); @@ -683,40 +707,54 @@ public class TestCleaner extends HoodieClientTestBase { } } - private static Stream<Arguments> argumentsForTestKeepLatestCommits() { - return Stream.of( - Arguments.of(false, false), - Arguments.of(true, false), - Arguments.of(false, true) - ); + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. + */ + @Test + public void testKeepLatestCommits() throws IOException { + testKeepLatestCommits(false, false); } /** * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. */ - @ParameterizedTest - @MethodSource("argumentsForTestKeepLatestCommits") - public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws Exception { + @Test + public void testKeepLatestCommitsWithFailureRetry() throws IOException { + testKeepLatestCommits(true, false); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. + */ + @Test + public void testKeepLatestCommitsIncrMode() throws IOException { + testKeepLatestCommits(false, true); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. 
+ */ + private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withIncrementalCleaningMode(enableIncrementalClean) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .build(); - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; - String p1 = "2020/01/02"; - // make 1 commit, with 1 file per partition - Map<String, String> partitionAndFileId000 = testTable.addInflightCommit("000").withInserts(p0, p1); - String file1P0C0 = partitionAndFileId000.get(p0); - String file1P1C0 = partitionAndFileId000.get(p1); + HoodieTestUtils.createInflightCommitFiles(basePath, "000"); + + String file1P0C0 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); + String file1P1C0 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); + HoodieCommitMetadata commitMetadata = generateCommitMetadata( Collections.unmodifiableMap(new HashMap<String, List<String>>() { { - put(p0, CollectionUtils.createImmutableList(file1P0C0)); - put(p1, CollectionUtils.createImmutableList(file1P1C0)); + put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0)); + put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0)); } }) ); @@ -728,20 +766,29 @@ public class TestCleaner extends HoodieClientTestBase { List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); - assertTrue(testTable.fileExists(p0, "000", file1P0C0)); - assertTrue(testTable.fileExists(p1, "000", file1P1C0)); + 
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", + file1P1C0)); // make next commit, with 1 insert & 1 update per partition - Map<String, String> partitionAndFileId001 = testTable.addInflightCommit("001").withInserts(p0, p1); - String file2P0C1 = partitionAndFileId001.get(p0); - String file2P1C1 = partitionAndFileId001.get(p1); - testTable.forCommit("001") - .withUpdates(p0, file1P0C0) - .withUpdates(p1, file1P1C0); + HoodieTestUtils.createInflightCommitFiles(basePath, "001"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + String file2P0C1 = + HoodieTestUtils + .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert + String file2P1C1 = + HoodieTestUtils + .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() { { - put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); - put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); + put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); + put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); } }); metaClient.getActiveTimeline().saveAsComplete( @@ -749,18 +796,28 @@ public class TestCleaner extends HoodieClientTestBase { Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List<HoodieCleanStat> hoodieCleanStatsTwo = 
runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); - assertTrue(testTable.fileExists(p0, "001", file2P0C1)); - assertTrue(testTable.fileExists(p1, "001", file2P1C1)); - assertTrue(testTable.fileExists(p0, "000", file1P0C0)); - assertTrue(testTable.fileExists(p1, "000", file1P1C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", + file2P1C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", + file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - String file3P0C2 = testTable.addInflightCommit("002") - .withUpdates(p0, file1P0C0) - .withUpdates(p0, file2P0C1) - .withInserts(p0).get(p0); + HoodieTestUtils.createInflightCommitFiles(basePath, "002"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update + String file3P0C2 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); + commitMetadata = generateCommitMetadata(CollectionUtils - .createImmutableMap(p0, + .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"), @@ -769,35 +826,49 @@ 
public class TestCleaner extends HoodieClientTestBase { List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsThree.size(), "Must not clean any file. We have to keep 1 version before the latest commit time to keep"); - assertTrue(testTable.fileExists(p0, "000", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0C0)); // make next commit, with 2 updates to existing files, and 1 insert - String file4P0C3 = testTable.addInflightCommit("003") - .withUpdates(p0, file1P0C0) - .withUpdates(p0, file2P0C1) - .withInserts(p0).get(p0); + HoodieTestUtils.createInflightCommitFiles(basePath, "003"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update + String file4P0C3 = + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( - p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); assertEquals(1, - getCleanStat(hoodieCleanStatsFour, p0).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size(), "Must not clean one old file"); - 
assertFalse(testTable.fileExists(p0, "000", file1P0C0)); - assertTrue(testTable.fileExists(p0, "001", file1P0C0)); - assertTrue(testTable.fileExists(p0, "002", file1P0C0)); - assertTrue(testTable.fileExists(p0, "001", file2P0C1)); - assertTrue(testTable.fileExists(p0, "002", file2P0C1)); - assertTrue(testTable.fileExists(p0, "002", file3P0C2)); - assertTrue(testTable.fileExists(p0, "003", file4P0C3)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", + file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", + file3P0C2)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", + file4P0C3)); // No cleaning on partially written file, with no commit. 
- testTable.forCommit("004").withUpdates(p0, file3P0C2); - commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0, + HoodieTestUtils + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update + commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file3P0C2))); metaClient.getActiveTimeline().createNewInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004")); @@ -805,40 +876,41 @@ public class TestCleaner extends HoodieClientTestBase { new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); - HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0); - assertNull(cleanStat, "Must not clean any files"); - assertTrue(testTable.fileExists(p0, "001", file1P0C0)); - assertTrue(testTable.fileExists(p0, "001", file2P0C1)); - assertTrue(testTable.fileExists(p0, "004", file3P0C2)); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + assertEquals(0, + cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files"); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + file2P0C1)); } /** * Test Cleaning functionality of table.rollback() API. 
*/ @Test - public void testCleanMarkerDataFilesOnRollback() throws Exception { - HoodieTestTable testTable = HoodieTestTable.of(metaClient) - .addRequestedCommit("000") - .withMarkerFiles("default", 10, IOType.MERGE); - final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size(); - assertEquals(10, numTempFilesBefore, "Some marker files are created."); + public void testCleanMarkerDataFilesOnRollback() throws IOException { + List<String> markerFiles = createMarkerFiles("000", 10); + assertEquals(10, markerFiles.size(), "Some marker files are created."); + assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files are created."); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); + table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, + HoodieTimeline.COMMIT_ACTION, "000")); table.getActiveTimeline().transitionRequestedToInflight( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); metaClient.reloadActiveTimeline(); table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true); - final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size(); - assertEquals(0, numTempFilesAfter, "All temp files are deleted."); + assertEquals(0, getTotalTempFiles(), "All temp files are deleted."); } /** * Test CLeaner Stat when there are no partition paths. 
*/ @Test - public void testCleaningWithZeroPartitionPaths() throws Exception { + public void testCleaningWithZeroPartitionPaths() throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) @@ -847,7 +919,9 @@ public class TestCleaner extends HoodieClientTestBase { // Make a commit, although there are no partitionPaths. // Example use-case of this is when a client wants to create a table // with just some commit metadata, but no data/partitionPaths. - HoodieTestTable.of(metaClient).addCommit("000"); + HoodieTestUtils.createCommitFiles(basePath, "000"); + + metaClient = HoodieTableMetaClient.reload(metaClient); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config); assertTrue(hoodieCleanStatsOne.isEmpty(), "HoodieCleanStats should be empty for a table with empty partitionPaths"); @@ -878,9 +952,21 @@ public class TestCleaner extends HoodieClientTestBase { * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. */ - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { + @Test + public void testKeepLatestVersionsWithPendingCompactions() throws IOException { + testKeepLatestVersionsWithPendingCompactions(false); + } + + + /** + * Test Keep Latest Versions when there are pending compactions. 
+ */ + @Test + public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException { + testKeepLatestVersionsWithPendingCompactions(true); + } + + private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -1032,6 +1118,33 @@ public class TestCleaner extends HoodieClientTestBase { "Correct number of files under compaction deleted"); } + /** + * Utility method to create temporary data files. + * + * @param instantTime Commit Timestamp + * @param numFiles Number for files to be generated + * @return generated files + * @throws IOException in case of error + */ + private List<String> createMarkerFiles(String instantTime, int numFiles) throws IOException { + List<String> files = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime)); + } + return files; + } + + /*** + * Helper method to return temporary files count. 
+ * + * @return Number of temporary files found + * @throws IOException in case of error + */ + private int getTotalTempFiles() throws IOException { + return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)) + .size(); + } + private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient, List<String> paths) { Predicate<String> roFilePredicate = diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index 1f638c3..da4224a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; import org.apache.hudi.common.fs.OptimisticConsistencyGuard; -import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; @@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("consistencyGuardType") public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3"); 
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) @@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays @@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); @@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsTimedWait() throws Exception { - 
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); } @Test public void testCheckFailingDisappearFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays @@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileDisappears(new 
Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); @@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java index 55b7b50..af679ce 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java @@ -18,17 +18,17 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; + import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.IOType; import org.apache.hudi.testutils.HoodieClientTestUtils; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java 
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index f49d6d5..d8bb946 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -35,6 +34,7 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.avro.Schema; import org.apache.log4j.LogManager; @@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); - FileCreateUtils.createCommit(basePath, "001"); - FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize); + HoodieClientTestUtils.fakeCommit(basePath, "001"); + HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); @@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0) 
.insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build(); - FileCreateUtils.createCommit(basePath, "001"); + HoodieClientTestUtils.fakeCommit(basePath, "001"); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index c044bee..1529d79 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -53,9 +53,6 @@ import org.junit.jupiter.api.Test; import java.util.List; import java.util.stream.Collectors; -import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -169,9 +166,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file"); } } - createDeltaCommit(basePath, newCommitTime); - createRequestedDeltaCommit(basePath, newCommitTime); - createInflightDeltaCommit(basePath, newCommitTime); + HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime); // Do a compaction table = HoodieTable.create(config, hadoopConf); diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java index 83e7ea0..c6652ed 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java @@ -20,14 +20,16 @@ package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.FileSystemTestUtils; +import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,20 +55,38 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { cleanupResources(); } + private void givenCommit0(boolean isDeltaCommit) throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2"); + if (isDeltaCommit) { + HoodieClientTestUtils.fakeDeltaCommit(basePath, "000"); + } else { + HoodieClientTestUtils.fakeCommit(basePath, "000"); + } + } + + private void givenInflightCommit1(boolean isDeltaCommit) throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1"); + HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE); + + HoodieClientTestUtils.createMarkerFile(basePath, "partA", 
"001", "f3", IOType.CREATE); + + if (isDeltaCommit) { + HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0); + HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND); + HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND); + HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001"); + } else { + HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2"); + HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE); + HoodieClientTestUtils.fakeInFlightCommit(basePath, "001"); + } + } + @Test public void testCopyOnWriteRollback() throws Exception { // given: wrote some base files and corresponding markers - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String f0 = testTable.addRequestedCommit("000") - .withInserts("partA").get("partA"); - String f1 = testTable.addCommit("001") - .withUpdates("partA", f0) - .withInserts("partB").get("partB"); - String f2 = "f2"; - testTable.forCommit("001") - .withMarkerFile("partA", f0, IOType.MERGE) - .withMarkerFile("partB", f1, IOType.CREATE) - .withMarkerFile("partA", f2, IOType.CREATE); + givenCommit0(false); + givenInflightCommit1(false); // when List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") @@ -75,8 +95,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // then: ensure files are deleted correctly, non-existent files reported as failed deletes assertEquals(2, stats.size()); - List<FileStatus> partAFiles = testTable.listAllFiles("partA"); - List<FileStatus> partBFiles = testTable.listAllFiles("partB"); + List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA")); + List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB")); assertEquals(0, partBFiles.size()); assertEquals(1, 
partAFiles.size()); @@ -87,19 +107,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { @Test public void testMergeOnReadRollback() throws Exception { // given: wrote some base + log files and corresponding markers - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String f2 = testTable.addRequestedDeltaCommit("000") - .withInserts("partA").get("partA"); - String f1 = testTable.addDeltaCommit("001") - .withLogFile("partA", f2) - .withInserts("partB").get("partB"); - String f3 = "f3"; - String f4 = "f4"; - testTable.forDeltaCommit("001") - .withMarkerFile("partB", f1, IOType.CREATE) - .withMarkerFile("partA", f3, IOType.CREATE) - .withMarkerFile("partA", f2, IOType.APPEND) - .withMarkerFile("partB", f4, IOType.APPEND); + givenCommit0(true); + givenInflightCommit1(true); // when List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") @@ -108,12 +117,12 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // then: ensure files are deleted, rollback block is appended (even if append does not exist) assertEquals(2, stats.size()); // will have the log file - List<FileStatus> partBFiles = testTable.listAllFiles("partB"); + List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB")); assertEquals(1, partBFiles.size()); assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())); assertTrue(partBFiles.get(0).getLen() > 0); - List<FileStatus> partAFiles = testTable.listAllFiles("partA"); + List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA")); assertEquals(3, partAFiles.size()); assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count()); assertEquals(1, partAFiles.stream().filter(s -> 
s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count()); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index b61abb0..6db6529 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -35,10 +35,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; -import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.IOType; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieParquetWriter; @@ -59,6 +59,7 @@ import org.apache.spark.sql.SQLContext; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -73,6 +74,57 @@ import java.util.stream.Collectors; public class HoodieClientTestUtils { private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class); + public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; + + private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException { + String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME; + new File(parentPath).mkdirs(); + new File(parentPath + "/" + instantTime + suffix).createNewFile(); + } + + public static void fakeCommit(String basePath, String instantTime) throws IOException { + 
fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); + } + + public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException { + fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); + } + + public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException { + fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); + } + + public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException { + fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION); + } + + public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId) + throws Exception { + fakeDataFile(basePath, partitionPath, instantTime, fileId, 0); + } + + public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) + throws Exception { + String parentPath = String.format("%s/%s", basePath, partitionPath); + new File(parentPath).mkdirs(); + String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); + new File(path).createNewFile(); + new RandomAccessFile(path, "rw").setLength(length); + } + + public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version) + throws Exception { + fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); + } + + public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) + throws Exception { + String parentPath = String.format("%s/%s", basePath, partitionPath); + new File(parentPath).mkdirs(); + String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); + new File(path).createNewFile(); + new RandomAccessFile(path, 
"rw").setLength(length); + } /** * Returns a Spark config for this test. @@ -101,8 +153,8 @@ public class HoodieClientTestUtils { return HoodieReadClient.addHoodieSupport(sparkConf); } - private static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, - List<HoodieInstant> commitsToReturn) throws IOException { + public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, + List<HoodieInstant> commitsToReturn) throws IOException { HashMap<String, String> fileIdToFullPath = new HashMap<>(); for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = @@ -175,8 +227,6 @@ public class HoodieClientTestUtils { /** * Find total basefiles for passed in paths. - * <p> - * TODO move to {@link FileCreateUtils}. */ public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs, String... paths) { @@ -195,9 +245,6 @@ public class HoodieClientTestUtils { } } - /** - * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. - */ public static String writeParquetFile(String basePath, String partitionPath, String filename, List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { @@ -213,7 +260,7 @@ public class HoodieClientTestUtils { HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); HoodieParquetWriter writer = new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, - schema, new SparkTaskContextSupplier()); + schema, new SparkTaskContextSupplier()); int seqId = 1; for (HoodieRecord record : records) { GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); @@ -231,9 +278,6 @@ public class HoodieClientTestUtils { return filename; } - /** - * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. 
- */ public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { Thread.sleep(1000); @@ -245,9 +289,27 @@ public class HoodieClientTestUtils { createCommitTime); } - /** - * TODO move to {@link FileCreateUtils}. - */ + public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime) + throws IOException { + return createMarkerFile(basePath, partitionPath, instantTime); + } + + public static String createMarkerFile(String basePath, String partitionPath, String instantTime) + throws IOException { + return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE); + } + + public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType) + throws IOException { + String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/"; + new File(folderPath).mkdirs(); + String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime, + HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType); + File f = new File(folderPath + markerFileName); + f.createNewFile(); + return f.getAbsolutePath(); + } + public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException { createTempFolderForMarkerFiles(basePath); String folderPath = getTempFolderName(basePath); @@ -256,9 +318,6 @@ public class HoodieClientTestUtils { new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile(); } - /** - * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. 
- */ public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) { String folderPath = getTempFolderName(basePath); File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath); @@ -268,11 +327,11 @@ public class HoodieClientTestUtils { return 0; } - private static void createTempFolderForMarkerFiles(String basePath) { + public static void createTempFolderForMarkerFiles(String basePath) { new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs(); } - private static String getTempFolderName(String basePath) { + public static String getTempFolderName(String basePath) { return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME; } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java deleted file mode 100644 index 2da8e29..0000000 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.hudi.common.testutils; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieTimeline; - -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - -public class FileCreateUtils { - - private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException { - Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME); - Files.createDirectories(parentPath); - Path metaFilePath = parentPath.resolve(instantTime + suffix); - if (Files.notExists(metaFilePath)) { - Files.createFile(metaFilePath); - } - } - - public static void createCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); - } - - public static void createRequestedCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION); - } - - public static void createInflightCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION); - } - - public static void createDeltaCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); - } - - public static void createRequestedDeltaCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION); - } - - public static void createInflightDeltaCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, 
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); - } - - public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId) - throws Exception { - createDataFile(basePath, partitionPath, instantTime, fileId, 0); - } - - public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) - throws Exception { - Path parentPath = Paths.get(basePath, partitionPath); - Files.createDirectories(parentPath); - Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); - if (Files.notExists(dataFilePath)) { - Files.createFile(dataFilePath); - } - new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length); - } - - public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version) - throws Exception { - createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); - } - - public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) - throws Exception { - Path parentPath = Paths.get(basePath, partitionPath); - Files.createDirectories(parentPath); - Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); - if (Files.notExists(logFilePath)) { - Files.createFile(logFilePath); - } - new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length); - } - - public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType) - throws IOException { - Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); - Files.createDirectories(folderPath); - String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime, - HoodieFileFormat.PARQUET.getFileExtension(), 
HoodieTableMetaClient.MARKER_EXTN, ioType); - Path markerFilePath = folderPath.resolve(markerFileName); - if (Files.notExists(markerFilePath)) { - Files.createFile(markerFilePath); - } - return markerFilePath.toAbsolutePath().toString(); - } -} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java deleted file mode 100644 index 32f2d45..0000000 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.hudi.common.testutils; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.ValidationUtils; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.stream.IntStream; - -import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile; -import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; - -public class HoodieTestTable { - - private final String basePath; - private final FileSystem fs; - private HoodieTableMetaClient metaClient; - private String currentInstantTime; - - private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) { - ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath())); - ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs())); - this.basePath = basePath; - this.fs = fs; - this.metaClient = metaClient; - } - - public static HoodieTestTable of(HoodieTableMetaClient metaClient) { - return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); - } - - public HoodieTestTable 
addRequestedCommit(String instantTime) throws Exception { - createRequestedCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - - public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception { - createRequestedDeltaCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - - public HoodieTestTable addInflightCommit(String instantTime) throws Exception { - createRequestedCommit(basePath, instantTime); - createInflightCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - - public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception { - createRequestedDeltaCommit(basePath, instantTime); - createInflightDeltaCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - - public HoodieTestTable addCommit(String instantTime) throws Exception { - createRequestedCommit(basePath, instantTime); - createInflightCommit(basePath, instantTime); - createCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - - public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { - createRequestedDeltaCommit(basePath, instantTime); - createInflightDeltaCommit(basePath, instantTime); - createDeltaCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - - public HoodieTestTable forCommit(String instantTime) { - currentInstantTime = instantTime; - return this; - } - - public HoodieTestTable forDeltaCommit(String instantTime) { - currentInstantTime = instantTime; - return this; - } - - public HoodieTestTable 
withMarkerFile(String partitionPath, IOType ioType) throws IOException { - return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType); - } - - public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException { - createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); - return this; - } - - public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException { - String[] fileIds = IntStream.range(0, num).mapToObj(i -> UUID.randomUUID().toString()).toArray(String[]::new); - return withMarkerFiles(partitionPath, fileIds, ioType); - } - - public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException { - for (String fileId : fileIds) { - createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); - } - return this; - } - - /** - * Insert one base file to each of the given distinct partitions. - * - * @return A {@link Map} of partition and its newly inserted file's id. - */ - public Map<String, String> withInserts(String... partitions) throws Exception { - Map<String, String> partitionFileIdMap = new HashMap<>(); - for (String p : partitions) { - String fileId = UUID.randomUUID().toString(); - FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId); - partitionFileIdMap.put(p, fileId); - } - return partitionFileIdMap; - } - - public HoodieTestTable withUpdates(String partition, String... 
fileIds) throws Exception { - for (String f : fileIds) { - FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f); - } - return this; - } - - public String withLogFile(String partitionPath) throws Exception { - String fileId = UUID.randomUUID().toString(); - withLogFile(partitionPath, fileId); - return fileId; - } - - public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception { - return withLogFile(partitionPath, fileId, 0); - } - - public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception { - FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version); - return this; - } - - public boolean filesExist(Map<String, String> partitionAndFileId, String instantTime) { - return partitionAndFileId.entrySet().stream().allMatch(entry -> { - String partition = entry.getKey(); - String fileId = entry.getValue(); - return fileExists(partition, instantTime, fileId); - }); - } - - public boolean filesExist(String partition, String instantTime, String... fileIds) { - return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f)); - } - - public boolean fileExists(String partition, String instantTime, String fileId) { - try { - return fs.exists(new Path(Paths.get(basePath, partition, - FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString())); - } catch (IOException e) { - throw new HoodieTestTableException(e); - } - } - - public boolean logFilesExist(String partition, String instantTime, String fileId, int... 
versions) { - return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v)); - } - - public boolean logFileExists(String partition, String instantTime, String fileId, int version) { - try { - return fs.exists(new Path(Paths.get(basePath, partition, - FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString())); - } catch (IOException e) { - throw new HoodieTestTableException(e); - } - } - - public List<FileStatus> listAllFiles(String partitionPath) throws IOException { - return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())); - } - - public List<FileStatus> listAllFilesInTempFolder() throws IOException { - return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())); - } - - public static class HoodieTestTableException extends RuntimeException { - public HoodieTestTableException(Throwable t) { - super(t); - } - } -} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 8b38b25..92d431c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -99,6 +99,7 @@ import static org.junit.jupiter.api.Assertions.fail; */ public class HoodieTestUtils { + public static final String TEST_EXTENSION = ".test"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; public static final int DEFAULT_LOG_VERSION = 1; @@ -137,7 +138,7 @@ public class HoodieTestUtils { } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName) + String tableName) throws IOException { Properties properties = new 
Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); @@ -145,7 +146,7 @@ public class HoodieTestUtils { } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - HoodieFileFormat baseFileFormat) + HoodieFileFormat baseFileFormat) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toString()); @@ -153,7 +154,7 @@ public class HoodieTestUtils { } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - Properties properties) + Properties properties) throws IOException { properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); @@ -165,9 +166,6 @@ public class HoodieTestUtils { return COMMIT_FORMATTER.format(new Date()); } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static void createCommitFiles(String basePath, String... instantTimes) throws IOException { for (String instantTime : instantTimes) { new File( @@ -178,6 +176,20 @@ public class HoodieTestUtils { + HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile(); new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)) + .createNewFile(); + } + } + + public static void createDeltaCommitFiles(String basePath, String... 
instantTimes) throws IOException { + for (String instantTime : instantTimes) { + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile(); + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile(); + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime)) .createNewFile(); } } @@ -186,9 +198,6 @@ public class HoodieTestUtils { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException { for (String instantTime : instantTimes) { @@ -202,12 +211,11 @@ public class HoodieTestUtils { public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... 
instantTimes) { for (String instantTime : instantTimes) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime), - HoodieTimeline.makeInflightCleanerFileName(instantTime)) - .forEach(f -> { + HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f -> { FSDataOutputStream os = null; try { - Path commitFile = new Path(Paths - .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); + Path commitFile = new Path( + metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(TimelineMetadataUtils.serializeCleanerPlan( @@ -229,12 +237,11 @@ public class HoodieTestUtils { public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), - HoodieTimeline.makeInflightCleanerFileName(commitTime)) - .forEach(f -> { + HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> { FSDataOutputStream os = null; try { - Path commitFile = new Path(Paths - .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); + Path commitFile = new Path( + metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(new byte[0]); @@ -252,18 +259,18 @@ public class HoodieTestUtils { }); } - /** - * @deprecated Use {@link HoodieTestTable} instead. 
- */ + public static String createNewDataFile(String basePath, String partitionPath, String instantTime) + throws IOException { + String fileID = UUID.randomUUID().toString(); + return createDataFile(basePath, partitionPath, instantTime, fileID); + } + public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length) throws IOException { String fileID = UUID.randomUUID().toString(); return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length); } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; @@ -272,7 +279,7 @@ public class HoodieTestUtils { return fileID; } - private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, + public static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, long length) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; Files.createDirectories(Paths.get(folderPath)); @@ -284,9 +291,6 @@ public class HoodieTestUtils { return fileID; } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime, String fileID, Option<Integer> version) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; @@ -303,6 +307,17 @@ public class HoodieTestUtils { return fileID; } + public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... 
instantTimes) + throws IOException { + for (String instantTime : instantTimes) { + boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeCommitFileName(instantTime))); + if (!createFile) { + throw new IOException("cannot create commit file for commit " + instantTime); + } + } + } + public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, List<Pair<String, FileSlice>> fileSliceList) throws IOException { HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); @@ -311,16 +326,10 @@ public class HoodieTestUtils { TimelineMetadataUtils.serializeCompactionPlan(plan)); } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID); } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID, Option<Integer> version) { return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime, @@ -331,43 +340,36 @@ public class HoodieTestUtils { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION; } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static String getInflightCommitFilePath(String basePath, String instantTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } - /** - * @deprecated Use {@link HoodieTestTable} instead. 
- */ public static String getRequestedCompactionFilePath(String basePath, String instantTime) { return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime, String fileID) { return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists(); } + public static boolean doesLogFileExist(String basePath, String partitionPath, String instantTime, String fileID, + Option<Integer> version) { + return new File(getLogFilePath(basePath, partitionPath, instantTime, fileID, version)).exists(); + } + public static boolean doesCommitExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION) - .exists(); + .exists(); } - /** - * @deprecated Use {@link HoodieTestTable} instead. 
- */ public static boolean doesInflightExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION) - .exists(); + .exists(); } public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath, @@ -417,8 +419,8 @@ public class HoodieTestUtils { Writer logWriter; try { logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath)) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId()) - .overBaseCommit(location.getInstantTime()).withFs(fs).build(); + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId()) + .overBaseCommit(location.getInstantTime()).withFs(fs).build(); Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime()); @@ -472,7 +474,7 @@ public class HoodieTestUtils { public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException { return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath))) - .toArray(FileStatus[]::new); + .toArray(FileStatus[]::new); } public static List<String> monotonicIncreasingCommitTimestamps(int numTimestamps, int startSecsDelta) {