This is an automated email from the ASF dual-hosted git repository. satish pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5b7f18c13aa96084d5f625fd4e8087a2815b6294 Author: Jon Vexler <jbvex...@gmail.com> AuthorDate: Mon Nov 28 22:48:06 2022 -0500 [HUDI-5242] Do not fail Meta sync in Deltastreamer when inline table service fails (#7243) After the files are written, table services like clustering and compaction can fail. This causes the sync to the metaserver to not happen. This patch adds a config that when set to false, the deltastreamer will not fail and the sync to the metaserver will occur. A warning will be logged with the exception that occurred. To use this new behavior, set hoodie.fail.writes.on.inline.table.service.exception to false. Co-authored-by: Jonathan Vexler <=> --- .../apache/hudi/client/BaseHoodieWriteClient.java | 16 ++++- .../org/apache/hudi/config/HoodieWriteConfig.java | 15 +++++ .../TestHoodieClientOnCopyOnWriteStorage.java | 68 ++++++++++++++++++++-- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 50337d56d91..609f85e27fe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -242,9 +242,21 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, } finally { this.txnManager.endTransaction(Option.of(inflightInstant)); } - // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period - runTableServicesInline(table, metadata, extraMetadata); + + // We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. 
If it is false, we catch the exception and log a warning instead. + try { + // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period + runTableServicesInline(table, metadata, extraMetadata); + } catch (Exception e) { + if (config.isFailOnInlineTableServiceExceptionEnabled()) { + throw e; + } + LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage() + + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false."); + } + emitCommitMetrics(instantTime, metadata, commitActionType); + // callback if needed. if (config.writeCommitCallbackOn()) { if (null == commitCallback) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 43bd4fb5d98..533bb5c47df 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -359,6 +359,12 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. " + "Controls whether or not, the write should be failed as well, if such archiving fails."); + public static final ConfigProperty<String> FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION = ConfigProperty + .key("hoodie.fail.writes.on.inline.table.service.exception") + .defaultValue("true") + .withDocumentation("Table services such as compaction and clustering can fail and prevent syncing to " + "the metaclient. 
Set this to true to fail writes when table services fail"); + public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.initial_interval_ms") .defaultValue(2000L) @@ -1107,6 +1113,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE); } + public boolean isFailOnInlineTableServiceExceptionEnabled() { + return getBoolean(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION); + } + public int getMaxConsistencyChecks() { return getInt(MAX_CONSISTENCY_CHECKS); } @@ -2256,6 +2266,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withFailureOnInlineTableServiceException(boolean fail) { + writeConfig.setValue(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION, String.valueOf(fail)); + return this; + } + public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) { writeConfig.setValue(INSERT_PARALLELISM_VALUE, String.valueOf(insertShuffleParallelism)); writeConfig.setValue(UPSERT_PARALLELISM_VALUE, String.valueOf(upsertShuffleParallelism)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index fd8013c1505..aa584443162 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -83,6 +83,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieCorruptedDataException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; 
import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.exception.HoodieUpsertException; @@ -1756,30 +1757,62 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { return allRecords.getLeft().getLeft(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) throws IOException { + try { + Properties properties = new Properties(); + properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", String.valueOf(shouldFail)); + properties.setProperty("hoodie.auto.commit", "false"); + properties.setProperty("hoodie.clustering.inline.max.commits", "1"); + properties.setProperty("hoodie.clustering.inline", "true"); + testInsertTwoBatches(true, "2015/03/16", properties, true); + assertFalse(shouldFail); + } catch (HoodieException e) { + assertEquals(CLUSTERING_FAILURE, e.getMessage()); + assertTrue(shouldFail); + } + } + private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException { return testInsertTwoBatches(populateMetaFields, "2015/03/16"); } + private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException { + return testInsertTwoBatches(populateMetaFields, partitionPath, new Properties(), false); + } + /** * This method returns following three items: * 1. List of all HoodieRecord written in the two batches of insert. * 2. Commit instants of the two batches. * 3. List of new file group ids that were written in the two batches. 
*/ - private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException { + private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath, Properties props, + boolean failInlineClustering) throws IOException { // create config to not update small files. HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields, - populateMetaFields ? new Properties() : getPropertiesForKeyGen()); - SparkRDDWriteClient client = getHoodieWriteClient(config); + populateMetaFields ? props : getPropertiesForKeyGen()); + SparkRDDWriteClient client; + if (failInlineClustering) { + if (null != writeClient) { + writeClient.close(); + writeClient = null; + } + client = new WriteClientBrokenClustering(context, config); + } else { + client = getHoodieWriteClient(config); + } + dataGen = new HoodieTestDataGenerator(new String[] {partitionPath}); String commitTime1 = HoodieActiveTimeline.createNewInstantTime(); List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200); - List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields); + List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields, failInlineClustering); Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1); String commitTime2 = HoodieActiveTimeline.createNewInstantTime(); List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200); - List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields); + List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering); Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2); Set<HoodieFileGroupId> fileIdsUnion = new 
HashSet<>(fileIds1); fileIdsUnion.addAll(fileIds2); @@ -2077,11 +2110,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException { + return writeAndVerifyBatch(client, inserts, commitTime, populateMetaFields, false); + } + + private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) throws IOException { client.startCommitWithTime(commitTime); JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2); - List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect(); + JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1, commitTime); + if (autoCommitOff) { + client.commit(commitTime, statusRDD); + } + List<WriteStatus> statuses = statusRDD.collect(); assertNoWriteErrors(statuses); verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig()); + return statuses; } @@ -2757,4 +2799,18 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } } + public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> { + + public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + @Override + protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) { + throw new HoodieException(CLUSTERING_FAILURE); + } + + } + + public static String CLUSTERING_FAILURE = "CLUSTERING FAILURE"; }