This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a8bd76c  [HUDI-1029] In inline compaction mode, previously failed compactions needs to be retried before new compactions (#1857)
a8bd76c is described below

commit a8bd76c299b13af93c9eed92309ecb3a86a6734c
Author: vinoth chandar <vinothchan...@users.noreply.github.com>
AuthorDate: Wed Jul 22 21:22:06 2020 -0700

    [HUDI-1029] In inline compaction mode, previously failed compactions needs to be retried before new compactions (#1857)

    - Prevents failed compactions from causing issues with future commits
---
 .../org/apache/hudi/client/HoodieWriteClient.java   |   9 +
 .../compact/ScheduleCompactionActionExecutor.java   |   2 +-
 .../table/action/compact/CompactionTestBase.java    | 243 +++++++++++++++++++++
 .../table/action/compact/TestAsyncCompaction.java   | 213 +----------------
 .../table/action/compact/TestInlineCompaction.java  | 116 ++++++++++
 5 files changed, 372 insertions(+), 211 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 30dfecb..2486d91 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -341,6 +341,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       // Do an inline compaction if enabled
       if (config.isInlineCompaction()) {
+        runAnyPendingCompactions(table);
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
         inlineCompact(extraMetadata);
       } else {
@@ -355,6 +356,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }

+  private void runAnyPendingCompactions(HoodieTable<?> table) {
+    table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
   /**
    * Handle auto clean during commit.
    * @param instantTime
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 174c64e..6d88e5c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -65,7 +65,7 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
     int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
         .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
     if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-      LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+      LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
          + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
          + config.getInlineCompactDeltaCommitMax());
      return new HoodieCompactionPlan();
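The behavioral change is concentrated in runAnyPendingCompactions() above: with inline compaction enabled, the commit path now first replays every compaction instant still pending (REQUESTED or INFLIGHT) on the timeline, completing each one (compact() is invoked with shouldComplete=true), and only then moves on to scheduling a fresh inline compaction. A minimal sketch of a writer that exercises this path — jsc, basePath, TRIP_EXAMPLE_SCHEMA, and records are assumed to be set up as in the tests below, and the threshold of 2 delta commits is illustrative, not prescribed by the patch:

    // Sketch: opt into inline compaction. With this patch, a commit made through
    // this client first retries any compaction left pending by an earlier failure,
    // then schedules a new one once 2 delta commits have accumulated.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(TRIP_EXAMPLE_SCHEMA)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)
            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
        .build();
    try (HoodieWriteClient<?> client = new HoodieWriteClient<>(jsc, cfg)) {
      String instantTime = HoodieActiveTimeline.createNewInstantTime();
      client.startCommitWithTime(instantTime);
      // commit handling runs the pending-compaction retry, then inline compaction
      client.upsert(jsc.parallelize(records, 1), instantTime);
    }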
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
new file mode 100644
index 0000000..c171255
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompactionTestBase extends HoodieClientTestBase {
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  /**
+   * HELPER METHODS FOR TESTING.
+   **/
+  protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
+                                     HoodieWriteConfig cfg) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTable table = getHoodieTable(metaClient, cfg);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    fileSliceList.forEach(fileSlice -> {
+      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
+      if (opPair != null) {
+        assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
+        assertTrue(fileSlice.getLogFiles().count() > 0,
+            "Expect atleast one log file to be present where the latest delta commit was written");
+        assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
+      } else {
+        assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
+            "Expect baseInstant to be less than or equal to latestDeltaCommit");
+      }
+    });
+  }
+
+  protected List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
+                                                   List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
+      throws Exception {
+
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
+    List<String> gotPendingCompactionInstants =
+        pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
+    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
+
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
+        CompactionUtils.getAllPendingCompactionOperations(metaClient);
+
+    if (insertFirst) {
+      // Use first instant for inserting records
+      String firstInstant = deltaInstants.get(0);
+      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.startCommitWithTime(firstInstant);
+      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
+      List<WriteStatus> statusList = statuses.collect();
+
+      if (!cfg.shouldAutoCommit()) {
+        client.commit(firstInstant, statuses);
+      }
+      assertNoWriteErrors(statusList);
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
+      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+      assertTrue(dataFilesToRead.stream().findAny().isPresent(),
+          "should list the parquet files we wrote in the delta commit");
+      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
+    }
+
+    int numRecords = records.size();
+    for (String instantTime : deltaInstants) {
+      records = dataGen.generateUpdates(instantTime, numRecords);
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
+      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
+    }
+    return records;
+  }
+
+  protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
+    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
+        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
+    assertTrue(instant.isInflight(), "Instant must be marked inflight");
+  }
+
+  protected void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg) {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
+    assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
+  }
+
+  protected void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
+                                              HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+    scheduleCompaction(compactionInstantTime, client, cfg);
+    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
+  }
+
+  protected void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
+                                   HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+
+    client.compact(compactionInstantTime);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
+    assertFalse(fileSliceList.stream()
+            .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
+        "Verify all file-slices have base-instant same as compaction instant");
+    assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
+        "Verify all file-slices have data-files");
+
+    if (hasDeltaCommitAfterPendingCompaction) {
+      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
+          "Verify all file-slices have atleast one log-file");
+    } else {
+      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
+          "Verify all file-slices have no log-files");
+    }
+
+    // verify that there is a commit
+    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
+    assertEquals(latestCompactionCommitTime, compactionInstantTime,
+        "Expect compaction instant time to be the latest commit time");
+    assertEquals(expectedNumRecs,
+        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
+        "Must contain expected records");
+  }
+
+  protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, HoodieWriteClient client,
+                                                    HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    client.startCommitWithTime(instantTime);
+
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+    List<WriteStatus> statusList = statuses.collect();
+    assertNoWriteErrors(statusList);
+    if (!cfg.shouldAutoCommit() && !skipCommit) {
+      client.commit(instantTime, statuses);
+    }
+
+    Option<HoodieInstant> deltaCommit =
+        metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
+    if (skipCommit && !cfg.shouldAutoCommit()) {
+      assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
+          "Delta commit should not be latest instant");
+    } else {
+      assertTrue(deltaCommit.isPresent());
+      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
+    }
+    return statusList;
+  }
+
+  protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+    HoodieTableFileSystemView view =
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+    return view.getLatestBaseFiles().collect(Collectors.toList());
+  }
+
+  protected List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
+    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
+        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
+    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
+        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
+  }
+
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+}
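Taken together, these helpers let a subclass state a compaction scenario declaratively: run some delta commits, optionally park a compaction in a pending state, then execute it and validate the resulting file slices. A hedged sketch of the intended call pattern from a subclass, using only the protected members above — the instant times are illustrative:

    // Sketch: two delta commits, then schedule + execute a compaction at "004".
    HoodieWriteConfig cfg = getConfigBuilder(true).build();
    try (HoodieWriteClient<?> client = getHoodieWriteClient(cfg)) {
      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
      List<HoodieRecord> records = dataGen.generateInserts("001", 100);
      runNextDeltaCommits(client, readClient, Arrays.asList("001", "002"), records, cfg, true, new ArrayList<>());
      HoodieTable table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()), cfg);
      scheduleAndExecuteCompaction("004", client, table, cfg, 100, false);
    }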
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index bf37a88..81840b9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -18,77 +18,36 @@
 package org.apache.hudi.table.action.compact;
 
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.FileSystemViewStorageType;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.testutils.HoodieTestDataGenerator;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
-import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
-import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test Cases for Async Compaction and Ingestion interaction.
  */
-public class TestAsyncCompaction extends HoodieClientTestBase {
+public class TestAsyncCompaction extends CompactionTestBase {
 
   private HoodieWriteConfig getConfig(Boolean autoCommit) {
     return getConfigBuilder(autoCommit).build();
   }
 
-  private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
-        .forTable("test-trip-table")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
   @Test
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
@@ -372,170 +331,4 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
       executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
     }
   }
-
-  /**
-   * HELPER METHODS FOR TESTING.
-   **/
-
-  private void validateDeltaCommit(String latestDeltaCommit,
-      final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
-      HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieTable table = getHoodieTable(metaClient, cfg);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
-    fileSliceList.forEach(fileSlice -> {
-      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
-      if (opPair != null) {
-        assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
-        assertTrue(fileSlice.getLogFiles().count() > 0,
-            "Expect atleast one log file to be present where the latest delta commit was written");
-        assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
-      } else {
-        assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
-            "Expect baseInstant to be less than or equal to latestDeltaCommit");
-      }
-    });
-  }
-
-  private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
-      List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
-      throws Exception {
-
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
-    List<String> gotPendingCompactionInstants =
-        pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
-    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
-
-    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
-        CompactionUtils.getAllPendingCompactionOperations(metaClient);
-
-    if (insertFirst) {
-      // Use first instant for inserting records
-      String firstInstant = deltaInstants.get(0);
-      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
-      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-      client.startCommitWithTime(firstInstant);
-      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
-      List<WriteStatus> statusList = statuses.collect();
-
-      if (!cfg.shouldAutoCommit()) {
-        client.commit(firstInstant, statuses);
-      }
-      assertNoWriteErrors(statusList);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
-      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
-      assertTrue(dataFilesToRead.stream().findAny().isPresent(),
-          "should list the parquet files we wrote in the delta commit");
-      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
-    }
-
-    int numRecords = records.size();
-    for (String instantTime : deltaInstants) {
-      records = dataGen.generateUpdates(instantTime, numRecords);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
-      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
-    }
-    return records;
-  }
-
-  private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
-    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
-    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
-        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
-    assertTrue(instant.isInflight(), "Instant must be marked inflight");
-  }
-
-  private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
-      throws IOException {
-    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
-    assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
-  }
-
-  private void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
-      HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-    scheduleCompaction(compactionInstantTime, client, cfg);
-    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
-  }
-
-  private void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
-      HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-
-    client.compact(compactionInstantTime);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
-    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
-    assertFalse(fileSliceList.stream()
-        .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
-        "Verify all file-slices have base-instant same as compaction instant");
-    assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
-        "Verify all file-slices have data-files");
-
-    if (hasDeltaCommitAfterPendingCompaction) {
-      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
-          "Verify all file-slices have atleast one log-file");
-    } else {
-      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
-          "Verify all file-slices have no log-files");
-    }
-
-    // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
-    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
-    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
-    assertEquals(latestCompactionCommitTime, compactionInstantTime,
-        "Expect compaction instant time to be the latest commit time");
-    assertEquals(expectedNumRecs,
-        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
-        "Must contain expected records");
-
-  }
-
-  private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records,
-      HoodieWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    client.startCommitWithTime(instantTime);
-
-    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
-    List<WriteStatus> statusList = statuses.collect();
-    assertNoWriteErrors(statusList);
-    if (!cfg.shouldAutoCommit() && !skipCommit) {
-      client.commit(instantTime, statuses);
-    }
-
-    Option<HoodieInstant> deltaCommit =
-        metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
-    if (skipCommit && !cfg.shouldAutoCommit()) {
-      assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
-          "Delta commit should not be latest instant");
-    } else {
-      assertTrue(deltaCommit.isPresent());
-      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
-    }
-    return statusList;
-  }
-
-  private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
-    HoodieTableFileSystemView view =
-        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
-    return view.getLatestBaseFiles().collect(Collectors.toList());
-  }
-
-  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
-    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
-        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
-    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
-        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
-  }
-
-  protected HoodieTableType getTableType() {
-    return HoodieTableType.MERGE_ON_READ;
-  }
 }
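The tests retained above all rely on the async split that this refactor preserves: scheduling a compaction is decoupled from executing it, so ingestion can continue in between. In client terms, a sketch of that two-step flow — the open client is assumed, and the instant time is illustrative:

    // Sketch: async-style compaction as exercised by TestAsyncCompaction.
    // Scheduling only writes a plan at the requested instant; Option.empty()
    // attaches no extra metadata. Execution can happen on a later cycle.
    String compactionInstantTime = "005";
    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
    // ... further delta commits may land here before the compaction runs ...
    client.compact(compactionInstantTime);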
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
new file mode 100644
index 0000000..4cbb461
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestInlineCompaction extends CompactionTestBase {
+
+  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+    return getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+        .build();
+  }
+
+  @Test
+  public void testCompactionIsNotScheduledEarly() throws Exception {
+    // Given: make two commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts("000", 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"), records, cfg, true, new ArrayList<>());
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+
+      // Then: ensure no compaction is executed, since there are only 2 delta commits
+      assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    }
+  }
+
+  @Test
+  public void testSuccessfulCompaction() throws Exception {
+    // Given: make three commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+
+      // third commit, that will trigger compaction
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
+
+      // Then: ensure the file slices are compacted as per policy
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
+    }
+  }
+
+  @Test
+  public void testCompactionRetryOnFailure() throws Exception {
+    // Given: two commits, with a scheduled compaction left failed/in-flight
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+    List<String> instants = CollectionUtils.createImmutableList("000", "001");
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+      // Schedule compaction 002, make it in-flight (simulates inline compaction failing)
+      scheduleCompaction("002", writeClient, cfg);
+      moveCompactionFromRequestedToInflight("002", cfg);
+    }
+
+    // When: a third commit happens
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
+    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), writeClient, metaClient, inlineCfg, false);
+    }
+
+    // Then: 1 delta commit is done, the failed compaction is retried
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    assertEquals("002", metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+  }
+}
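For the retry test above, the expected timeline shape makes the assertions concrete (an informal trace, not output from the test):

    000.deltacommit    insert of 100 records
    001.deltacommit    update
    002.compaction     scheduled, then moved to inflight (simulated failure)
    003.deltacommit    update under the inline-compaction config
    002.commit         pending compaction replayed and completed by runAnyPendingCompactions

Hence the commits-and-compaction timeline holds 4 instants (delta commits 000, 001, 003 plus the completed 002 compaction), and "002" is the first completed instant on the commit timeline — no new compaction is scheduled, since only one delta commit (003) has landed since 002.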