This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5b7f18c13aa96084d5f625fd4e8087a2815b6294
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Mon Nov 28 22:48:06 2022 -0500

    [HUDI-5242] Do not fail Meta sync in Deltastreamer when inline table service fails (#7243)
    
    After the files are written, inline table services such as clustering and
    compaction can fail, which prevents the sync to the metaserver from
    happening. This patch adds a config that, when set to false, keeps the
    Deltastreamer from failing in that case so that the sync to the metaserver
    still occurs; the exception is logged as a warning instead. To use the new
    behavior, set hoodie.fail.writes.on.inline.table.service.exception to false.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 16 ++++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 15 +++++
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 68 ++++++++++++++++++++--
 3 files changed, 91 insertions(+), 8 deletions(-)
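
For reference, a minimal sketch of how a writer could opt into the new behavior described in the commit message above. Only the hoodie.fail.writes.on.inline.table.service.exception key and the withFailureOnInlineTableServiceException(...) builder option come from this commit; the base path and the rest of the builder chain are illustrative placeholders.

    // Assumes Hudi 0.12.2 with this patch applied.
    // Let the commit (and the subsequent meta sync) proceed even when an inline
    // table service such as compaction or clustering throws an exception.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")                        // hypothetical base path
        .withFailureOnInlineTableServiceException(false)    // option added by this commit
        .build();

    // Equivalent raw property (e.g. passed to Deltastreamer via --hoodie-conf):
    // hoodie.fail.writes.on.inline.table.service.exception=false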

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 50337d56d91..609f85e27fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -242,9 +242,21 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     } finally {
       this.txnManager.endTransaction(Option.of(inflightInstant));
     }
-    // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
-    runTableServicesInline(table, metadata, extraMetadata);
+
+    // We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch and log a warning instead.
+    try {
+      // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
+      runTableServicesInline(table, metadata, extraMetadata);
+    } catch (Exception e) {
+      if (config.isFailOnInlineTableServiceExceptionEnabled()) {
+        throw e;
+      }
+      LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
+          + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
+    }
+
     emitCommitMetrics(instantTime, metadata, commitActionType);
+
     // callback if needed.
     if (config.writeCommitCallbackOn()) {
       if (null == commitCallback) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 43bd4fb5d98..533bb5c47df 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -359,6 +359,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
           + "Controls whether or not, the write should be failed as well, if such archiving fails.");
 
+  public static final ConfigProperty<String> FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION = ConfigProperty
+      .key("hoodie.fail.writes.on.inline.table.service.exception")
+      .defaultValue("true")
+      .withDocumentation("Table services such as compaction and clustering can fail and prevent syncing to "
+          + "the metaclient. Set this to true to fail writes when table services fail");
+
   public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
       .key("hoodie.consistency.check.initial_interval_ms")
       .defaultValue(2000L)
@@ -1107,6 +1113,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE);
   }
 
+  public boolean isFailOnInlineTableServiceExceptionEnabled() {
+    return getBoolean(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION);
+  }
+
   public int getMaxConsistencyChecks() {
     return getInt(MAX_CONSISTENCY_CHECKS);
   }
@@ -2256,6 +2266,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public  Builder withFailureOnInlineTableServiceException(boolean fail) {
+      writeConfig.setValue(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION, String.valueOf(fail));
+      return this;
+    }
+
     public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
       writeConfig.setValue(INSERT_PARALLELISM_VALUE, String.valueOf(insertShuffleParallelism));
       writeConfig.setValue(UPSERT_PARALLELISM_VALUE, String.valueOf(upsertShuffleParallelism));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index fd8013c1505..aa584443162 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -83,6 +83,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -1756,30 +1757,62 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     return allRecords.getLeft().getLeft();
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) throws IOException {
+    try {
+      Properties properties = new Properties();
+      properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", String.valueOf(shouldFail));
+      properties.setProperty("hoodie.auto.commit", "false");
+      properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+      properties.setProperty("hoodie.clustering.inline", "true");
+      testInsertTwoBatches(true, "2015/03/16", properties, true);
+      assertFalse(shouldFail);
+    } catch (HoodieException e) {
+      assertEquals(CLUSTERING_FAILURE, e.getMessage());
+      assertTrue(shouldFail);
+    }
+  }
+
   private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
     return testInsertTwoBatches(populateMetaFields, "2015/03/16");
   }
 
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
+    return testInsertTwoBatches(populateMetaFields, partitionPath, new Properties(), false);
+  }
+
   /**
    * This method returns following three items:
    * 1. List of all HoodieRecord written in the two batches of insert.
    * 2. Commit instants of the two batches.
    * 3. List of new file group ids that were written in the two batches.
    */
-  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath, Properties props,
+                                                                                                    boolean failInlineClustering) throws IOException {
     // create config to not update small files.
     HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
-        populateMetaFields ? new Properties() : getPropertiesForKeyGen());
-    SparkRDDWriteClient client = getHoodieWriteClient(config);
+        populateMetaFields ? props : getPropertiesForKeyGen());
+    SparkRDDWriteClient client;
+    if (failInlineClustering) {
+      if (null != writeClient) {
+        writeClient.close();
+        writeClient = null;
+      }
+      client = new WriteClientBrokenClustering(context, config);
+    } else {
+      client = getHoodieWriteClient(config);
+    }
+
     dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
     String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
-    List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);
+    List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields, failInlineClustering);
     Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);
 
     String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
-    List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
+    List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering);
     Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
     Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
     fileIdsUnion.addAll(fileIds2);
@@ -2077,11 +2110,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
+    return writeAndVerifyBatch(client, inserts, commitTime, populateMetaFields, false);
+  }
+
+  private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) throws IOException {
     client.startCommitWithTime(commitTime);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
-    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
+    JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1, commitTime);
+    if (autoCommitOff) {
+      client.commit(commitTime, statusRDD);
+    }
+    List<WriteStatus> statuses = statusRDD.collect();
     assertNoWriteErrors(statuses);
     verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig());
+
     return statuses;
   }
 
@@ -2757,4 +2799,18 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
   }
 
+  public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T>   {
+
+    public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+      super(context, clientConfig);
+    }
+
+    @Override
+    protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+      throw new HoodieException(CLUSTERING_FAILURE);
+    }
+
+  }
+
+  public static  String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
 }
