This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4af445447bb [HUDI-8804] clean and archival should also be in try-catch 
block (#12562)
4af445447bb is described below

commit 4af445447bb1d28d07db9e2c12ec0baa0eea10f8
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Fri Jan 3 06:57:27 2025 -0800

    [HUDI-8804] clean and archival should also be in try-catch block (#12562)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 33 ++++++++++----------
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 36 ++++++++++++++--------
 .../timeline/versioning/v1/TimelineArchiverV1.java |  6 ++--
 .../BaseHoodieCompactionPlanGenerator.java         | 25 +++++++--------
 .../hudi/utils/HoodieWriterClientTestHarness.java  | 10 +++---
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  | 30 ++++++++++++------
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 35 ++++++++++++++-------
 7 files changed, 105 insertions(+), 70 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 81256ddcbb9..81a13e308e3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -472,7 +472,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       }
     }
     clusteringTimer = metrics.getClusteringCtx();
-    LOG.info("Starting clustering at {}", clusteringInstant);
+    LOG.info("Starting clustering at {} for table {}", clusteringInstant, 
table.getConfig().getBasePath());
     HoodieWriteMetadata<T> writeMetadata = table.cluster(context, 
clusteringInstant);
     HoodieWriteMetadata<O> clusteringMetadata = 
convertToOutputMetadata(writeMetadata);
     // Validation has to be done after cloning. if not, it could result in 
referencing the write status twice which means clustering could get executed 
twice.
@@ -540,7 +540,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       // Update table's metadata (table)
       writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);
 
-      LOG.info("Committing Clustering {}", clusteringCommitTime);
+      LOG.info("Committing Clustering {} for table {}", clusteringCommitTime, 
table.getConfig().getBasePath());
       LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, 
metadata);
 
       ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(false, 
clusteringInstant,
@@ -559,7 +559,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, HoodieActiveTimeline.CLUSTERING_ACTION)
       );
     }
