This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dbec0c3e6 [core] Support commit metrics (#1638)
dbec0c3e6 is described below

commit dbec0c3e606a1517dc868ee143c2b5e9fb796dd4
Author: GuojunLi <[email protected]>
AuthorDate: Fri Oct 20 11:15:21 2023 +0800

    [core] Support commit metrics (#1638)
    
    This closes #1638.
---
 .../apache/paimon/metrics/AbstractMetricGroup.java |  10 +
 .../java/org/apache/paimon/metrics/Metrics.java    |   7 +
 .../paimon/metrics/commit/CommitMetrics.java       | 137 ++++++++++
 .../apache/paimon/metrics/commit/CommitStats.java  | 206 +++++++++++++++
 .../paimon/metrics/groups/GenericMetricGroup.java  |   4 +-
 .../apache/paimon/operation/FileStoreCommit.java   |   4 +
 .../paimon/operation/FileStoreCommitImpl.java      | 292 +++++++++++++--------
 .../paimon/table/AbstractFileStoreTable.java       |   3 +-
 .../apache/paimon/table/sink/TableCommitImpl.java  |   9 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  23 ++
 .../paimon/metrics/commit/CommitMetricsTest.java   | 286 ++++++++++++++++++++
 .../paimon/metrics/commit/CommitStatsTest.java     | 154 +++++++++++
 12 files changed, 1024 insertions(+), 111 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java 
b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
index 78f293a50..41980ff3e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
@@ -190,4 +190,14 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
     public final boolean isClosed() {
         return closed;
     }
+
+    @Override
+    public String toString() {
+        return "MetricGroup{"
+                + "groupName="
+                + getGroupName()
+                + ", metrics="
+                + String.join(",", metrics.keySet())
+                + '}';
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java 
b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
index 6ef28749b..379f3d6c6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.metrics;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 /** Core of Paimon metrics system. */
 public class Metrics {
@@ -53,4 +54,10 @@ public class Metrics {
     public ConcurrentLinkedQueue<MetricGroup> getMetricGroups() {
         return metricGroups;
     }
+
+    public static String groupsInfo() {
+        return getInstance().getMetricGroups().stream()
+                .map(Object::toString)
+                .collect(Collectors.joining(", ", "[", "]"));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java 
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
new file mode 100644
index 000000000..a216a94e2
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.metrics.AbstractMetricGroup;
+import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.groups.GenericMetricGroup;
+
+/** Metrics to measure a commit. */
+public class CommitMetrics {
+    private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
+    protected static final String GROUP_NAME = "commit";
+
+    private final AbstractMetricGroup genericMetricGroup;
+
+    public CommitMetrics(String tableName) {
+        this.genericMetricGroup =
+                GenericMetricGroup.createGenericMetricGroup(tableName, 
GROUP_NAME);
+        registerGenericCommitMetrics();
+    }
+
+    @VisibleForTesting
+    public AbstractMetricGroup getMetricGroup() {
+        return genericMetricGroup;
+    }
+
+    private final Histogram durationHistogram =
+            new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE);
+
+    private CommitStats latestCommit;
+
+    @VisibleForTesting static final String LAST_COMMIT_DURATION = 
"lastCommitDuration";
+    @VisibleForTesting static final String COMMIT_DURATION = "commitDuration";
+    @VisibleForTesting static final String LAST_COMMIT_ATTEMPTS = 
"lastCommitAttempts";
+    @VisibleForTesting static final String LAST_TABLE_FILES_ADDED = 
"lastTableFilesAdded";
+    @VisibleForTesting static final String LAST_TABLE_FILES_DELETED = 
"lastTableFilesDeleted";
+    @VisibleForTesting static final String LAST_TABLE_FILES_APPENDED = 
"lastTableFilesAppended";
+
+    @VisibleForTesting
+    static final String LAST_TABLE_FILES_COMMIT_COMPACTED = 
"lastTableFilesCommitCompacted";
+
+    @VisibleForTesting
+    static final String LAST_CHANGELOG_FILES_APPENDED = 
"lastChangelogFilesAppended";
+
+    @VisibleForTesting
+    static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = 
"lastChangelogFileCommitCompacted";
+
+    @VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = 
"lastGeneratedSnapshots";
+    @VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = 
"lastDeltaRecordsAppended";
+
+    @VisibleForTesting
+    static final String LAST_CHANGELOG_RECORDS_APPENDED = 
"lastChangelogRecordsAppended";
+
+    @VisibleForTesting
+    static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED = 
"lastDeltaRecordsCommitCompacted";
+
+    @VisibleForTesting
+    static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
+            "lastChangelogRecordsCommitCompacted";
+
+    @VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = 
"lastPartitionsWritten";
+    @VisibleForTesting static final String LAST_BUCKETS_WRITTEN = 
"lastBucketsWritten";
+
+    private void registerGenericCommitMetrics() {
+        genericMetricGroup.gauge(
+                LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : 
latestCommit.getDuration());
+        genericMetricGroup.gauge(
+                LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L : 
latestCommit.getAttempts());
+        genericMetricGroup.gauge(
+                LAST_GENERATED_SNAPSHOTS,
+                () -> latestCommit == null ? 0L : 
latestCommit.getGeneratedSnapshots());
+        genericMetricGroup.gauge(
+                LAST_PARTITIONS_WRITTEN,
+                () -> latestCommit == null ? 0L : 
latestCommit.getNumPartitionsWritten());
+        genericMetricGroup.gauge(
+                LAST_BUCKETS_WRITTEN,
+                () -> latestCommit == null ? 0L : 
latestCommit.getNumBucketsWritten());
+        genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram);
+        genericMetricGroup.gauge(
+                LAST_TABLE_FILES_ADDED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getTableFilesAdded());
+        genericMetricGroup.gauge(
+                LAST_TABLE_FILES_DELETED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getTableFilesDeleted());
+        genericMetricGroup.gauge(
+                LAST_TABLE_FILES_APPENDED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getTableFilesAppended());
+        genericMetricGroup.gauge(
+                LAST_TABLE_FILES_COMMIT_COMPACTED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getTableFilesCompacted());
+        genericMetricGroup.gauge(
+                LAST_CHANGELOG_FILES_APPENDED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getChangelogFilesAppended());
+        genericMetricGroup.gauge(
+                LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getChangelogFilesCompacted());
+        genericMetricGroup.gauge(
+                LAST_DELTA_RECORDS_APPENDED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getDeltaRecordsAppended());
+        genericMetricGroup.gauge(
+                LAST_CHANGELOG_RECORDS_APPENDED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getChangelogRecordsAppended());
+        genericMetricGroup.gauge(
+                LAST_DELTA_RECORDS_COMMIT_COMPACTED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getDeltaRecordsCompacted());
+        genericMetricGroup.gauge(
+                LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
+                () -> latestCommit == null ? 0L : 
latestCommit.getChangelogRecordsCompacted());
+    }
+
+    public void reportCommit(CommitStats commitStats) {
+        latestCommit = commitStats;
+        durationHistogram.update(commitStats.getDuration());
+    }
+
+    public void close() {
+        this.genericMetricGroup.close();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java 
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
new file mode 100644
index 000000000..0a2772f63
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Statistics for a commit. */
+public class CommitStats {
+    private final long duration;
+    private final int attempts;
+    private final long tableFilesAdded;
+    private final long tableFilesAppended;
+    private final long tableFilesDeleted;
+    private final long changelogFilesAppended;
+    private final long changelogFilesCompacted;
+    private final long changelogRecordsCompacted;
+
+    private final long deltaRecordsCompacted;
+    private final long changelogRecordsAppended;
+    private final long deltaRecordsAppended;
+    private final long tableFilesCompacted;
+    private final long generatedSnapshots;
+    private final long numPartitionsWritten;
+    private final long numBucketsWritten;
+
+    public CommitStats(
+            List<ManifestEntry> appendTableFiles,
+            List<ManifestEntry> appendChangelogFiles,
+            List<ManifestEntry> compactTableFiles,
+            List<ManifestEntry> compactChangelogFiles,
+            long commitDuration,
+            int generatedSnapshots,
+            int attempts) {
+        List<ManifestEntry> addedTableFiles = new 
ArrayList<>(appendTableFiles);
+        addedTableFiles.addAll(
+                compactTableFiles.stream()
+                        .filter(f -> FileKind.ADD.equals(f.kind()))
+                        .collect(Collectors.toList()));
+        List<ManifestEntry> deletedTableFiles =
+                compactTableFiles.stream()
+                        .filter(f -> FileKind.DELETE.equals(f.kind()))
+                        .collect(Collectors.toList());
+
+        this.tableFilesAdded = addedTableFiles.size();
+        this.tableFilesAppended = appendTableFiles.size();
+        this.tableFilesDeleted = deletedTableFiles.size();
+        this.tableFilesCompacted = compactTableFiles.size();
+        this.changelogFilesAppended = appendChangelogFiles.size();
+        this.changelogFilesCompacted = compactChangelogFiles.size();
+        this.numPartitionsWritten = numChangedPartitions(appendTableFiles, 
compactTableFiles);
+        this.numBucketsWritten = numChangedBuckets(appendTableFiles, 
compactTableFiles);
+        this.changelogRecordsCompacted = getRowCounts(compactChangelogFiles);
+        this.deltaRecordsCompacted = getRowCounts(compactTableFiles);
+        this.changelogRecordsAppended = getRowCounts(appendChangelogFiles);
+        this.deltaRecordsAppended = getRowCounts(appendTableFiles);
+        this.duration = commitDuration;
+        this.generatedSnapshots = generatedSnapshots;
+        this.attempts = attempts;
+    }
+
+    @VisibleForTesting
+    protected static long numChangedPartitions(List<ManifestEntry>... changes) 
{
+        return Arrays.stream(changes)
+                .flatMap(Collection::stream)
+                .map(ManifestEntry::partition)
+                .distinct()
+                .count();
+    }
+
+    @VisibleForTesting
+    protected static long numChangedBuckets(List<ManifestEntry>... changes) {
+        return 
changedPartBuckets(changes).values().stream().mapToLong(Set::size).sum();
+    }
+
+    @VisibleForTesting
+    protected static List<BinaryRow> changedPartitions(List<ManifestEntry>... 
changes) {
+        return Arrays.stream(changes)
+                .flatMap(Collection::stream)
+                .map(ManifestEntry::partition)
+                .distinct()
+                .collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    protected static Map<BinaryRow, Set<Integer>> changedPartBuckets(
+            List<ManifestEntry>... changes) {
+        Map<BinaryRow, Set<Integer>> changedPartBuckets = new 
LinkedHashMap<>();
+        Arrays.stream(changes)
+                .flatMap(Collection::stream)
+                .forEach(
+                        entry ->
+                                changedPartBuckets
+                                        .computeIfAbsent(
+                                                entry.partition(), k -> new 
LinkedHashSet<>())
+                                        .add(entry.bucket()));
+        return changedPartBuckets;
+    }
+
+    private long getRowCounts(List<ManifestEntry> files) {
+        return files.stream().mapToLong(file -> file.file().rowCount()).sum();
+    }
+
+    @VisibleForTesting
+    protected long getTableFilesAdded() {
+        return tableFilesAdded;
+    }
+
+    @VisibleForTesting
+    protected long getTableFilesDeleted() {
+        return tableFilesDeleted;
+    }
+
+    @VisibleForTesting
+    protected long getTableFilesAppended() {
+        return tableFilesAppended;
+    }
+
+    @VisibleForTesting
+    protected long getTableFilesCompacted() {
+        return tableFilesCompacted;
+    }
+
+    @VisibleForTesting
+    protected long getChangelogFilesAppended() {
+        return changelogFilesAppended;
+    }
+
+    @VisibleForTesting
+    protected long getChangelogFilesCompacted() {
+        return changelogFilesCompacted;
+    }
+
+    @VisibleForTesting
+    protected long getGeneratedSnapshots() {
+        return generatedSnapshots;
+    }
+
+    @VisibleForTesting
+    protected long getDeltaRecordsAppended() {
+        return deltaRecordsAppended;
+    }
+
+    @VisibleForTesting
+    protected long getChangelogRecordsAppended() {
+        return changelogRecordsAppended;
+    }
+
+    @VisibleForTesting
+    protected long getDeltaRecordsCompacted() {
+        return deltaRecordsCompacted;
+    }
+
+    @VisibleForTesting
+    protected long getChangelogRecordsCompacted() {
+        return changelogRecordsCompacted;
+    }
+
+    @VisibleForTesting
+    protected long getNumPartitionsWritten() {
+        return numPartitionsWritten;
+    }
+
+    @VisibleForTesting
+    protected long getNumBucketsWritten() {
+        return numBucketsWritten;
+    }
+
+    @VisibleForTesting
+    protected long getDuration() {
+        return duration;
+    }
+
+    @VisibleForTesting
+    protected int getAttempts() {
+        return attempts;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
 
b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
index df927aa35..f1d5c3157 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
@@ -34,9 +34,9 @@ public class GenericMetricGroup extends AbstractMetricGroup {
     }
 
     public static GenericMetricGroup createGenericMetricGroup(
-            final String table, final String groupName) {
+            final String tableName, final String groupName) {
         Map<String, String> tags = new HashMap<>();
-        tags.put("table", table);
+        tags.put("table", tableName);
         return new GenericMetricGroup(tags, groupName);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index a585d6812..31d22c37e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.metrics.commit.CommitMetrics;
 import org.apache.paimon.table.sink.CommitMessage;
 
 import java.util.List;
@@ -78,4 +79,7 @@ public interface FileStoreCommit {
 
     /** Abort an unsuccessful commit. The data files will be deleted. */
     void abort(List<CommitMessage> commitMessages);
+
+    /** With metrics to measure commits. */
+    FileStoreCommit withMetrics(CommitMetrics metrics);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a40964975..77c22687c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -34,6 +34,8 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metrics.commit.CommitMetrics;
+import org.apache.paimon.metrics.commit.CommitStats;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -112,6 +114,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     @Nullable private Lock lock;
     private boolean ignoreEmptyCommit;
 
+    private CommitMetrics commitMetrics;
+
     public FileStoreCommitImpl(
             FileIO fileIO,
             SchemaManager schemaManager,
@@ -149,6 +153,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
         this.lock = null;
         this.ignoreEmptyCommit = true;
+        this.commitMetrics = null;
     }
 
     @Override
@@ -192,6 +197,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             LOG.debug("Ready to commit\n" + committable.toString());
         }
 
+        long started = System.nanoTime();
+        int generatedSnapshot = 0;
+        int attempts = 0;
         Snapshot latestSnapshot = null;
         Long safeLatestSnapshotId = null;
         List<ManifestEntry> baseEntries = new ArrayList<>();
@@ -208,65 +216,104 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 compactTableFiles,
                 compactChangelog,
                 appendIndexFiles);
+        try {
+            if (!ignoreEmptyCommit
+                    || !appendTableFiles.isEmpty()
+                    || !appendChangelog.isEmpty()
+                    || !appendIndexFiles.isEmpty()) {
+                // Optimization for common path.
+                // Step 1:
+                // Read manifest entries from changed partitions here and 
check for conflicts.
+                // If there are no other jobs committing at the same time,
+                // we can skip conflict checking in tryCommit method.
+                // This optimization is mainly used to decrease the number of 
times we read from
+                // files.
+                latestSnapshot = snapshotManager.latestSnapshot();
+                if (latestSnapshot != null) {
+                    // it is possible that some partitions only have compact 
changes,
+                    // so we need to contain all changes
+                    baseEntries.addAll(
+                            readAllEntriesFromChangedPartitions(
+                                    latestSnapshot, appendTableFiles, 
compactTableFiles));
+                    noConflictsOrFail(latestSnapshot.commitUser(), 
baseEntries, appendTableFiles);
+                    safeLatestSnapshotId = latestSnapshot.id();
+                }
 
-        if (!ignoreEmptyCommit
-                || !appendTableFiles.isEmpty()
-                || !appendChangelog.isEmpty()
-                || !appendIndexFiles.isEmpty()) {
-            // Optimization for common path.
-            // Step 1:
-            // Read manifest entries from changed partitions here and check 
for conflicts.
-            // If there are no other jobs committing at the same time,
-            // we can skip conflict checking in tryCommit method.
-            // This optimization is mainly used to decrease the number of 
times we read from files.
-            latestSnapshot = snapshotManager.latestSnapshot();
-            if (latestSnapshot != null) {
-                // it is possible that some partitions only have compact 
changes,
-                // so we need to contain all changes
-                baseEntries.addAll(
-                        readAllEntriesFromChangedPartitions(
-                                latestSnapshot, appendTableFiles, 
compactTableFiles));
-                noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, 
appendTableFiles);
-                safeLatestSnapshotId = latestSnapshot.id();
+                attempts +=
+                        tryCommit(
+                                appendTableFiles,
+                                appendChangelog,
+                                appendIndexFiles,
+                                committable.identifier(),
+                                committable.watermark(),
+                                committable.logOffsets(),
+                                Snapshot.CommitKind.APPEND,
+                                safeLatestSnapshotId);
+                generatedSnapshot += 1;
             }
 
-            tryCommit(
-                    appendTableFiles,
-                    appendChangelog,
-                    appendIndexFiles,
-                    committable.identifier(),
-                    committable.watermark(),
-                    committable.logOffsets(),
-                    Snapshot.CommitKind.APPEND,
-                    safeLatestSnapshotId);
-        }
+            if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
+                // Optimization for common path.
+                // Step 2:
+                // Add appendChanges to the manifest entries read above and 
check for conflicts.
+                // If there are no other jobs committing at the same time,
+                // we can skip conflict checking in tryCommit method.
+                // This optimization is mainly used to decrease the number of 
times we read from
+                // files.
+                if (safeLatestSnapshotId != null) {
+                    baseEntries.addAll(appendTableFiles);
+                    noConflictsOrFail(latestSnapshot.commitUser(), 
baseEntries, compactTableFiles);
+                    // assume this compact commit follows just after the 
append commit created above
+                    safeLatestSnapshotId += 1;
+                }
 
-        if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
-            // Optimization for common path.
-            // Step 2:
-            // Add appendChanges to the manifest entries read above and check 
for conflicts.
-            // If there are no other jobs committing at the same time,
-            // we can skip conflict checking in tryCommit method.
-            // This optimization is mainly used to decrease the number of 
times we read from files.
-            if (safeLatestSnapshotId != null) {
-                baseEntries.addAll(appendTableFiles);
-                noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, 
compactTableFiles);
-                // assume this compact commit follows just after the append 
commit created above
-                safeLatestSnapshotId += 1;
+                attempts +=
+                        tryCommit(
+                                compactTableFiles,
+                                compactChangelog,
+                                Collections.emptyList(),
+                                committable.identifier(),
+                                committable.watermark(),
+                                committable.logOffsets(),
+                                Snapshot.CommitKind.COMPACT,
+                                safeLatestSnapshotId);
+                generatedSnapshot += 1;
+            }
+        } finally {
+            long commitDuration = (System.nanoTime() - started) / 1_000_000;
+            if (this.commitMetrics != null) {
+                reportCommit(
+                        appendTableFiles,
+                        appendChangelog,
+                        compactTableFiles,
+                        compactChangelog,
+                        commitDuration,
+                        generatedSnapshot,
+                        attempts);
             }
-
-            tryCommit(
-                    compactTableFiles,
-                    compactChangelog,
-                    Collections.emptyList(),
-                    committable.identifier(),
-                    committable.watermark(),
-                    committable.logOffsets(),
-                    Snapshot.CommitKind.COMPACT,
-                    safeLatestSnapshotId);
         }
     }
 
+    private void reportCommit(
+            List<ManifestEntry> appendTableFiles,
+            List<ManifestEntry> appendChangelogFiles,
+            List<ManifestEntry> compactTableFiles,
+            List<ManifestEntry> compactChangelogFiles,
+            long commitDuration,
+            int generatedSnapshots,
+            int attempts) {
+        CommitStats commitStats =
+                new CommitStats(
+                        appendTableFiles,
+                        appendChangelogFiles,
+                        compactTableFiles,
+                        compactChangelogFiles,
+                        commitDuration,
+                        generatedSnapshots,
+                        attempts);
+        commitMetrics.reportCommit(commitStats);
+    }
+
     @Override
     public void overwrite(
             Map<String, String> partition,
@@ -280,6 +327,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     properties);
         }
 
+        long started = System.nanoTime();
+        int generatedSnapshot = 0;
+        int attempts = 0;
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
@@ -309,65 +359,83 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             LOG.warn(warnMessage.toString());
         }
 
-        boolean skipOverwrite = false;
-        // partition filter is built from static or dynamic partition 
according to properties
-        Predicate partitionFilter = null;
-        if (dynamicPartitionOverwrite) {
-            if (appendTableFiles.isEmpty()) {
-                // in dynamic mode, if there is no changes to commit, no data 
will be deleted
-                skipOverwrite = true;
+        try {
+            boolean skipOverwrite = false;
+            // partition filter is built from static or dynamic partition 
according to properties
+            Predicate partitionFilter = null;
+            if (dynamicPartitionOverwrite) {
+                if (appendTableFiles.isEmpty()) {
+                    // in dynamic mode, if there is no changes to commit, no 
data will be deleted
+                    skipOverwrite = true;
+                } else {
+                    partitionFilter =
+                            appendTableFiles.stream()
+                                    .map(ManifestEntry::partition)
+                                    .distinct()
+                                    // partition filter is built from new 
data's partitions
+                                    .map(p -> 
PredicateBuilder.equalPartition(p, partitionType))
+                                    .reduce(PredicateBuilder::or)
+                                    .orElseThrow(
+                                            () ->
+                                                    new RuntimeException(
+                                                            "Failed to get 
dynamic partition filter. This is unexpected."));
+                }
             } else {
-                partitionFilter =
-                        appendTableFiles.stream()
-                                .map(ManifestEntry::partition)
-                                .distinct()
-                                // partition filter is built from new data's 
partitions
-                                .map(p -> PredicateBuilder.equalPartition(p, 
partitionType))
-                                .reduce(PredicateBuilder::or)
-                                .orElseThrow(
-                                        () ->
-                                                new RuntimeException(
-                                                        "Failed to get dynamic 
partition filter. This is unexpected."));
-            }
-        } else {
-            partitionFilter = PredicateBuilder.partition(partition, 
partitionType);
-            // sanity check, all changes must be done within the given 
partition
-            if (partitionFilter != null) {
-                for (ManifestEntry entry : appendTableFiles) {
-                    if (!partitionFilter.test(
-                            
partitionObjectConverter.convert(entry.partition()))) {
-                        throw new IllegalArgumentException(
-                                "Trying to overwrite partition "
-                                        + partition
-                                        + ", but the changes in "
-                                        + 
pathFactory.getPartitionString(entry.partition())
-                                        + " does not belong to this 
partition");
+                partitionFilter = PredicateBuilder.partition(partition, 
partitionType);
+                // sanity check, all changes must be done within the given 
partition
+                if (partitionFilter != null) {
+                    for (ManifestEntry entry : appendTableFiles) {
+                        if (!partitionFilter.test(
+                                
partitionObjectConverter.convert(entry.partition()))) {
+                            throw new IllegalArgumentException(
+                                    "Trying to overwrite partition "
+                                            + partition
+                                            + ", but the changes in "
+                                            + 
pathFactory.getPartitionString(entry.partition())
+                                            + " does not belong to this 
partition");
+                        }
                     }
                 }
             }
-        }
 
-        // overwrite new files
-        if (!skipOverwrite) {
-            tryOverwrite(
-                    partitionFilter,
-                    appendTableFiles,
-                    appendIndexFiles,
-                    committable.identifier(),
-                    committable.watermark(),
-                    committable.logOffsets());
-        }
+            // overwrite new files
+            if (!skipOverwrite) {
+                attempts +=
+                        tryOverwrite(
+                                partitionFilter,
+                                appendTableFiles,
+                                appendIndexFiles,
+                                committable.identifier(),
+                                committable.watermark(),
+                                committable.logOffsets());
+                generatedSnapshot += 1;
+            }
 
-        if (!compactTableFiles.isEmpty()) {
-            tryCommit(
-                    compactTableFiles,
-                    Collections.emptyList(),
-                    Collections.emptyList(),
-                    committable.identifier(),
-                    committable.watermark(),
-                    committable.logOffsets(),
-                    Snapshot.CommitKind.COMPACT,
-                    null);
+            if (!compactTableFiles.isEmpty()) {
+                attempts +=
+                        tryCommit(
+                                compactTableFiles,
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                committable.identifier(),
+                                committable.watermark(),
+                                committable.logOffsets(),
+                                Snapshot.CommitKind.COMPACT,
+                                null);
+                generatedSnapshot += 1;
+            }
+        } finally {
+            long commitDuration = (System.nanoTime() - started) / 1_000_000;
+            if (this.commitMetrics != null) {
+                reportCommit(
+                        appendTableFiles,
+                        Collections.emptyList(),
+                        compactTableFiles,
+                        Collections.emptyList(),
+                        commitDuration,
+                        generatedSnapshot,
+                        attempts);
+            }
         }
     }
 
@@ -430,6 +498,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
     }
 
+    @Override
+    public FileStoreCommit withMetrics(CommitMetrics metrics) {
+        this.commitMetrics = metrics;
+        return this;
+    }
+
     private void collectChanges(
             List<CommitMessage> commitMessages,
             List<ManifestEntry> appendTableFiles,
@@ -481,7 +555,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 kind, commitMessage.partition(), commitMessage.bucket(), 
numBucket, file);
     }
 
-    private void tryCommit(
+    private int tryCommit(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
             List<IndexManifestEntry> indexFiles,
@@ -490,8 +564,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long safeLatestSnapshotId) {
+        int cnt = 0;
         while (true) {
             Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+            cnt++;
             if (tryCommitOnce(
                     tableFiles,
                     changelogFiles,
@@ -505,18 +581,21 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 break;
             }
         }
+        return cnt;
     }
 
-    private void tryOverwrite(
+    private int tryOverwrite(
             Predicate partitionFilter,
             List<ManifestEntry> changes,
             List<IndexManifestEntry> indexFiles,
             long identifier,
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets) {
+        int cnt = 0;
         while (true) {
             Snapshot latestSnapshot = snapshotManager.latestSnapshot();
 
+            cnt++;
             List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
             List<IndexManifestEntry> indexChangesWithOverwrite = new 
ArrayList<>();
             if (latestSnapshot != null) {
@@ -565,6 +644,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 break;
             }
         }
+        return cnt;
     }
 
     @VisibleForTesting
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 916fe17f4..875e2fed7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -261,7 +261,8 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path),
-                coreOptions().snapshotExpireExecutionMode());
+                coreOptions().snapshotExpireExecutionMode(),
+                name());
     }
 
     private List<CommitCallback> createCommitCallbacks() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 7293dce3a..19e95bb0c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.metrics.commit.CommitMetrics;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.Lock;
@@ -75,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {
 
     private ExecutorService expireMainExecutor;
     private AtomicReference<Throwable> expireError;
+    private final CommitMetrics commitMetrics;
 
     public TableCommitImpl(
             FileStoreCommit commit,
@@ -85,8 +87,10 @@ public class TableCommitImpl implements InnerTableCommit {
             Lock lock,
             @Nullable Duration consumerExpireTime,
             ConsumerManager consumerManager,
-            ExpireExecutionMode expireExecutionMode) {
-        commit.withLock(lock);
+            ExpireExecutionMode expireExecutionMode,
+            String tableName) {
+        this.commitMetrics = new CommitMetrics(tableName);
+        commit.withLock(lock).withMetrics(commitMetrics);
         if (expire != null) {
             expire.withLock(lock);
         }
@@ -261,6 +265,7 @@ public class TableCommitImpl implements InnerTableCommit {
         }
         IOUtils.closeQuietly(lock);
         expireMainExecutor.shutdownNow();
+        commitMetrics.close();
     }
 
     @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e66a6eccb..ac0c0a64e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -39,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** base class for Test {@link ManifestFile}. */
@@ -221,4 +222,26 @@ public abstract class ManifestFileMetaTestBase {
                         makeEntry(false, "B2", partition2),
                         makeEntry(true, "D2", partition2)));
     }
+
+    public static ManifestEntry makeEntry(
+            FileKind fileKind, int partition, int bucket, long rowCount) {
+        return new ManifestEntry(
+                fileKind,
+                row(partition),
+                bucket,
+                0, // not used
+                new DataFileMeta(
+                        "", // not used
+                        0, // not used
+                        rowCount,
+                        null, // not used
+                        null, // not used
+                        StatsTestUtils.newEmptyTableStats(), // not used
+                        StatsTestUtils.newEmptyTableStats(), // not used
+                        0, // not used
+                        0, // not used
+                        0, // not used
+                        0 // not used
+                        ));
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
new file mode 100644
index 000000000..dead22c2e
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.Metrics;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.offset;
+
+/** Tests for {@link CommitMetrics}. */
+public class CommitMetricsTest {
+    private static final String TABLE_NAME = "myTable";
+
+    private CommitMetrics commitMetrics;
+
+    @BeforeEach
+    public void beforeEach() {
+        commitMetrics = getCommitMetrics();
+    }
+
+    @AfterEach
+    public void afterEach() {
+        commitMetrics.close();
+    }
+
+    /** Tests the registration of the commit metrics. */
+    @Test
+    public void testGenericMetricsRegistration() {
+        MetricGroup genericMetricGroup = commitMetrics.getMetricGroup();
+        assertThat(Metrics.getInstance().getMetricGroups().size())
+                .withFailMessage(
+                        String.format(
+                                "Please close the created metric groups %s in 
case of metrics resource leak.",
+                                Metrics.groupsInfo()))
+                .isEqualTo(1);
+        
assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME);
+        Map<String, Metric> registeredMetrics = 
genericMetricGroup.getMetrics();
+        assertThat(registeredMetrics.keySet())
+                .containsExactlyInAnyOrder(
+                        CommitMetrics.LAST_COMMIT_DURATION,
+                        CommitMetrics.LAST_COMMIT_ATTEMPTS,
+                        CommitMetrics.LAST_GENERATED_SNAPSHOTS,
+                        CommitMetrics.LAST_PARTITIONS_WRITTEN,
+                        CommitMetrics.LAST_BUCKETS_WRITTEN,
+                        CommitMetrics.COMMIT_DURATION,
+                        CommitMetrics.LAST_TABLE_FILES_ADDED,
+                        CommitMetrics.LAST_TABLE_FILES_DELETED,
+                        CommitMetrics.LAST_TABLE_FILES_APPENDED,
+                        CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED,
+                        CommitMetrics.LAST_CHANGELOG_FILES_APPENDED,
+                        CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
+                        CommitMetrics.LAST_DELTA_RECORDS_APPENDED,
+                        CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED,
+                        CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED,
+                        CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
+
+        reportOnce(commitMetrics);
+        
assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
+    }
+
+    /** Tests that the metrics are updated properly. */
+    @Test
+    public void testMetricsAreUpdated() {
+        Map<String, Metric> registeredGenericMetrics = 
commitMetrics.getMetricGroup().getMetrics();
+
+        // Check initial values
+        Gauge<Long> lastCommitDuration =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_COMMIT_DURATION);
+        Histogram commitDuration =
+                (Histogram) 
registeredGenericMetrics.get(CommitMetrics.COMMIT_DURATION);
+        Gauge<Long> lastCommitAttempts =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_COMMIT_ATTEMPTS);
+        Gauge<Long> lastGeneratedSnapshots =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_GENERATED_SNAPSHOTS);
+        Gauge<Long> lastPartitionsWritten =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_PARTITIONS_WRITTEN);
+        Gauge<Long> lastBucketsWritten =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_BUCKETS_WRITTEN);
+        Gauge<Long> lastTableFilesAdded =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_ADDED);
+        Gauge<Long> lastTableFilesDeleted =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_DELETED);
+
+        Gauge<Long> lastTableFilesAppended =
+                (Gauge<Long>) 
registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_APPENDED);
+
+        Gauge<Long> lastTableFilesCompacted =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED);
+
+        Gauge<Long> lastChangelogFilesAppended =
+                (Gauge<Long>)
+                        
registeredGenericMetrics.get(CommitMetrics.LAST_CHANGELOG_FILES_APPENDED);
+
+        Gauge<Long> lastChangelogFilesCompacted =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED);
+
+        Gauge<Long> lastDeltaRecordsAppended =
+                (Gauge<Long>)
+                        
registeredGenericMetrics.get(CommitMetrics.LAST_DELTA_RECORDS_APPENDED);
+
+        Gauge<Long> lastChangelogRecordsAppended =
+                (Gauge<Long>)
+                        
registeredGenericMetrics.get(CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED);
+
+        Gauge<Long> lastDeltaRecordsCompacted =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED);
+
+        Gauge<Long> lastChangelogRecordsCompacted =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
+
+        assertThat(lastCommitDuration.getValue()).isEqualTo(0);
+        assertThat(commitDuration.getCount()).isEqualTo(0);
+        assertThat(commitDuration.getStatistics().size()).isEqualTo(0);
+        assertThat(lastCommitAttempts.getValue()).isEqualTo(0);
+        assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(0);
+        assertThat(lastPartitionsWritten.getValue()).isEqualTo(0);
+        assertThat(lastBucketsWritten.getValue()).isEqualTo(0);
+        assertThat(lastTableFilesAdded.getValue()).isEqualTo(0);
+        assertThat(lastTableFilesDeleted.getValue()).isEqualTo(0);
+        assertThat(lastTableFilesAppended.getValue()).isEqualTo(0);
+        assertThat(lastTableFilesCompacted.getValue()).isEqualTo(0);
+        assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(0);
+        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0);
+        assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(0);
+        assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(0);
+        assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(0);
+        assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(0);
+
+        // report once
+        reportOnce(commitMetrics);
+
+        // generic metrics value updated
+        assertThat(lastCommitDuration.getValue()).isEqualTo(200);
+        assertThat(commitDuration.getCount()).isEqualTo(1);
+        assertThat(commitDuration.getStatistics().size()).isEqualTo(1);
+        
assertThat(commitDuration.getStatistics().getValues()[0]).isEqualTo(200L);
+        assertThat(commitDuration.getStatistics().getMin()).isEqualTo(200);
+        
assertThat(commitDuration.getStatistics().getQuantile(0.5)).isCloseTo(200.0, 
offset(0.001));
+        assertThat(commitDuration.getStatistics().getMean()).isEqualTo(200);
+        assertThat(commitDuration.getStatistics().getMax()).isEqualTo(200);
+        assertThat(commitDuration.getStatistics().getStdDev()).isEqualTo(0);
+        assertThat(lastCommitAttempts.getValue()).isEqualTo(1);
+        assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(2);
+        assertThat(lastPartitionsWritten.getValue()).isEqualTo(3);
+        assertThat(lastBucketsWritten.getValue()).isEqualTo(3);
+        assertThat(lastTableFilesAdded.getValue()).isEqualTo(4);
+        assertThat(lastTableFilesDeleted.getValue()).isEqualTo(1);
+        assertThat(lastTableFilesAppended.getValue()).isEqualTo(2);
+        assertThat(lastTableFilesCompacted.getValue()).isEqualTo(3);
+        assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(2);
+        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+        assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(503);
+        assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(503);
+        assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(613);
+        assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(512);
+
+        // report again
+        reportAgain(commitMetrics);
+
+        // generic metrics value updated
+        assertThat(lastCommitDuration.getValue()).isEqualTo(500);
+        assertThat(commitDuration.getCount()).isEqualTo(2);
+        assertThat(commitDuration.getStatistics().size()).isEqualTo(2);
+        
assertThat(commitDuration.getStatistics().getValues()[1]).isEqualTo(500L);
+        assertThat(commitDuration.getStatistics().getMin()).isEqualTo(200);
+        
assertThat(commitDuration.getStatistics().getQuantile(0.5)).isCloseTo(350.0, 
offset(0.001));
+        assertThat(commitDuration.getStatistics().getMean()).isEqualTo(350);
+        assertThat(commitDuration.getStatistics().getMax()).isEqualTo(500);
+        
assertThat(commitDuration.getStatistics().getStdDev()).isCloseTo(212.132, 
offset(0.001));
+        assertThat(lastCommitAttempts.getValue()).isEqualTo(2);
+        assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(1);
+        assertThat(lastPartitionsWritten.getValue()).isEqualTo(2);
+        assertThat(lastBucketsWritten.getValue()).isEqualTo(3);
+        assertThat(lastTableFilesAdded.getValue()).isEqualTo(4);
+        assertThat(lastTableFilesDeleted.getValue()).isEqualTo(1);
+        assertThat(lastTableFilesAppended.getValue()).isEqualTo(2);
+        assertThat(lastTableFilesCompacted.getValue()).isEqualTo(3);
+        assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(2);
+        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+        assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(805);
+        assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(213);
+        assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(506);
+        assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(601);
+    }
+
+    private void reportOnce(CommitMetrics commitMetrics) {
+        List<ManifestEntry> appendTableFiles = new ArrayList<>();
+        List<ManifestEntry> appendChangelogFiles = new ArrayList<>();
+        List<ManifestEntry> compactTableFiles = new ArrayList<>();
+        List<ManifestEntry> compactChangelogFiles = new ArrayList<>();
+
+        appendTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 201));
+        appendTableFiles.add(makeEntry(FileKind.ADD, 2, 3, 302));
+        appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 202));
+        appendChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 301));
+        compactTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 203));
+        compactTableFiles.add(makeEntry(FileKind.ADD, 2, 3, 304));
+        compactTableFiles.add(makeEntry(FileKind.DELETE, 3, 5, 106));
+        compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 205));
+        compactChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 307));
+
+        CommitStats commitStats =
+                new CommitStats(
+                        appendTableFiles,
+                        appendChangelogFiles,
+                        compactTableFiles,
+                        compactChangelogFiles,
+                        200,
+                        2,
+                        1);
+
+        commitMetrics.reportCommit(commitStats);
+    }
+
+    private void reportAgain(CommitMetrics commitMetrics) {
+        List<ManifestEntry> appendTableFiles = new ArrayList<>();
+        List<ManifestEntry> appendChangelogFiles = new ArrayList<>();
+        List<ManifestEntry> compactTableFiles = new ArrayList<>();
+        List<ManifestEntry> compactChangelogFiles = new ArrayList<>();
+
+        appendTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 400));
+        appendTableFiles.add(makeEntry(FileKind.ADD, 3, 4, 405));
+        appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 102));
+        appendChangelogFiles.add(makeEntry(FileKind.ADD, 3, 4, 111));
+        compactTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 200));
+        compactTableFiles.add(makeEntry(FileKind.ADD, 3, 4, 201));
+        compactTableFiles.add(makeEntry(FileKind.DELETE, 3, 5, 105));
+        compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 300));
+        compactChangelogFiles.add(makeEntry(FileKind.ADD, 3, 4, 301));
+
+        CommitStats commitStats =
+                new CommitStats(
+                        appendTableFiles,
+                        appendChangelogFiles,
+                        compactTableFiles,
+                        compactChangelogFiles,
+                        500,
+                        1,
+                        2);
+
+        commitMetrics.reportCommit(commitStats);
+    }
+
+    private CommitMetrics getCommitMetrics() {
+        return new CommitMetrics(TABLE_NAME);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
new file mode 100644
index 000000000..640dba08f
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CommitStats}. */
+public class CommitStatsTest {
+    private static List<ManifestEntry> files = new ArrayList<>();
+    private static List<ManifestEntry> appendDataFiles = new ArrayList<>();
+    private static List<ManifestEntry> appendChangelogFiles = new 
ArrayList<>();
+    private static List<ManifestEntry> compactDataFiles = new ArrayList<>();
+    private static List<ManifestEntry> compactChangelogFiles = new 
ArrayList<>();
+
+    @BeforeAll
+    public static void beforeAll() {
+        appendDataFiles.add(makeEntry(FileKind.ADD, 1, 1, 201));
+        appendDataFiles.add(makeEntry(FileKind.ADD, 2, 3, 302));
+        appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 202));
+        appendChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 301));
+        compactDataFiles.add(makeEntry(FileKind.ADD, 1, 1, 203));
+        compactDataFiles.add(makeEntry(FileKind.ADD, 2, 3, 304));
+        compactDataFiles.add(makeEntry(FileKind.DELETE, 3, 5, 106));
+        compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 205));
+        compactChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 307));
+        files.addAll(appendDataFiles);
+        files.addAll(appendChangelogFiles);
+        files.addAll(compactDataFiles);
+        files.addAll(compactChangelogFiles);
+    }
+
+    @Test
+    public void testCalcChangedPartitionsAndBuckets() {
+        assertThat(CommitStats.numChangedBuckets(files)).isEqualTo(3);
+        assertThat(CommitStats.numChangedPartitions(files)).isEqualTo(3);
+        
assertThat(CommitStats.changedPartBuckets(files).get(row(1))).containsExactly(1);
+        
assertThat(CommitStats.changedPartBuckets(files).get(row(2))).containsExactly(3);
+        
assertThat(CommitStats.changedPartBuckets(files).get(row(3))).containsExactly(5);
+        assertThat(CommitStats.changedPartitions(files))
+                .containsExactlyInAnyOrder(row(1), row(2), row(3));
+    }
+
+    @Test
+    public void testFailedAppendSnapshot() {
+        CommitStats commitStats =
+                new CommitStats(
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        0,
+                        0,
+                        1);
+        assertThat(commitStats.getTableFilesAdded()).isEqualTo(0);
+        assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
+        assertThat(commitStats.getTableFilesAppended()).isEqualTo(0);
+        assertThat(commitStats.getTableFilesCompacted()).isEqualTo(0);
+        assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(0);
+        assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(0);
+        assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(0);
+        assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(0);
+        assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(0);
+        assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(0);
+        assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(0);
+        assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(0);
+        assertThat(commitStats.getNumBucketsWritten()).isEqualTo(0);
+        assertThat(commitStats.getDuration()).isEqualTo(0);
+        assertThat(commitStats.getAttempts()).isEqualTo(1);
+    }
+
+    @Test
+    public void testFailedCompactSnapshot() {
+        CommitStats commitStats =
+                new CommitStats(
+                        appendDataFiles,
+                        appendChangelogFiles,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        3000,
+                        1,
+                        2);
+        assertThat(commitStats.getTableFilesAdded()).isEqualTo(2);
+        assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
+        assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
+        assertThat(commitStats.getTableFilesCompacted()).isEqualTo(0);
+        assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(2);
+        assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(0);
+        assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(1);
+        assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(503);
+        assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(503);
+        assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(0);
+        assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(0);
+        assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(2);
+        assertThat(commitStats.getNumBucketsWritten()).isEqualTo(2);
+        assertThat(commitStats.getDuration()).isEqualTo(3000);
+        assertThat(commitStats.getAttempts()).isEqualTo(2);
+    }
+
+    @Test
+    public void testSucceedAllSnapshot() {
+        CommitStats commitStats =
+                new CommitStats(
+                        appendDataFiles,
+                        appendChangelogFiles,
+                        compactDataFiles,
+                        compactChangelogFiles,
+                        3000,
+                        2,
+                        2);
+        assertThat(commitStats.getTableFilesAdded()).isEqualTo(4);
+        assertThat(commitStats.getTableFilesDeleted()).isEqualTo(1);
+        assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
+        assertThat(commitStats.getTableFilesCompacted()).isEqualTo(3);
+        assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(2);
+        assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(2);
+        assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(2);
+        assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(503);
+        assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(503);
+        assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(613);
+        assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(512);
+        assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(3);
+        assertThat(commitStats.getNumBucketsWritten()).isEqualTo(3);
+        assertThat(commitStats.getDuration()).isEqualTo(3000);
+        assertThat(commitStats.getAttempts()).isEqualTo(2);
+    }
+}

Reply via email to