This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73e710de1332 feat(table-services): Emit archival metrics for monitoring and debugging (#18133)
73e710de1332 is described below

commit 73e710de1332d88a6b8f8d751d56d93ee129dbb6
Author: Nada <[email protected]>
AuthorDate: Fri Feb 27 13:45:34 2026 -0500

    feat(table-services): Emit archival metrics for monitoring and debugging (#18133)
    
    * Emit archival metrics for common OOM failure
    
    Summary:
    The following OOM failure is a common archival failure:
    
    ```
    2025-05-19T14:24:21-04:00 INFO [pool-18-thread-1] ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.OutOfMemoryError
            at java.base/java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:125)
            at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:119)
            at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
            at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
            at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
            at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:133)
            at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:116)
            at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:175)
            at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlock(HoodieLogFormatWriter.java:152)
            at org.apache.hudi.client.HoodieTimelineArchiver.writeToFile(HoodieTimelineArchiver.java:692)
            at org.apache.hudi.client.HoodieTimelineArchiver.archive(HoodieTimelineArchiver.java:663)
            at org.apache.hudi.client.HoodieTimelineArchiver.archiveIfRequired(HoodieTimelineArchiver.java:179)
            at org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:827)
            at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:963)
            at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:973)
            at com.uber.hudi.tools.manager.HoodieOperationArchival.runArchival(HoodieOperationArchival.java:61)
            at com.uber.hudi.tools.manager.HoodieOperationArchival.execute(HoodieOperationArchival.java:49)
            at com.uber.hudi.tools.manager.HoodieManager.main(HoodieManager.java:54)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:566)
            at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:804)
    )
    ```
    
    Added metrics to capture this failure scenario. Refactored the existing archival metrics to support this.
    
    Test Plan: Added unit test
    
    Reviewers: bkrishen, jingli, O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, ureview
    
    Reviewed By: bkrishen, jingli, O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers
    
    Tags: #hudi_0.14, #has_java
    
    JIRA Issues: HUDI-6816
    
    Differential Revision: https://code.uberinternal.com/D17816129
    
    * feat: Emit archival metrics for monitoring and debugging
    
    Add detailed metrics collection during timeline archival to help monitor
    and debug archival operations. This includes:
    - OOM failure detection and tracking
    - General exception tracking with exception class names
    - Count of commits being archived (all commits and write commits)
    - Archival operation status (success/failure)
    
    The metrics are collected in both TimelineArchiverV1 and TimelineArchiverV2
    and emitted through HoodieMetrics for external monitoring systems.
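
    As a rough sketch of how this wiring fits together (condensed from the
    BaseHoodieTableServiceClient change in the diff below; `table`, `config`,
    `context`, and the HoodieMetrics instance `metrics` are assumed to be in
    scope):

    ```
    HoodieTimelineArchiver archiver = null;
    try {
      archiver = TimelineArchivers.getInstance(
          table.getMetaClient().getTimelineLayoutVersion(), config, table);
      archiver.archiveIfRequired(context, true);
    } finally {
      if (archiver != null) {
        // Gauges such as archivalNumAllCommits, archivalStatus, and (on
        // failure) archivalOutOfMemory are emitted even when archival throws.
        metrics.updateArchivalMetrics(archiver.getMetrics());
      }
    }
    ```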
    
    * refactor: Remove HoodieTimelineArchiver concrete implementation
    
    This concrete implementation class doesn't exist in upstream master and was
    incorrectly brought back during commit porting. The archiver functionality
    is correctly implemented in the versioned archiver classes:
    - TimelineArchiverV1 (for 0.x timeline)
    - TimelineArchiverV2 (for 1.x LSM timeline)
    
    The interface at client/timeline/HoodieTimelineArchiver.java is the correct
    abstraction, and the metrics improvements are properly implemented in both
    v1 and v2 archiver classes.
    
    * fix: Update imports and usage for refactored HoodieTimelineArchiver
    
    Fix compilation errors after HoodieTimelineArchiver was refactored:
    - Add missing imports for HoodieTimelineArchiver and TimelineArchivers in BaseHoodieTableServiceClient
    - Remove duplicate java.util.Map import in HoodieMetrics
    - Update test to use TimelineArchivers factory method instead of direct instantiation
    - Fix HoodieInstant constructor call to include required Comparator parameter
    
    * refactor: Consolidate archival commit metrics logic and add DELTA_COMMIT support
    
    - Move addArchivalCommitMetrics to ArchivalMetrics utility class for reuse
    - Add DELTA_COMMIT_ACTION to write commit metrics (was missing in V2)
    - Remove unused emitCleanFailure method from HoodieMetrics
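
    As a sketch, the consolidated call site now looks roughly like this in
    the V1 archiver (V2 is analogous but flattens ActiveActions into their
    completed instants first; the exact code is in the diff below):

    ```
    Map<String, Long> metrics = new HashMap<>();
    ArchivalMetrics.addArchivalCommitMetrics(
        instantsToArchive.stream().filter(HoodieInstant::isCompleted),
        metrics);
    // metrics now holds archivalNumAllCommits, archivalNumWriteCommits,
    // archivalNumCleanCommits and archivalNumRollbackCommits.
    ```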
    
    * test: Use Mockito to simulate OOM in archival test instead of creating 500K files
    
    Replace the heavy test setup, which created 19 commits each containing
    500K HoodieWriteStat objects, with a lightweight approach that uses
    Mockito to mock the LSMTimelineWriter and throw OOM on write(). This
    makes the test faster and more reliable since it does not depend on JVM
    memory settings.
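
    The core of the mocking approach is just the following (the full test is
    in the diff below):

    ```
    LSMTimelineWriter mockWriter = mock(LSMTimelineWriter.class);
    doThrow(new OutOfMemoryError("Simulated OOM"))
        .when(mockWriter).write(any(), any(), any());
    // The mock is then injected into the archiver via reflection, so the
    // OOM surfaces from archiveIfRequired() without real memory pressure.
    ```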
    
    * feat: Add clean and rollback metrics to archival and add test for mixed action types
    
    - Add ARCHIVAL_NUM_CLEAN_COMMITS and ARCHIVAL_NUM_ROLLBACK_COMMITS metrics
    - Update addArchivalCommitMetrics to record counts for clean and rollback actions
    - Add test to verify archival metrics with mixed action types (commits, replace commits, cleans, rollbacks)
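
    A quick sketch of the invariant the new test checks after archiving the
    mixed timeline (metric values depend on which instants actually get
    archived):

    ```
    Map<String, Long> metrics = archiver.getMetrics();
    long total = metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS);
    long writes = metrics.get(ArchivalMetrics.ARCHIVAL_NUM_WRITE_COMMITS);
    long cleans = metrics.get(ArchivalMetrics.ARCHIVAL_NUM_CLEAN_COMMITS);
    long rollbacks = metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ROLLBACK_COMMITS);
    // Every archived instant is counted exactly once across the action types.
    assert total == writes + cleans + rollbacks;
    ```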
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   7 +-
 .../client/timeline/HoodieTimelineArchiver.java    |  10 ++
 .../timeline/versioning/v1/TimelineArchiverV1.java |  26 +++-
 .../timeline/versioning/v2/TimelineArchiverV2.java |  27 +++-
 .../apache/hudi/client/utils/ArchivalMetrics.java  |  75 +++++++++++
 .../org/apache/hudi/metrics/HoodieMetrics.java     |   8 ++
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 141 +++++++++++++++++++++
 7 files changed, 291 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index b505ee60c40e..cb1777ad2adf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -897,9 +897,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
     }
     final Timer.Context timerContext = metrics.getArchiveCtx();
     int instantsToArchive = 0;
+    HoodieTimelineArchiver archiver = null;
     try {
       // We cannot have unbounded commit files. Archive commits if we have to archive.
-      HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), config, table);
+      archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), config, table);
       instantsToArchive = archiver.archiveIfRequired(context, true);
     } catch (IOException ioe) {
       throw new HoodieIOException("Failed to archive", ioe);
@@ -908,6 +909,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
         long durationMs = metrics.getDurationInMs(timerContext.stop());
         this.metrics.updateArchiveMetrics(durationMs, instantsToArchive);
       }
+      // Emit additional archival metrics (OOM tracking, failure tracking, etc.)
+      if (archiver != null) {
+        this.metrics.updateArchivalMetrics(archiver.getMetrics());
+      }
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index b1c1295cec52..6bd839405dee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
@@ -37,4 +39,12 @@ public interface HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
   * Check if commits need to be archived. If yes, archive commits.
   */
  int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException;
+
+  /**
+   * Returns metrics collected during archival.
+   * Keys are metric names, values are metric values.
+   */
+  default Map<String, Long> getMetrics() {
+    return Collections.emptyMap();
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index b78597927f29..e693a1c2e7fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -22,6 +22,7 @@ package org.apache.hudi.client.timeline.versioning.v1;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.ArchivalMetrics;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -96,6 +97,7 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
   private final HoodieTable<T, I, K, O> table;
   private final HoodieTableMetaClient metaClient;
   private final TransactionManager txnManager;
+  private final Map<String, Long> metrics;
 
   public TimelineArchiverV1(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
@@ -106,6 +108,7 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
     Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
     this.minInstantsToKeep = minAndMaxInstants.getLeft();
     this.maxInstantsToKeep = minAndMaxInstants.getRight();
+    this.metrics = new HashMap<>();
   }
 
   private Writer openWriter(StoragePath archivePath) {
@@ -141,17 +144,27 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
         txnManager.beginStateChange(Option.empty(), Option.empty());
       }
       List<HoodieInstant> instantsToArchive = getInstantsToArchive();
+      addArchivalCommitMetrics(instantsToArchive);
+      boolean success = true;
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter(archiveFilePath.getParent());
         log.info("Archiving instants {} for table {}", instantsToArchive, 
config.getBasePath());
         archive(context, instantsToArchive);
         log.info("Deleting archived instants {} for table {}", 
instantsToArchive, config.getBasePath());
-        deleteArchivedInstants(instantsToArchive, context);
+        success = deleteArchivedInstants(instantsToArchive, context);
       } else {
         log.info("No Instants to archive for table {}", config.getBasePath());
       }
 
+      metrics.put(ArchivalMetrics.ARCHIVAL_STATUS, success ? 1L : -1L);
       return instantsToArchive.size();
+    } catch (OutOfMemoryError oom) {
+      metrics.put(ArchivalMetrics.ARCHIVAL_OOM_FAILURE, 1L);
+      throw oom;
+    } catch (Exception e) {
+      String failureMetricName = String.join(".", ArchivalMetrics.ARCHIVAL_FAILURE, e.getClass().getSimpleName());
+      metrics.put(failureMetricName, 1L);
+      throw e;
     } finally {
       close();
       if (acquireLock) {
@@ -160,6 +173,17 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
     }
   }
 
+  @Override
+  public Map<String, Long> getMetrics() {
+    return metrics;
+  }
+
+  private void addArchivalCommitMetrics(List<HoodieInstant> instantsToArchive) {
+    ArchivalMetrics.addArchivalCommitMetrics(
+        instantsToArchive.stream().filter(HoodieInstant::isCompleted),
+        metrics);
+  }
+
   /**
    * Keeping for downgrade from 1.x LSM archived timeline.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index 5242bf58b7a7..ebcd16e2361e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.timeline.versioning.v2;
 
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.ArchivalMetrics;
 import org.apache.hudi.common.NativeTableFormat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -52,6 +53,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
   private final TransactionManager txnManager;
 
   private final LSMTimelineWriter timelineWriter;
+  private final Map<String, Long> metrics;
 
   public TimelineArchiverV2(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
@@ -88,6 +91,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
     Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
     this.minInstantsToKeep = minAndMaxInstants.getLeft();
     this.maxInstantsToKeep = minAndMaxInstants.getRight();
+    this.metrics = new HashMap<>();
   }
 
   @Override
@@ -105,6 +109,8 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
    try {
      // Sort again because the cleaning and rollback instants could break the sequence.
      List<ActiveAction> instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
+      addArchivalCommitMetrics(instantsToArchive);
+      boolean success = true;
       if (!instantsToArchive.isEmpty()) {
         log.info("Archiving and deleting instants {}", instantsToArchive);
         Consumer<Exception> exceptionHandler = e -> {
@@ -114,7 +120,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
        };
        this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
         log.debug("Deleting archived instants");
-        deleteArchivedActions(instantsToArchive, context);
+        success = deleteArchivedActions(instantsToArchive, context);
         // triggers compaction and cleaning only after archiving action
         this.timelineWriter.compactAndClean(context);
         Supplier<List<HoodieInstant>> archivedInstants = () -> instantsToArchive.stream()
@@ -125,7 +131,15 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
       } else {
         log.info("No Instants to archive");
       }
+      metrics.put(ArchivalMetrics.ARCHIVAL_STATUS, success ? 1L : -1L);
       return instantsToArchive.size();
+    } catch (OutOfMemoryError oom) {
+      metrics.put(ArchivalMetrics.ARCHIVAL_OOM_FAILURE, 1L);
+      throw oom;
+    } catch (Exception e) {
+      String failureMetricName = String.join(".", ArchivalMetrics.ARCHIVAL_FAILURE, e.getClass().getSimpleName());
+      metrics.put(failureMetricName, 1L);
+      throw e;
     } finally {
       if (acquireLock) {
         txnManager.endStateChange(Option.empty());
@@ -133,6 +147,17 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
     }
   }
 
+  @Override
+  public Map<String, Long> getMetrics() {
+    return metrics;
+  }
+
+  private void addArchivalCommitMetrics(List<ActiveAction> instantsToArchive) {
+    ArchivalMetrics.addArchivalCommitMetrics(
+        instantsToArchive.stream().flatMap(action -> action.getCompletedInstants().stream()),
+        metrics);
+  }
+
  private List<HoodieInstant> getCleanAndRollbackInstantsToArchive(HoodieInstant latestCommitInstantToArchive) {
    HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
        .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java
new file mode 100644
index 000000000000..1bc002fad08e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Constants and utilities for archival metrics.
+ */
+public final class ArchivalMetrics {
+  public static final String ARCHIVAL_OOM_FAILURE = "archivalOutOfMemory";
+  public static final String ARCHIVAL_NUM_ALL_COMMITS = "archivalNumAllCommits";
+  public static final String ARCHIVAL_NUM_WRITE_COMMITS = "archivalNumWriteCommits";
+  public static final String ARCHIVAL_NUM_CLEAN_COMMITS = "archivalNumCleanCommits";
+  public static final String ARCHIVAL_NUM_ROLLBACK_COMMITS = "archivalNumRollbackCommits";
+  public static final String ARCHIVAL_FAILURE = "archivalFailure";
+  public static final String ARCHIVAL_STATUS = "archivalStatus";
+
+  private static final Set<String> WRITE_COMMIT_ACTIONS = CollectionUtils.createSet(
+      HoodieTimeline.COMMIT_ACTION,
+      HoodieTimeline.DELTA_COMMIT_ACTION,
+      HoodieTimeline.REPLACE_COMMIT_ACTION);
+
+  private ArchivalMetrics() {
+    // Private constructor to prevent instantiation
+  }
+
+  /**
+   * Adds archival commit metrics to the given metrics map based on the completed instants.
+   *
+   * @param completedInstants stream of completed instants to archive
+   * @param metrics map to populate with archival metrics
+   */
+  public static void addArchivalCommitMetrics(Stream<HoodieInstant> completedInstants, Map<String, Long> metrics) {
+    // Collect to list since we need to iterate multiple times
+    List<HoodieInstant> instantsList = completedInstants.collect(Collectors.toList());
+    metrics.put(ARCHIVAL_NUM_ALL_COMMITS, (long) instantsList.size());
+    long writeCommitCount = instantsList.stream()
+        .filter(instant -> WRITE_COMMIT_ACTIONS.contains(instant.getAction()))
+        .count();
+    metrics.put(ARCHIVAL_NUM_WRITE_COMMITS, writeCommitCount);
+    long cleanCommitCount = instantsList.stream()
+        .filter(instant -> HoodieTimeline.CLEAN_ACTION.equals(instant.getAction()))
+        .count();
+    metrics.put(ARCHIVAL_NUM_CLEAN_COMMITS, cleanCommitCount);
+    long rollbackCommitCount = instantsList.stream()
+        .filter(instant -> HoodieTimeline.ROLLBACK_ACTION.equals(instant.getAction()))
+        .count();
+    metrics.put(ARCHIVAL_NUM_ROLLBACK_COMMITS, rollbackCommitCount);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 1bd97211cea1..c94087b1b149 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -36,6 +36,7 @@ import com.codahale.metrics.Timer;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH;
@@ -361,6 +362,13 @@ public class HoodieMetrics {
     }
   }
 
+  public void updateArchivalMetrics(Map<String, Long> archivalMetrics) {
+    if (config.isMetricsOn()) {
+      log.info(String.format("Sending archival metrics %s", archivalMetrics));
+      archivalMetrics.forEach((metricName, metricValue) -> metrics.registerGauge(getMetricsName("archival", metricName), metricValue));
+    }
+  }
+
   public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
     if (config.isMetricsOn()) {
       log.debug("Sending finalize write metrics ({}={}, {}={})", DURATION_STR, 
durationInMs,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index a6c25b7e9096..c631ec00626c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -22,10 +22,12 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.timeline.TimelineArchivers;
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter;
 import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.client.utils.ArchivalMetrics;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -88,6 +90,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
@@ -138,7 +141,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 
 @Slf4j
 public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness {
@@ -2048,4 +2055,138 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness {
       assertEquals(expectedInstant.getState(), actualInstant.getState());
     }
   }
+
+  @Test
+  public void testArchiveWithOOMOnLargeCommitFile() throws Exception {
+    // Initialize table with archival config
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+    writeConfig.setValue(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE, "20");
+
+    // Create commits that will be archived
+    for (int i = 1; i <= 10; i++) {
+      String commitTime = String.format("%08d", i);
+      testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList("p1"),
+          Collections.singletonList("p1"), 1);
+    }
+
+    // Create archiver
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    TimelineArchiverV2 archiver = (TimelineArchiverV2) TimelineArchivers.getInstance(
+        table.getMetaClient().getTimelineLayoutVersion(), writeConfig, table);
+
+    // Use reflection to inject a mock LSMTimelineWriter that throws OOM
+    LSMTimelineWriter mockWriter = mock(LSMTimelineWriter.class);
+    doThrow(new OutOfMemoryError("Simulated OOM")).when(mockWriter).write(any(), any(), any());
+
+    Field writerField = TimelineArchiverV2.class.getDeclaredField("timelineWriter");
+    writerField.setAccessible(true);
+    writerField.set(archiver, mockWriter);
+
+    // Verify that archival throws OOM
+    assertThrows(OutOfMemoryError.class, () -> archiver.archiveIfRequired(context));
+
+    // Verify that OOM metric is recorded
+    Map<String, Long> metrics = archiver.getMetrics();
+    assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_OOM_FAILURE));
+
+    // Verify that commit metrics were recorded before OOM
+    assertTrue(metrics.containsKey(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS));
+  }
+
+  @Test
+  public void testArchivalMetricsWithMixedActionTypes() throws Exception {
+    // Initialize table with archival config: min=2, max=4
+    // This means archival will trigger when we have > 4 write commits and will archive down to 2
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 4, 2);
+
+    Map<String, Integer> cleanStats = new HashMap<>();
+    cleanStats.put("p1", 1);
+
+    // Create a mix of action types in a specific order:
+    // Timeline: C1, C2, C3, C4, CL5, CL6, RB7, RB8, RC9, RC10, C11, C12, C13, C14
+    // Where C=commit, CL=clean, RB=rollback, RC=replace_commit (cluster)
+
+    // Commits 1-4: regular write commits
+    for (int i = 1; i <= 4; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT,
+          i == 1 ? Collections.singletonList("p1") : Collections.emptyList(),
+          Collections.singletonList("p1"), 2);
+    }
+
+    // Commits 5-6: clean commits (will be archived along with commits before them)
+    testTable.doClean(String.format("%08d", 5), cleanStats, Collections.emptyMap());
+    testTable.doClean(String.format("%08d", 6), cleanStats, Collections.emptyMap());
+
+    // Commits 7-8: rollback commits
+    testTable.doWriteOperation(String.format("%08d", 7), WriteOperationType.UPSERT,
+        Collections.emptyList(), Collections.singletonList("p1"), 2);
+    testTable.doRollback(String.format("%08d", 7), String.format("%08d", 8));
+
+    // Commits 9-10: replace commits (clustering)
+    testTable.doCluster(String.format("%08d", 9), Collections.emptyMap(), 
Collections.singletonList("p1"), 2);
+    testTable.doCluster(String.format("%08d", 10), Collections.emptyMap(), 
Collections.singletonList("p1"), 2);
+
+    // Commits 11-14: more write commits to trigger archival
+    for (int i = 11; i <= 14; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT,
+          Collections.emptyList(), Collections.singletonList("p1"), 2);
+    }
+
+    // Get timeline before archival
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline beforeArchival = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instantsBeforeArchival = beforeArchival.getInstants();
+
+    // Create archiver and trigger archival
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table);
+    int archivedCount = archiver.archiveIfRequired(context);
+
+    // Get timeline after archival
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline afterArchival = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instantsAfterArchival = afterArchival.getInstants();
+
+    // Calculate what was actually archived
+    Set<HoodieInstant> afterSet = new HashSet<>(instantsAfterArchival);
+    List<HoodieInstant> archivedInstants = instantsBeforeArchival.stream()
+        .filter(instant -> !afterSet.contains(instant))
+        .collect(Collectors.toList());
+
+    // Count archived instants by action type
+    long expectedWriteCommits = archivedInstants.stream()
+        .filter(i -> i.getAction().equals(COMMIT_ACTION)
+            || i.getAction().equals(DELTA_COMMIT_ACTION)
+            || i.getAction().equals(REPLACE_COMMIT_ACTION))
+        .count();
+    long expectedCleanCommits = archivedInstants.stream()
+        .filter(i -> i.getAction().equals(CLEAN_ACTION))
+        .count();
+    long expectedRollbackCommits = archivedInstants.stream()
+        .filter(i -> i.getAction().equals(ROLLBACK_ACTION))
+        .count();
+    long expectedTotal = archivedInstants.size();
+
+    // Verify some instants were archived
+    assertTrue(archivedCount > 0, "Expected some instants to be archived");
+
+    // Verify metrics match actual archived counts
+    Map<String, Long> metrics = archiver.getMetrics();
+
+    assertEquals(expectedTotal, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS),
+        "Total archived commits metric should match");
+    assertEquals(expectedWriteCommits, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_WRITE_COMMITS),
+        "Write commits metric should match");
+    assertEquals(expectedCleanCommits, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_CLEAN_COMMITS),
+        "Clean commits metric should match");
+    assertEquals(expectedRollbackCommits, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ROLLBACK_COMMITS),
+        "Rollback commits metric should match");
+
+    // Verify the sum of individual action types equals total
+    assertEquals(expectedTotal, expectedWriteCommits + expectedCleanCommits + expectedRollbackCommits,
+        "Sum of action types should equal total archived");
+
+    // Verify archival status is success
+    assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_STATUS), "Archival should succeed");
+  }
 }