-    LOG.info("Clustering successfully on commit {}", clusteringCommitTime);
+    LOG.info("Clustering successfully on commit {} for table {}", 
clusteringCommitTime, table.getConfig().getBasePath());
   }
 
   protected void runTableServicesInline(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
@@ -635,7 +635,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
         tableServiceType.getAction(), instantTime));
     try {
       this.txnManager.beginTransaction(inflightInstant, Option.empty());
-      LOG.info("Scheduling table service {}", tableServiceType);
+      LOG.info("Scheduling table service {} for table {}", tableServiceType, 
config.getBasePath());
       return scheduleTableServiceInternal(instantTime, extraMetadata, 
tableServiceType);
     } finally {
       this.txnManager.endTransaction(inflightInstant);
@@ -656,25 +656,25 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
         LOG.info("Scheduling archiving is not supported. Skipping.");
         break;
       case CLUSTER:
-        LOG.info("Scheduling clustering at instant time: {}", instantTime);
+        LOG.info("Scheduling clustering at instant time: {} for table {}", 
instantTime, config.getBasePath());
         Option<HoodieClusteringPlan> clusteringPlan = table
             .scheduleClustering(context, instantTime, extraMetadata);
         option = clusteringPlan.isPresent() ? Option.of(instantTime) : 
Option.empty();
         break;
       case COMPACT:
-        LOG.info("Scheduling compaction at instant time: {}", instantTime);
+        LOG.info("Scheduling compaction at instant time: {} for table {}", 
instantTime, config.getBasePath());
         Option<HoodieCompactionPlan> compactionPlan = table
             .scheduleCompaction(context, instantTime, extraMetadata);
         option = compactionPlan.isPresent() ? Option.of(instantTime) : 
Option.empty();
         break;
       case LOG_COMPACT:
-        LOG.info("Scheduling log compaction at instant time: {}", instantTime);
+        LOG.info("Scheduling log compaction at instant time: {} for table {}", 
instantTime, config.getBasePath());
         Option<HoodieCompactionPlan> logCompactionPlan = table
             .scheduleLogCompaction(context, instantTime, extraMetadata);
         option = logCompactionPlan.isPresent() ? Option.of(instantTime) : 
Option.empty();
         break;
       case CLEAN:
-        LOG.info("Scheduling cleaning at instant time: {}", instantTime);
+        LOG.info("Scheduling cleaning at instant time: {} for table {}", 
instantTime, config.getBasePath());
         Option<HoodieCleanerPlan> cleanerPlan = table
             .scheduleCleaning(context, instantTime, extraMetadata);
         option = cleanerPlan.isPresent() ? Option.of(instantTime) : 
Option.empty();
@@ -744,7 +744,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     
table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants().forEach(instant
 -> {
       Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
       if (instantPlan.isPresent()) {
-        LOG.info("Running pending clustering at instant {}", 
instantPlan.get().getLeft());
+        LOG.info("Running pending clustering at instant {} for table {}", 
instantPlan.get().getLeft(), config.getBasePath());
         cluster(instant.requestedTime(), true);
       }
     });
@@ -787,7 +787,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
 
     HoodieTable table = createTable(config, storageConf);
     if (config.allowMultipleCleans() || 
!table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent())
 {
-      LOG.info("Cleaner started");
+      LOG.info("Cleaner started for table {}", config.getBasePath());
       // proceed only if multiple clean schedules are enabled or if there are 
no pending cleans.
       if (scheduleInline) {
         scheduleTableServiceInternal(cleanInstantTime, Option.empty(), 
TableServiceType.CLEAN);
@@ -805,9 +805,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     if (timerContext != null && metadata != null) {
       long durationMs = metrics.getDurationInMs(timerContext.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + 
metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
+      LOG.info("Cleaned {} files. Earliest Retained Instant: {}, cleanerElapsedMs: {} for table {}",
+          metadata.getTotalFilesDeleted(), 
metadata.getEarliestCommitToRetain(), durationMs, config.getBasePath());
     }
     releaseResources(cleanInstantTime);
     return metadata;
@@ -1079,7 +1078,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
   @Deprecated
   public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String 
rollbackInstantTime,
                           boolean skipLocking, boolean skipVersionCheck) 
throws HoodieRollbackException {
-    LOG.info("Begin rollback of instant " + commitInstantTime);
+    LOG.info("Begin rollback of instant {} for table {}", commitInstantTime, 
config.getBasePath());
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
@@ -1088,8 +1087,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
           .findFirst());
       if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
         LOG.info(String.format("Scheduling Rollback at instant time : %s "
-                + "(exists in active timeline: %s), with rollback plan: %s",
-            rollbackInstantTime, commitInstantOpt.isPresent(), 
pendingRollbackInfo.isPresent()));
+                + "(exists in active timeline: %s), with rollback plan: %s for 
table %s",
+            rollbackInstantTime, commitInstantOpt.isPresent(), 
pendingRollbackInfo.isPresent(), config.getBasePath()));
         Option<HoodieRollbackPlan> rollbackPlanOption = 
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
             .orElseGet(() -> table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(),
                 false));
