This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new e79fbc0 [HUDI-1054] Several performance fixes during finalizing writes (#1768) e79fbc0 is described below commit e79fbc07fe803bc51cdf4f11948b133bbaa70595 Author: Udit Mehrotra <umehr...@illinois.edu> AuthorDate: Fri Jul 31 20:10:28 2020 -0700 [HUDI-1054] Several performance fixes during finalizing writes (#1768) Co-authored-by: Udit Mehrotra <udi...@amazon.com> --- .../cli/commands/TestArchivedCommitsCommand.java | 2 +- .../hudi/cli/commands/TestCommitsCommand.java | 2 +- .../org/apache/hudi/client/HoodieWriteClient.java | 4 +- .../org/apache/hudi/config/HoodieWriteConfig.java | 13 +++ .../java/org/apache/hudi/table/HoodieTable.java | 17 ++-- .../hudi/table/HoodieTimelineArchiveLog.java | 13 +-- .../java/org/apache/hudi/table/MarkerFiles.java | 100 ++++++++++++++++----- .../rollback/BaseRollbackActionExecutor.java | 2 +- .../hudi/io/TestHoodieTimelineArchiveLog.java | 14 +-- .../org/apache/hudi/table/TestMarkerFiles.java | 16 +++- 10 files changed, 130 insertions(+), 53 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java index 313c1bc..4c7ce88 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest { // archive HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); - archiveLog.archiveIfRequired(); + archiveLog.archiveIfRequired(jsc); } @AfterEach diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index 45c340d..44e2b80 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -176,7 +176,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { // archive metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration()); - archiveLog.archiveIfRequired(); + archiveLog.archiveIfRequired(jsc); CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); assertTrue(cr.isSuccess()); diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index b2ad315..9782b46 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -337,7 +337,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo try { // Delete the marker directory for the instant. - new MarkerFiles(table, instantTime).quietDeleteMarkerDir(); + new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism()); // Do an inline compaction if enabled if (config.isInlineCompaction()) { @@ -349,7 +349,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } // We cannot have unbounded commit files. Archive commits if we have to archive HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf); - archiveLog.archiveIfRequired(); + archiveLog.archiveIfRequired(jsc); autoCleanOnCommit(instantTime); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9aecdf7..69758f2 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; + public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism"; + public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100"; public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT .toString(); @@ -235,6 +237,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM)); } + public int getMarkersDeleteParallelism() { + return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM)); + } + public boolean isEmbeddedTimelineServerEnabled() { return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED)); } @@ -830,6 +836,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withMarkersDeleteParallelism(int parallelism) { + props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public Builder withEmbeddedTimelineServerEnabled(boolean enabled) { props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled)); return this; @@ -874,6 +885,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { DEFAULT_HOODIE_WRITE_STATUS_CLASS); setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM, + DEFAULT_MARKERS_DELETE_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED), EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED); setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP), diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 748091e..d8b0c6e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -70,6 +70,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -428,21 +429,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri } // we are not including log appends here, since they are already fail-safe. - List<String> invalidDataPaths = markers.createdAndMergedDataPaths(); - List<String> validDataPaths = stats.stream() + Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism()); + Set<String> validDataPaths = stats.stream() .map(HoodieWriteStat::getPath) .filter(p -> p.endsWith(this.getBaseFileExtension())) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); + // Contains list of partially created files. These needs to be cleaned up. invalidDataPaths.removeAll(validDataPaths); + if (!invalidDataPaths.isEmpty()) { LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths); - } - Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream() - .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString())) - .collect(Collectors.groupingBy(Pair::getKey)); + Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream() + .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString())) + .collect(Collectors.groupingBy(Pair::getKey)); - if (!invalidPathsByPartition.isEmpty()) { // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS. // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit if (consistencyCheckEnabled) { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 98d3e05..4be00a3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -54,6 +54,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; import java.io.FileNotFoundException; import java.io.IOException; @@ -121,7 +122,7 @@ public class HoodieTimelineArchiveLog { /** * Check if commits need to be archived. If yes, archive commits. */ - public boolean archiveIfRequired() throws IOException { + public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException { try { List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); @@ -129,7 +130,7 @@ public class HoodieTimelineArchiveLog { if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); LOG.info("Archiving instants " + instantsToArchive); - archive(instantsToArchive); + archive(jsc, instantsToArchive); LOG.info("Deleting archived instants " + instantsToArchive); success = deleteArchivedInstants(instantsToArchive); } else { @@ -267,7 +268,7 @@ public class HoodieTimelineArchiveLog { return success; } - public void archive(List<HoodieInstant> instants) throws HoodieCommitException { + public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException { try { HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); @@ -275,7 +276,7 @@ public class HoodieTimelineArchiveLog { List<IndexedRecord> records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { - deleteAnyLeftOverMarkerFiles(hoodieInstant); + deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant); records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); if (records.size() >= this.config.getCommitArchivalBatchSize()) { writeToFile(wrapperSchema, records); @@ -293,9 +294,9 @@ public class HoodieTimelineArchiveLog { } } - private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) { + private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) { MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp()); - if (markerFiles.deleteMarkerDir()) { + if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) { LOG.info("Cleaned up left over marker directory for instant :" + instant); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java index 00eb7df..8a310fd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java @@ -18,8 +18,12 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; @@ -28,26 +32,27 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.IOType; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; import java.util.ArrayList; -import java.util.LinkedList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * Operates on marker files for a given write action (commit, delta commit, compaction). */ -public class MarkerFiles { +public class MarkerFiles implements Serializable { private static final Logger LOG = LogManager.getLogger(MarkerFiles.class); - public static String stripMarkerSuffix(String path) { - return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN)); - } - private final String instantTime; - private final FileSystem fs; - private final Path markerDirPath; + private final transient FileSystem fs; + private final transient Path markerDirPath; private final String basePath; public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) { @@ -64,9 +69,9 @@ public class MarkerFiles { instantTime); } - public void quietDeleteMarkerDir() { + public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) { try { - deleteMarkerDir(); + deleteMarkerDir(jsc, parallelism); } catch (HoodieIOException ioe) { LOG.warn("Error deleting marker directory for instant " + instantTime, ioe); } @@ -74,34 +79,77 @@ public class MarkerFiles { /** * Delete Marker directory corresponding to an instant. + * + * @param jsc Java Spark Context. + * @param parallelism Spark parallelism for deletion. */ - public boolean deleteMarkerDir() { + public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) { try { - boolean result = fs.delete(markerDirPath, true); - if (result) { + if (fs.exists(markerDirPath)) { + FileStatus[] fileStatuses = fs.listStatus(markerDirPath); + List<String> markerDirSubPaths = Arrays.stream(fileStatuses) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + + if (markerDirSubPaths.size() > 0) { + SerializableConfiguration conf = new SerializableConfiguration(fs.getConf()); + parallelism = Math.min(markerDirSubPaths.size(), parallelism); + jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> { + Path subPath = new Path(subPathStr); + FileSystem fileSystem = subPath.getFileSystem(conf.get()); + fileSystem.delete(subPath, true); + }); + } + + boolean result = fs.delete(markerDirPath, true); LOG.info("Removing marker directory at " + markerDirPath); - } else { - LOG.info("No marker directory to delete at " + markerDirPath); + return result; } - return result; } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } + return false; } public boolean doesMarkerDirExist() throws IOException { return fs.exists(markerDirPath); } - public List<String> createdAndMergedDataPaths() throws IOException { - List<String> dataFiles = new LinkedList<>(); - FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> { - String pathStr = status.getPath().toString(); - if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { - dataFiles.add(translateMarkerToDataPath(pathStr)); + public Set<String> createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException { + Set<String> dataFiles = new HashSet<>(); + + FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath); + List<String> subDirectories = new ArrayList<>(); + for (FileStatus topLevelStatus: topLevelStatuses) { + if (topLevelStatus.isFile()) { + String pathStr = topLevelStatus.getPath().toString(); + if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { + dataFiles.add(translateMarkerToDataPath(pathStr)); + } + } else { + subDirectories.add(topLevelStatus.getPath().toString()); } - return true; - }, false); + } + + if (subDirectories.size() > 0) { + parallelism = Math.min(subDirectories.size(), parallelism); + SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf()); + dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> { + Path path = new Path(directory); + FileSystem fileSystem = path.getFileSystem(serializedConf.get()); + RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true); + List<String> result = new ArrayList<>(); + while (itr.hasNext()) { + FileStatus status = itr.next(); + String pathStr = status.getPath().toString(); + if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { + result.add(translateMarkerToDataPath(pathStr)); + } + } + return result.iterator(); + }).collect()); + } + return dataFiles; } @@ -110,6 +158,10 @@ public class MarkerFiles { return MarkerFiles.stripMarkerSuffix(rPath); } + public static String stripMarkerSuffix(String path) { + return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN)); + } + public List<String> allMarkerFilePaths() throws IOException { List<String> markerFiles = new ArrayList<>(); FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 846e8a8..90b9bb3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -113,7 +113,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood } // Finally, remove the marker files post rollback. - new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(); + new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism()); return rollbackMetadata; } diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index 484caf7..5785fc8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -79,7 +79,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { .withParallelism(2, 2).forTable("test-trip-table").build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); } @@ -157,7 +157,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); - assertTrue(archiveLog.archiveIfRequired()); + assertTrue(archiveLog.archiveIfRequired(jsc)); // reload the timeline and remove the remaining commits timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); @@ -246,7 +246,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match"); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5"); @@ -289,7 +289,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe"); @@ -315,7 +315,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); - assertTrue(archiveLog.archiveIfRequired()); + assertTrue(archiveLog.archiveIfRequired(jsc)); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertEquals(5, timeline.countInstants(), "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)"); @@ -349,7 +349,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline(); assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match"); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline(); assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")), @@ -397,7 +397,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf()); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java index 723d9e1..af679ce 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java @@ -28,6 +28,9 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.IOType; +import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,16 +48,23 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { private MarkerFiles markerFiles; private FileSystem fs; private Path markerFolderPath; + private JavaSparkContext jsc; @BeforeEach public void setup() throws IOException { initPath(); initMetaClient(); + this.jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName())); this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000")); this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000"); } + @AfterEach + public void cleanup() { + jsc.stop(); + } + private void createSomeMarkerFiles() { markerFiles.create("2020/06/01", "file1", IOType.MERGE); markerFiles.create("2020/06/02", "file2", IOType.APPEND); @@ -97,7 +107,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { // then assertTrue(markerFiles.doesMarkerDirExist()); - assertTrue(markerFiles.deleteMarkerDir()); + assertTrue(markerFiles.deleteMarkerDir(jsc, 2)); assertFalse(markerFiles.doesMarkerDirExist()); } @@ -105,7 +115,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { public void testDeletionWhenMarkerDirNotExists() throws IOException { // then assertFalse(markerFiles.doesMarkerDirExist()); - assertFalse(markerFiles.deleteMarkerDir()); + assertFalse(markerFiles.deleteMarkerDir(jsc, 2)); } @Test @@ -120,7 +130,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { // then assertIterableEquals(CollectionUtils.createImmutableList( "2020/06/01/file1", "2020/06/03/file3"), - markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList()) + markerFiles.createdAndMergedDataPaths(jsc, 2).stream().sorted().collect(Collectors.toList()) ); }