@@ -1114,7 +1113,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
           throw new HoodieRollbackException("Failed to rollback " + 
config.getBasePath() + " commits " + commitInstantTime);
         }
       } else {
-        LOG.warn("Cannot find instant " + commitInstantTime + " in the 
timeline, for rollback");
+        LOG.warn("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
         return false;
       }
     } catch (Exception e) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d2a16595f77..c58181eab45 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -256,17 +256,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     // trigger clean and archival.
     // Each internal call should ensure to lock if required.
     mayBeCleanAndArchive(table);
-    // We don't want to fail the commit if 
hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if 
false
-    try {
-      // do this outside of lock since compaction, clustering can be time 
taking and we don't need a lock for the entire execution period
-      runTableServicesInline(table, metadata, extraMetadata);
-    } catch (Exception e) {
-      if (config.isFailOnInlineTableServiceExceptionEnabled()) {
-        throw e;
-      }
-      LOG.warn("Inline compaction or clustering failed with exception: " + 
e.getMessage()
-          + ". Moving further since 
\"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
-    }
+    runTableServicesInline(table, metadata, extraMetadata);
 
     emitCommitMetrics(instantTime, metadata, commitActionType);
 
@@ -579,11 +569,31 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @param table instance of {@link HoodieTable} of interest.
    */
   protected void mayBeCleanAndArchive(HoodieTable table) {
-    autoCleanOnCommit();
-    autoArchiveOnCommit(table);
+    try {
+      autoCleanOnCommit();
+      autoArchiveOnCommit(table);
+    } catch (Throwable t) {
+      LOG.error(String.format("Inline cleaning or archival failed for %s", 
table.getConfig().getBasePath()), t);
+      throw t;
+    }
   }
 
   protected void runTableServicesInline(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    // We don't want to fail the commit if 
hoodie.fail.writes.on.inline.table.service.exception is false. We catch and 
warn if false.
+    try {
+      // do this outside of lock since compaction, clustering can be time 
taking and we don't need a lock for the entire execution period
+      runTableServicesInlineInternal(table, metadata, extraMetadata);
+    } catch (Throwable t) {
+      LOG.error(String.format("Inline compaction or clustering failed for 
table %s.", table.getConfig().getBasePath()), t);
+      // Throw if this is exception and the exception is configured to throw 
or if it is something else like Error.
+      if (config.isFailOnInlineTableServiceExceptionEnabled() || !(t 
instanceof Exception)) {
+        throw t;
+      }
+      LOG.warn("Inline compaction or clustering failed. Moving further since 
\"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.", t);
+    }
+  }
+
+  protected void runTableServicesInlineInternal(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
     tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index 8108d3b3ed9..9672169e5b2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -143,12 +143,12 @@ public class TimelineArchiverV1<T extends 
HoodieAvroPayload, I, K, O> implements
       List<HoodieInstant> instantsToArchive = getInstantsToArchive();
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter(archiveFilePath.getParent());
-        LOG.info("Archiving instants {}", instantsToArchive);
+        LOG.info("Archiving instants {} for table {}", instantsToArchive, 
config.getBasePath());
         archive(context, instantsToArchive);
-        LOG.info("Deleting archived instants {}", instantsToArchive);
+        LOG.info("Deleting archived instants {} for table {}", 
instantsToArchive, config.getBasePath());
         deleteArchivedInstants(instantsToArchive, context);
       } else {
-        LOG.info("No Instants to archive");
+        LOG.info("No Instants to archive for table {}", config.getBasePath());
       }
 
       return instantsToArchive.size();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 609c468d8bc..1884f4c5d40 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -89,15 +89,16 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
 
     // filter the partition paths if needed to reduce list status
     partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
-    LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
-        writeConfig.getCompactionStrategy().getClass().getSimpleName(), 
partitionPaths.size(), allPartitionSize);
+    LOG.info("Strategy: {} matched {} partition paths from all {} partitions 
for table {}",
+        writeConfig.getCompactionStrategy().getClass().getSimpleName(), 
partitionPaths.size(), allPartitionSize,
+        hoodieTable.getConfig().getBasePath());
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no compaction plan
       return null;
     }
     // avoid logging all partitions in table by default
-    LOG.info("Looking for files to compact in {} partitions", 
partitionPaths.size());
-    LOG.debug("Partitions scanned for compaction: {}", partitionPaths);
+    LOG.info("Looking for files to compact in {} partitions for table {}", 
partitionPaths.size(), hoodieTable.getConfig().getBasePath());
+    LOG.debug("Partitions scanned for compaction: {} for table {}", 
partitionPaths, hoodieTable.getConfig().getBasePath());
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for 
files to compact: " + writeConfig.getTableName());
 
     SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
this.hoodieTable.getSliceView();
@@ -109,7 +110,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     // Exclude files in pending clustering from compaction.
     
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
 
-    // Exclude files in pending logcompaction.
+    // Exclude files in pending log compaction.
     if (filterLogCompactionOperations()) {
       
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
           .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
@@ -120,7 +121,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
         
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
             HoodieTimeline.ROLLBACK_ACTION, 
HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterCompletedInstants().lastInstant().get().requestedTime();
-    LOG.info("Last completed instant time " + lastCompletedInstantTime);
+    LOG.info("Last completed instant time {} for table {}", 
lastCompletedInstantTime, hoodieTable.getConfig().getBasePath());
     Option<InstantRange> instantRange = 
CompactHelpers.getInstance().getInstantRange(metaClient);
 
     List<HoodieCompactionOperation> operations = 
engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
@@ -153,17 +154,17 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
         }), partitionPaths.size()).stream()
         
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
 
-    LOG.info("Total of {} compaction operations are retrieved", 
operations.size());
-    LOG.info("Total number of log files {}", totalLogFiles.value());
-    LOG.info("Total number of file slices {}", totalFileSlices.value());
+    LOG.info("Total of {} compaction operations are retrieved for table {}", 
operations.size(), hoodieTable.getConfig().getBasePath());
+    LOG.info("Total number of log files {} for table {}", 
totalLogFiles.value(), hoodieTable.getConfig().getBasePath());
+    LOG.info("Total number of file slices {} for table {}", 
totalFileSlices.value(), hoodieTable.getConfig().getBasePath());
 
     if (operations.isEmpty()) {
-      LOG.warn("No operations are retrieved for {}", metaClient.getBasePath());
+      LOG.warn("No operations are retrieved for {} for table {}", 
metaClient.getBasePath(), hoodieTable.getConfig().getBasePath());
       return null;
     }
 
     if (totalLogFiles.value() <= 0) {
-      LOG.warn("No log files are retrieved for {}", metaClient.getBasePath());
+      LOG.warn("No log files are retrieved for {} for table {}", 
metaClient.getBasePath(), hoodieTable.getConfig().getBasePath());
       return null;
     }
 
@@ -176,7 +177,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
             + "Please fix your strategy implementation. 
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
             + ", Selected workload :" + compactionPlan);
     if (compactionPlan.getOperations().isEmpty()) {
-      LOG.warn("After filtering, Nothing to compact for {}", 
metaClient.getBasePath());
+      LOG.warn("After filtering, Nothing to compact for {} for table {}", 
metaClient.getBasePath(), hoodieTable.getConfig().getBasePath());
     }
     return compactionPlan;
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 0279d1c6a5c..9a6cee94aa0 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -947,19 +947,19 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
     assertEquals(200, upserts);
   }
 
-  protected void testFailWritesOnInlineTableServiceExceptions(boolean 
shouldFail, Function createBrokenClusteringClientFn) throws IOException {
+  protected void testFailWritesOnInlineTableServiceThrowable(boolean 
shouldFailOnException, boolean actuallyFailed, Function 
createBrokenClusteringClientFn) throws IOException {
     try {
       Properties properties = new Properties();
-      
properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", 
String.valueOf(shouldFail));
+      
properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", 
String.valueOf(shouldFailOnException));
       properties.setProperty("hoodie.auto.commit", "false");
       properties.setProperty("hoodie.clustering.inline.max.commits", "1");
       properties.setProperty("hoodie.clustering.inline", "true");
       
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
       testInsertTwoBatches(true, "2015/03/16", properties, true, 
createBrokenClusteringClientFn);
-      assertFalse(shouldFail);
-    } catch (HoodieException e) {
+      assertFalse(actuallyFailed);
+    } catch (HoodieException | Error e) {
       assertEquals(CLUSTERING_FAILURE, e.getMessage());
-      assertTrue(shouldFail);
+      assertTrue(actuallyFailed);
     }
   }
 
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 9fda8f722ee..3072599bd8c 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -84,8 +84,9 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends 
HoodieJavaClientTe
 
   private static final Function<Object, Object> IDENTITY = Function.identity();
 
-  private final Function<HoodieWriteConfig, BaseHoodieWriteClient> 
createBrokenClusteringClient =
-      config -> new WriteClientBrokenClustering<>(context, config);
+  private Function<HoodieWriteConfig, BaseHoodieWriteClient> 
createBrokenClusteringClient(Throwable throwable) {
+    return config -> new WriteClientBrokenClustering<>(context, config, 
throwable);
+  }
 
   private final Function2<HoodieTable, HoodieTableMetaClient, 
HoodieWriteConfig> getHoodieTable =
       (metaClient, config) -> getHoodieTable(metaClient, config);
@@ -352,7 +353,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage 
extends HoodieJavaClientTe
         
.withClusteringPlanStrategyClass(JavaSizeBasedClusteringPlanStrategy.class.getName())
         
.withClusteringExecutionStrategyClass(JavaSortAndSizeExecutionStrategy.class.getName())
         .withInlineClustering(true).withInlineClusteringNumCommits(2).build();
-    testAndValidateClusteringOutputFiles(createBrokenClusteringClient, 
clusteringConfig, IDENTITY, IDENTITY);
+    testAndValidateClusteringOutputFiles(createBrokenClusteringClient(new 
HoodieException(CLUSTERING_FAILURE)), clusteringConfig, IDENTITY, IDENTITY);
   }
 
   @ParameterizedTest
@@ -362,13 +363,20 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage 
extends HoodieJavaClientTe
         
.withClusteringTargetPartitions(0).withAsyncClusteringMaxCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
         
.withClusteringExecutionStrategyClass(JavaSortAndSizeExecutionStrategy.class.getName())
         
.withClusteringPlanStrategyClass(JavaSizeBasedClusteringPlanStrategy.class.getName()).build();
-    testInlineScheduleClustering(createBrokenClusteringClient, 
clusteringConfig, IDENTITY, IDENTITY);
+    testInlineScheduleClustering(createBrokenClusteringClient(new 
HoodieException(CLUSTERING_FAILURE)), clusteringConfig, IDENTITY, IDENTITY);
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) 
throws IOException {
-    testFailWritesOnInlineTableServiceExceptions(shouldFail, 
createBrokenClusteringClient);
+    testFailWritesOnInlineTableServiceThrowable(shouldFail, shouldFail,
+        createBrokenClusteringClient(new HoodieException(CLUSTERING_FAILURE)));
+  }
+
+  @Test
+  public void testFailWritesOnInlineTableServiceErrors() throws IOException {
+    testFailWritesOnInlineTableServiceThrowable(false, true,
+        createBrokenClusteringClient(new 
OutOfMemoryError(CLUSTERING_FAILURE)));
   }
 
   /**
@@ -440,17 +448,21 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage 
extends HoodieJavaClientTe
   }
 
   public static class WriteClientBrokenClustering<T extends 
HoodieRecordPayload> extends org.apache.hudi.client.HoodieJavaWriteClient<T> {
+    private final Throwable throwable;
 
-    public WriteClientBrokenClustering(HoodieEngineContext context, 
HoodieWriteConfig clientConfig) {
+    public WriteClientBrokenClustering(HoodieEngineContext context, 
HoodieWriteConfig clientConfig, Throwable throwable) {
       super(context, clientConfig);
+      this.throwable = throwable;
     }
 
     @Override
-    protected void runTableServicesInline(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    protected void runTableServicesInlineInternal(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
       if (config.inlineClusteringEnabled()) {
-        throw new HoodieException(CLUSTERING_FAILURE);
+        if (throwable instanceof Error) {
+          throw (Error) throwable;
+        }
+        throw (HoodieException) throwable;
       }
     }
-
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 965c9505c48..97eaf79521b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -206,8 +206,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
   private final Function<JavaRDD, List> rdd2List = 
AbstractJavaRDDLike::collect;
 
-  private final Function<HoodieWriteConfig, BaseHoodieWriteClient> 
createBrokenClusteringClient =
-      config -> new WriteClientBrokenClustering<>(context, config);
+  private Function<HoodieWriteConfig, BaseHoodieWriteClient> 
createBrokenClusteringClient(Throwable throwable) {
+    return config -> new WriteClientBrokenClustering<>(context, config, 
throwable);
+  }
 
   private final Function<HoodieWriteMetadata, 
HoodieWriteMetadata<List<WriteStatus>>> clusteringMetadataRdd2List =
       metadata -> 
metadata.clone(((JavaRDD)(metadata.getWriteStatuses())).collect());
@@ -1000,7 +1001,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
   @Test
   public void testAndValidateClusteringOutputFiles() throws IOException {
-    testAndValidateClusteringOutputFiles(createBrokenClusteringClient, 
createClusteringBuilder(true, 2).build(), list2Rdd, rdd2List);
+    testAndValidateClusteringOutputFiles(createBrokenClusteringClient(new 
HoodieException(CLUSTERING_FAILURE)), createClusteringBuilder(true, 2).build(), 
list2Rdd, rdd2List);
   }
 
   @Test
@@ -1032,7 +1033,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   public void testInlineScheduleClustering(boolean scheduleInlineClustering) 
throws IOException {
     HoodieClusteringConfig clusteringConfig = createClusteringBuilder(false, 1)
             
.withAsyncClusteringMaxCommits(1).withScheduleInlineClustering(scheduleInlineClustering).build();
-    testInlineScheduleClustering(createBrokenClusteringClient, 
clusteringConfig, list2Rdd, rdd2List);
+    testInlineScheduleClustering(createBrokenClusteringClient(new 
HoodieException(CLUSTERING_FAILURE)), clusteringConfig, list2Rdd, rdd2List);
   }
 
   @ParameterizedTest
@@ -1190,7 +1191,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig 
clusteringConfig, boolean populateMetaFields,
                                                      boolean 
completeClustering, boolean assertSameFileIds, String validatorClasses,
                                                      String 
sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws 
Exception {
-    Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
allRecords = testInsertTwoBatches(populateMetaFields, 
createBrokenClusteringClient);
+    Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
allRecords = testInsertTwoBatches(
+        populateMetaFields, createBrokenClusteringClient(new 
HoodieException(CLUSTERING_FAILURE)));
     testClustering(clusteringConfig, populateMetaFields, completeClustering, 
assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation,
             sqlQueryForSingleResultValidation, allRecords, 
clusteringMetadataRdd2List, createKeyGenerator);
     return allRecords.getLeft().getLeft();
@@ -1199,7 +1201,14 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) 
throws IOException {
-    testFailWritesOnInlineTableServiceExceptions(shouldFail, 
createBrokenClusteringClient);
+    testFailWritesOnInlineTableServiceThrowable(shouldFail, shouldFail, 
+        createBrokenClusteringClient(new HoodieException(CLUSTERING_FAILURE)));
+  }
+
+  @Test
+  public void testFailWritesOnInlineTableServiceErrors() throws IOException {
+    testFailWritesOnInlineTableServiceThrowable(false, true,
+        createBrokenClusteringClient(new 
OutOfMemoryError(CLUSTERING_FAILURE)));
   }
 
   /**
@@ -1660,18 +1669,22 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
   }
 
   public static class WriteClientBrokenClustering<T extends 
HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {
+    private final Throwable throwable;
 
-    public WriteClientBrokenClustering(HoodieEngineContext context, 
HoodieWriteConfig clientConfig) {
-      super(context, clientConfig);
+    public WriteClientBrokenClustering(HoodieEngineContext context, 
HoodieWriteConfig config, Throwable throwable) {
+      super(context, config);
+      this.throwable = throwable;
     }
 
     @Override
-    protected void runTableServicesInline(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    protected void runTableServicesInlineInternal(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
       if (config.inlineClusteringEnabled()) {
-        throw new HoodieException(CLUSTERING_FAILURE);
+        if (throwable instanceof Error) {
+          throw (Error) throwable;
+        }
+        throw (HoodieException) throwable;
       }
     }
-
   }
 
   /**

Reply via email to