This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dbec0c3e6 [core] Support commit metrics (#1638)
dbec0c3e6 is described below
commit dbec0c3e606a1517dc868ee143c2b5e9fb796dd4
Author: GuojunLi <[email protected]>
AuthorDate: Fri Oct 20 11:15:21 2023 +0800
[core] Support commit metrics (#1638)
This closes #1638.
---
.../apache/paimon/metrics/AbstractMetricGroup.java | 10 +
.../java/org/apache/paimon/metrics/Metrics.java | 7 +
.../paimon/metrics/commit/CommitMetrics.java | 137 ++++++++++
.../apache/paimon/metrics/commit/CommitStats.java | 206 +++++++++++++++
.../paimon/metrics/groups/GenericMetricGroup.java | 4 +-
.../apache/paimon/operation/FileStoreCommit.java | 4 +
.../paimon/operation/FileStoreCommitImpl.java | 292 +++++++++++++--------
.../paimon/table/AbstractFileStoreTable.java | 3 +-
.../apache/paimon/table/sink/TableCommitImpl.java | 9 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 23 ++
.../paimon/metrics/commit/CommitMetricsTest.java | 286 ++++++++++++++++++++
.../paimon/metrics/commit/CommitStatsTest.java | 154 +++++++++++
12 files changed, 1024 insertions(+), 111 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
index 78f293a50..41980ff3e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
@@ -190,4 +190,14 @@ public abstract class AbstractMetricGroup implements
MetricGroup {
public final boolean isClosed() {
return closed;
}
+
+ @Override
+ public String toString() {
+ return "MetricGroup{"
+ + "groupName="
+ + getGroupName()
+ + ", metrics="
+ + String.join(",", metrics.keySet())
+ + '}';
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
index 6ef28749b..379f3d6c6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
@@ -19,6 +19,7 @@
package org.apache.paimon.metrics;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
/** Core of Paimon metrics system. */
public class Metrics {
@@ -53,4 +54,10 @@ public class Metrics {
public ConcurrentLinkedQueue<MetricGroup> getMetricGroups() {
return metricGroups;
}
+
+ public static String groupsInfo() {
+ return getInstance().getMetricGroups().stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(", ", "[", "]"));
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
new file mode 100644
index 000000000..a216a94e2
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.metrics.AbstractMetricGroup;
+import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.groups.GenericMetricGroup;
+
+/** Metrics to measure a commit. */
+public class CommitMetrics {
+ private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
+ protected static final String GROUP_NAME = "commit";
+
+ private final AbstractMetricGroup genericMetricGroup;
+
+ public CommitMetrics(String tableName) {
+ this.genericMetricGroup =
+ GenericMetricGroup.createGenericMetricGroup(tableName,
GROUP_NAME);
+ registerGenericCommitMetrics();
+ }
+
+ @VisibleForTesting
+ public AbstractMetricGroup getMetricGroup() {
+ return genericMetricGroup;
+ }
+
+ private final Histogram durationHistogram =
+ new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE);
+
+ private CommitStats latestCommit;
+
+ @VisibleForTesting static final String LAST_COMMIT_DURATION =
"lastCommitDuration";
+ @VisibleForTesting static final String COMMIT_DURATION = "commitDuration";
+ @VisibleForTesting static final String LAST_COMMIT_ATTEMPTS =
"lastCommitAttempts";
+ @VisibleForTesting static final String LAST_TABLE_FILES_ADDED =
"lastTableFilesAdded";
+ @VisibleForTesting static final String LAST_TABLE_FILES_DELETED =
"lastTableFilesDeleted";
+ @VisibleForTesting static final String LAST_TABLE_FILES_APPENDED =
"lastTableFilesAppended";
+
+ @VisibleForTesting
+ static final String LAST_TABLE_FILES_COMMIT_COMPACTED =
"lastTableFilesCommitCompacted";
+
+ @VisibleForTesting
+ static final String LAST_CHANGELOG_FILES_APPENDED =
"lastChangelogFilesAppended";
+
+ @VisibleForTesting
+ static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED =
"lastChangelogFileCommitCompacted";
+
+ @VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS =
"lastGeneratedSnapshots";
+ @VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED =
"lastDeltaRecordsAppended";
+
+ @VisibleForTesting
+ static final String LAST_CHANGELOG_RECORDS_APPENDED =
"lastChangelogRecordsAppended";
+
+ @VisibleForTesting
+ static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED =
"lastDeltaRecordsCommitCompacted";
+
+ @VisibleForTesting
+ static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
+ "lastChangelogRecordsCommitCompacted";
+
+ @VisibleForTesting static final String LAST_PARTITIONS_WRITTEN =
"lastPartitionsWritten";
+ @VisibleForTesting static final String LAST_BUCKETS_WRITTEN =
"lastBucketsWritten";
+
+ private void registerGenericCommitMetrics() {
+ genericMetricGroup.gauge(
+ LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L :
latestCommit.getDuration());
+ genericMetricGroup.gauge(
+ LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L :
latestCommit.getAttempts());
+ genericMetricGroup.gauge(
+ LAST_GENERATED_SNAPSHOTS,
+ () -> latestCommit == null ? 0L :
latestCommit.getGeneratedSnapshots());
+ genericMetricGroup.gauge(
+ LAST_PARTITIONS_WRITTEN,
+ () -> latestCommit == null ? 0L :
latestCommit.getNumPartitionsWritten());
+ genericMetricGroup.gauge(
+ LAST_BUCKETS_WRITTEN,
+ () -> latestCommit == null ? 0L :
latestCommit.getNumBucketsWritten());
+ genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram);
+ genericMetricGroup.gauge(
+ LAST_TABLE_FILES_ADDED,
+ () -> latestCommit == null ? 0L :
latestCommit.getTableFilesAdded());
+ genericMetricGroup.gauge(
+ LAST_TABLE_FILES_DELETED,
+ () -> latestCommit == null ? 0L :
latestCommit.getTableFilesDeleted());
+ genericMetricGroup.gauge(
+ LAST_TABLE_FILES_APPENDED,
+ () -> latestCommit == null ? 0L :
latestCommit.getTableFilesAppended());
+ genericMetricGroup.gauge(
+ LAST_TABLE_FILES_COMMIT_COMPACTED,
+ () -> latestCommit == null ? 0L :
latestCommit.getTableFilesCompacted());
+ genericMetricGroup.gauge(
+ LAST_CHANGELOG_FILES_APPENDED,
+ () -> latestCommit == null ? 0L :
latestCommit.getChangelogFilesAppended());
+ genericMetricGroup.gauge(
+ LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
+ () -> latestCommit == null ? 0L :
latestCommit.getChangelogFilesCompacted());
+ genericMetricGroup.gauge(
+ LAST_DELTA_RECORDS_APPENDED,
+ () -> latestCommit == null ? 0L :
latestCommit.getDeltaRecordsAppended());
+ genericMetricGroup.gauge(
+ LAST_CHANGELOG_RECORDS_APPENDED,
+ () -> latestCommit == null ? 0L :
latestCommit.getChangelogRecordsAppended());
+ genericMetricGroup.gauge(
+ LAST_DELTA_RECORDS_COMMIT_COMPACTED,
+ () -> latestCommit == null ? 0L :
latestCommit.getDeltaRecordsCompacted());
+ genericMetricGroup.gauge(
+ LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
+ () -> latestCommit == null ? 0L :
latestCommit.getChangelogRecordsCompacted());
+ }
+
+ public void reportCommit(CommitStats commitStats) {
+ latestCommit = commitStats;
+ durationHistogram.update(commitStats.getDuration());
+ }
+
+ public void close() {
+ this.genericMetricGroup.close();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
new file mode 100644
index 000000000..0a2772f63
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Statistics for a commit. */
+public class CommitStats {
+ private final long duration;
+ private final int attempts;
+ private final long tableFilesAdded;
+ private final long tableFilesAppended;
+ private final long tableFilesDeleted;
+ private final long changelogFilesAppended;
+ private final long changelogFilesCompacted;
+ private final long changelogRecordsCompacted;
+
+ private final long deltaRecordsCompacted;
+ private final long changelogRecordsAppended;
+ private final long deltaRecordsAppended;
+ private final long tableFilesCompacted;
+ private final long generatedSnapshots;
+ private final long numPartitionsWritten;
+ private final long numBucketsWritten;
+
+ public CommitStats(
+ List<ManifestEntry> appendTableFiles,
+ List<ManifestEntry> appendChangelogFiles,
+ List<ManifestEntry> compactTableFiles,
+ List<ManifestEntry> compactChangelogFiles,
+ long commitDuration,
+ int generatedSnapshots,
+ int attempts) {
+ List<ManifestEntry> addedTableFiles = new
ArrayList<>(appendTableFiles);
+ addedTableFiles.addAll(
+ compactTableFiles.stream()
+ .filter(f -> FileKind.ADD.equals(f.kind()))
+ .collect(Collectors.toList()));
+ List<ManifestEntry> deletedTableFiles =
+ compactTableFiles.stream()
+ .filter(f -> FileKind.DELETE.equals(f.kind()))
+ .collect(Collectors.toList());
+
+ this.tableFilesAdded = addedTableFiles.size();
+ this.tableFilesAppended = appendTableFiles.size();
+ this.tableFilesDeleted = deletedTableFiles.size();
+ this.tableFilesCompacted = compactTableFiles.size();
+ this.changelogFilesAppended = appendChangelogFiles.size();
+ this.changelogFilesCompacted = compactChangelogFiles.size();
+ this.numPartitionsWritten = numChangedPartitions(appendTableFiles,
compactTableFiles);
+ this.numBucketsWritten = numChangedBuckets(appendTableFiles,
compactTableFiles);
+ this.changelogRecordsCompacted = getRowCounts(compactChangelogFiles);
+ this.deltaRecordsCompacted = getRowCounts(compactTableFiles);
+ this.changelogRecordsAppended = getRowCounts(appendChangelogFiles);
+ this.deltaRecordsAppended = getRowCounts(appendTableFiles);
+ this.duration = commitDuration;
+ this.generatedSnapshots = generatedSnapshots;
+ this.attempts = attempts;
+ }
+
+ @VisibleForTesting
+ protected static long numChangedPartitions(List<ManifestEntry>... changes)
{
+ return Arrays.stream(changes)
+ .flatMap(Collection::stream)
+ .map(ManifestEntry::partition)
+ .distinct()
+ .count();
+ }
+
+ @VisibleForTesting
+ protected static long numChangedBuckets(List<ManifestEntry>... changes) {
+ return
changedPartBuckets(changes).values().stream().mapToLong(Set::size).sum();
+ }
+
+ @VisibleForTesting
+ protected static List<BinaryRow> changedPartitions(List<ManifestEntry>...
changes) {
+ return Arrays.stream(changes)
+ .flatMap(Collection::stream)
+ .map(ManifestEntry::partition)
+ .distinct()
+ .collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ protected static Map<BinaryRow, Set<Integer>> changedPartBuckets(
+ List<ManifestEntry>... changes) {
+ Map<BinaryRow, Set<Integer>> changedPartBuckets = new
LinkedHashMap<>();
+ Arrays.stream(changes)
+ .flatMap(Collection::stream)
+ .forEach(
+ entry ->
+ changedPartBuckets
+ .computeIfAbsent(
+ entry.partition(), k -> new
LinkedHashSet<>())
+ .add(entry.bucket()));
+ return changedPartBuckets;
+ }
+
+ private long getRowCounts(List<ManifestEntry> files) {
+ return files.stream().mapToLong(file -> file.file().rowCount()).sum();
+ }
+
+ @VisibleForTesting
+ protected long getTableFilesAdded() {
+ return tableFilesAdded;
+ }
+
+ @VisibleForTesting
+ protected long getTableFilesDeleted() {
+ return tableFilesDeleted;
+ }
+
+ @VisibleForTesting
+ protected long getTableFilesAppended() {
+ return tableFilesAppended;
+ }
+
+ @VisibleForTesting
+ protected long getTableFilesCompacted() {
+ return tableFilesCompacted;
+ }
+
+ @VisibleForTesting
+ protected long getChangelogFilesAppended() {
+ return changelogFilesAppended;
+ }
+
+ @VisibleForTesting
+ protected long getChangelogFilesCompacted() {
+ return changelogFilesCompacted;
+ }
+
+ @VisibleForTesting
+ protected long getGeneratedSnapshots() {
+ return generatedSnapshots;
+ }
+
+ @VisibleForTesting
+ protected long getDeltaRecordsAppended() {
+ return deltaRecordsAppended;
+ }
+
+ @VisibleForTesting
+ protected long getChangelogRecordsAppended() {
+ return changelogRecordsAppended;
+ }
+
+ @VisibleForTesting
+ protected long getDeltaRecordsCompacted() {
+ return deltaRecordsCompacted;
+ }
+
+ @VisibleForTesting
+ protected long getChangelogRecordsCompacted() {
+ return changelogRecordsCompacted;
+ }
+
+ @VisibleForTesting
+ protected long getNumPartitionsWritten() {
+ return numPartitionsWritten;
+ }
+
+ @VisibleForTesting
+ protected long getNumBucketsWritten() {
+ return numBucketsWritten;
+ }
+
+ @VisibleForTesting
+ protected long getDuration() {
+ return duration;
+ }
+
+ @VisibleForTesting
+ protected int getAttempts() {
+ return attempts;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
index df927aa35..f1d5c3157 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
@@ -34,9 +34,9 @@ public class GenericMetricGroup extends AbstractMetricGroup {
}
public static GenericMetricGroup createGenericMetricGroup(
- final String table, final String groupName) {
+ final String tableName, final String groupName) {
Map<String, String> tags = new HashMap<>();
- tags.put("table", table);
+ tags.put("table", tableName);
return new GenericMetricGroup(tags, groupName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index a585d6812..31d22c37e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import java.util.List;
@@ -78,4 +79,7 @@ public interface FileStoreCommit {
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);
+
+ /** Attaches the {@link CommitMetrics} that commits should report their statistics to. */
+ FileStoreCommit withMetrics(CommitMetrics metrics);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a40964975..77c22687c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -34,6 +34,8 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metrics.commit.CommitMetrics;
+import org.apache.paimon.metrics.commit.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -112,6 +114,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
+ private CommitMetrics commitMetrics;
+
public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
@@ -149,6 +153,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.lock = null;
this.ignoreEmptyCommit = true;
+ this.commitMetrics = null;
}
@Override
@@ -192,6 +197,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
LOG.debug("Ready to commit\n" + committable.toString());
}
+ long started = System.nanoTime();
+ int generatedSnapshot = 0;
+ int attempts = 0;
Snapshot latestSnapshot = null;
Long safeLatestSnapshotId = null;
List<ManifestEntry> baseEntries = new ArrayList<>();
@@ -208,65 +216,104 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
compactTableFiles,
compactChangelog,
appendIndexFiles);
+ try {
+ if (!ignoreEmptyCommit
+ || !appendTableFiles.isEmpty()
+ || !appendChangelog.isEmpty()
+ || !appendIndexFiles.isEmpty()) {
+ // Optimization for common path.
+ // Step 1:
+ // Read manifest entries from changed partitions here and
check for conflicts.
+ // If there are no other jobs committing at the same time,
+ // we can skip conflict checking in tryCommit method.
+ // This optimization is mainly used to decrease the number of
times we read from
+ // files.
+ latestSnapshot = snapshotManager.latestSnapshot();
+ if (latestSnapshot != null) {
+ // it is possible that some partitions only have compact
changes,
+ // so we need to include all changes
+ baseEntries.addAll(
+ readAllEntriesFromChangedPartitions(
+ latestSnapshot, appendTableFiles,
compactTableFiles));
+ noConflictsOrFail(latestSnapshot.commitUser(),
baseEntries, appendTableFiles);
+ safeLatestSnapshotId = latestSnapshot.id();
+ }
- if (!ignoreEmptyCommit
- || !appendTableFiles.isEmpty()
- || !appendChangelog.isEmpty()
- || !appendIndexFiles.isEmpty()) {
- // Optimization for common path.
- // Step 1:
- // Read manifest entries from changed partitions here and check
for conflicts.
- // If there are no other jobs committing at the same time,
- // we can skip conflict checking in tryCommit method.
- // This optimization is mainly used to decrease the number of
times we read from files.
- latestSnapshot = snapshotManager.latestSnapshot();
- if (latestSnapshot != null) {
- // it is possible that some partitions only have compact
changes,
- // so we need to contain all changes
- baseEntries.addAll(
- readAllEntriesFromChangedPartitions(
- latestSnapshot, appendTableFiles,
compactTableFiles));
- noConflictsOrFail(latestSnapshot.commitUser(), baseEntries,
appendTableFiles);
- safeLatestSnapshotId = latestSnapshot.id();
+ attempts +=
+ tryCommit(
+ appendTableFiles,
+ appendChangelog,
+ appendIndexFiles,
+ committable.identifier(),
+ committable.watermark(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.APPEND,
+ safeLatestSnapshotId);
+ generatedSnapshot += 1;
}
- tryCommit(
- appendTableFiles,
- appendChangelog,
- appendIndexFiles,
- committable.identifier(),
- committable.watermark(),
- committable.logOffsets(),
- Snapshot.CommitKind.APPEND,
- safeLatestSnapshotId);
- }
+ if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
+ // Optimization for common path.
+ // Step 2:
+ // Add appendChanges to the manifest entries read above and
check for conflicts.
+ // If there are no other jobs committing at the same time,
+ // we can skip conflict checking in tryCommit method.
+ // This optimization is mainly used to decrease the number of
times we read from
+ // files.
+ if (safeLatestSnapshotId != null) {
+ baseEntries.addAll(appendTableFiles);
+ noConflictsOrFail(latestSnapshot.commitUser(),
baseEntries, compactTableFiles);
+ // assume this compact commit follows just after the
append commit created above
+ safeLatestSnapshotId += 1;
+ }
- if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
- // Optimization for common path.
- // Step 2:
- // Add appendChanges to the manifest entries read above and check
for conflicts.
- // If there are no other jobs committing at the same time,
- // we can skip conflict checking in tryCommit method.
- // This optimization is mainly used to decrease the number of
times we read from files.
- if (safeLatestSnapshotId != null) {
- baseEntries.addAll(appendTableFiles);
- noConflictsOrFail(latestSnapshot.commitUser(), baseEntries,
compactTableFiles);
- // assume this compact commit follows just after the append
commit created above
- safeLatestSnapshotId += 1;
+ attempts +=
+ tryCommit(
+ compactTableFiles,
+ compactChangelog,
+ Collections.emptyList(),
+ committable.identifier(),
+ committable.watermark(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.COMPACT,
+ safeLatestSnapshotId);
+ generatedSnapshot += 1;
+ }
+ } finally {
+ long commitDuration = (System.nanoTime() - started) / 1_000_000;
+ if (this.commitMetrics != null) {
+ reportCommit(
+ appendTableFiles,
+ appendChangelog,
+ compactTableFiles,
+ compactChangelog,
+ commitDuration,
+ generatedSnapshot,
+ attempts);
}
-
- tryCommit(
- compactTableFiles,
- compactChangelog,
- Collections.emptyList(),
- committable.identifier(),
- committable.watermark(),
- committable.logOffsets(),
- Snapshot.CommitKind.COMPACT,
- safeLatestSnapshotId);
}
}
+ private void reportCommit(
+ List<ManifestEntry> appendTableFiles,
+ List<ManifestEntry> appendChangelogFiles,
+ List<ManifestEntry> compactTableFiles,
+ List<ManifestEntry> compactChangelogFiles,
+ long commitDuration,
+ int generatedSnapshots,
+ int attempts) {
+ CommitStats commitStats =
+ new CommitStats(
+ appendTableFiles,
+ appendChangelogFiles,
+ compactTableFiles,
+ compactChangelogFiles,
+ commitDuration,
+ generatedSnapshots,
+ attempts);
+ commitMetrics.reportCommit(commitStats);
+ }
+
@Override
public void overwrite(
Map<String, String> partition,
@@ -280,6 +327,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
properties);
}
+ long started = System.nanoTime();
+ int generatedSnapshot = 0;
+ int attempts = 0;
List<ManifestEntry> appendTableFiles = new ArrayList<>();
List<ManifestEntry> appendChangelog = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
@@ -309,65 +359,83 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
LOG.warn(warnMessage.toString());
}
- boolean skipOverwrite = false;
- // partition filter is built from static or dynamic partition
according to properties
- Predicate partitionFilter = null;
- if (dynamicPartitionOverwrite) {
- if (appendTableFiles.isEmpty()) {
- // in dynamic mode, if there is no changes to commit, no data
will be deleted
- skipOverwrite = true;
+ try {
+ boolean skipOverwrite = false;
+ // partition filter is built from static or dynamic partition
according to properties
+ Predicate partitionFilter = null;
+ if (dynamicPartitionOverwrite) {
+ if (appendTableFiles.isEmpty()) {
+ // in dynamic mode, if there are no changes to commit, no
data will be deleted
+ skipOverwrite = true;
+ } else {
+ partitionFilter =
+ appendTableFiles.stream()
+ .map(ManifestEntry::partition)
+ .distinct()
+ // partition filter is built from new
data's partitions
+ .map(p ->
PredicateBuilder.equalPartition(p, partitionType))
+ .reduce(PredicateBuilder::or)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Failed to get
dynamic partition filter. This is unexpected."));
+ }
} else {
- partitionFilter =
- appendTableFiles.stream()
- .map(ManifestEntry::partition)
- .distinct()
- // partition filter is built from new data's
partitions
- .map(p -> PredicateBuilder.equalPartition(p,
partitionType))
- .reduce(PredicateBuilder::or)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Failed to get dynamic
partition filter. This is unexpected."));
- }
- } else {
- partitionFilter = PredicateBuilder.partition(partition,
partitionType);
- // sanity check, all changes must be done within the given
partition
- if (partitionFilter != null) {
- for (ManifestEntry entry : appendTableFiles) {
- if (!partitionFilter.test(
-
partitionObjectConverter.convert(entry.partition()))) {
- throw new IllegalArgumentException(
- "Trying to overwrite partition "
- + partition
- + ", but the changes in "
- +
pathFactory.getPartitionString(entry.partition())
- + " does not belong to this
partition");
+ partitionFilter = PredicateBuilder.partition(partition,
partitionType);
+ // sanity check, all changes must be done within the given
partition
+ if (partitionFilter != null) {
+ for (ManifestEntry entry : appendTableFiles) {
+ if (!partitionFilter.test(
+
partitionObjectConverter.convert(entry.partition()))) {
+ throw new IllegalArgumentException(
+ "Trying to overwrite partition "
+ + partition
+ + ", but the changes in "
+ +
pathFactory.getPartitionString(entry.partition())
+ + " does not belong to this
partition");
+ }
}
}
}
- }
- // overwrite new files
- if (!skipOverwrite) {
- tryOverwrite(
- partitionFilter,
- appendTableFiles,
- appendIndexFiles,
- committable.identifier(),
- committable.watermark(),
- committable.logOffsets());
- }
+ // overwrite new files
+ if (!skipOverwrite) {
+ attempts +=
+ tryOverwrite(
+ partitionFilter,
+ appendTableFiles,
+ appendIndexFiles,
+ committable.identifier(),
+ committable.watermark(),
+ committable.logOffsets());
+ generatedSnapshot += 1;
+ }
- if (!compactTableFiles.isEmpty()) {
- tryCommit(
- compactTableFiles,
- Collections.emptyList(),
- Collections.emptyList(),
- committable.identifier(),
- committable.watermark(),
- committable.logOffsets(),
- Snapshot.CommitKind.COMPACT,
- null);
+ if (!compactTableFiles.isEmpty()) {
+ attempts +=
+ tryCommit(
+ compactTableFiles,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ committable.identifier(),
+ committable.watermark(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.COMPACT,
+ null);
+ generatedSnapshot += 1;
+ }
+ } finally {
+ long commitDuration = (System.nanoTime() - started) / 1_000_000;
+ if (this.commitMetrics != null) {
+ reportCommit(
+ appendTableFiles,
+ Collections.emptyList(),
+ compactTableFiles,
+ Collections.emptyList(),
+ commitDuration,
+ generatedSnapshot,
+ attempts);
+ }
}
}
@@ -430,6 +498,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
}
+ @Override
+ public FileStoreCommit withMetrics(CommitMetrics metrics) {
+ this.commitMetrics = metrics;
+ return this;
+ }
+
private void collectChanges(
List<CommitMessage> commitMessages,
List<ManifestEntry> appendTableFiles,
@@ -481,7 +555,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
kind, commitMessage.partition(), commitMessage.bucket(),
numBucket, file);
}
- private void tryCommit(
+ private int tryCommit(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
List<IndexManifestEntry> indexFiles,
@@ -490,8 +564,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long safeLatestSnapshotId) {
+ int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ cnt++;
if (tryCommitOnce(
tableFiles,
changelogFiles,
@@ -505,18 +581,21 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
break;
}
}
+ return cnt;
}
- private void tryOverwrite(
+ private int tryOverwrite(
Predicate partitionFilter,
List<ManifestEntry> changes,
List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
+ int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ cnt++;
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new
ArrayList<>();
if (latestSnapshot != null) {
@@ -565,6 +644,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
break;
}
}
+ return cnt;
}
@VisibleForTesting
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 916fe17f4..875e2fed7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -261,7 +261,8 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
- coreOptions().snapshotExpireExecutionMode());
+ coreOptions().snapshotExpireExecutionMode(),
+ name());
}
private List<CommitCallback> createCommitCallbacks() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 7293dce3a..19e95bb0c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
@@ -75,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {
private ExecutorService expireMainExecutor;
private AtomicReference<Throwable> expireError;
+ private final CommitMetrics commitMetrics;
public TableCommitImpl(
FileStoreCommit commit,
@@ -85,8 +87,10 @@ public class TableCommitImpl implements InnerTableCommit {
Lock lock,
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager,
- ExpireExecutionMode expireExecutionMode) {
- commit.withLock(lock);
+ ExpireExecutionMode expireExecutionMode,
+ String tableName) {
+ this.commitMetrics = new CommitMetrics(tableName);
+ commit.withLock(lock).withMetrics(commitMetrics);
if (expire != null) {
expire.withLock(lock);
}
@@ -261,6 +265,7 @@ public class TableCommitImpl implements InnerTableCommit {
}
IOUtils.closeQuietly(lock);
expireMainExecutor.shutdownNow();
+ commitMetrics.close();
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e66a6eccb..ac0c0a64e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
/** base class for Test {@link ManifestFile}. */
@@ -221,4 +222,26 @@ public abstract class ManifestFileMetaTestBase {
makeEntry(false, "B2", partition2),
makeEntry(true, "D2", partition2)));
}
+
+ public static ManifestEntry makeEntry(
+ FileKind fileKind, int partition, int bucket, long rowCount) {
+ return new ManifestEntry(
+ fileKind,
+ row(partition),
+ bucket,
+ 0, // not used
+ new DataFileMeta(
+ "", // not used
+ 0, // not used
+ rowCount,
+ null, // not used
+ null, // not used
+ StatsTestUtils.newEmptyTableStats(), // not used
+ StatsTestUtils.newEmptyTableStats(), // not used
+ 0, // not used
+ 0, // not used
+ 0, // not used
+ 0 // not used
+ ));
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
new file mode 100644
index 000000000..dead22c2e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.Metrics;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.offset;
+
+/** Tests for {@link CommitMetrics}. */
+public class CommitMetricsTest {
+ private static final String TABLE_NAME = "myTable";
+
+ private CommitMetrics commitMetrics;
+
+ @BeforeEach
+ public void beforeEach() {
+ commitMetrics = getCommitMetrics();
+ }
+
+ @AfterEach
+ public void afterEach() {
+ commitMetrics.close();
+ }
+
+ /** Tests the registration of the commit metrics. */
+ @Test
+ public void testGenericMetricsRegistration() {
+ MetricGroup genericMetricGroup = commitMetrics.getMetricGroup();
+ assertThat(Metrics.getInstance().getMetricGroups().size())
+ .withFailMessage(
+ String.format(
+ "Please close the created metric groups %s in case of metrics resource leak.",
+ Metrics.groupsInfo()))
+ .isEqualTo(1);
+ assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME);
+ Map<String, Metric> registeredMetrics = genericMetricGroup.getMetrics();
+ assertThat(registeredMetrics.keySet())
+ .containsExactlyInAnyOrder(
+ CommitMetrics.LAST_COMMIT_DURATION,
+ CommitMetrics.LAST_COMMIT_ATTEMPTS,
+ CommitMetrics.LAST_GENERATED_SNAPSHOTS,
+ CommitMetrics.LAST_PARTITIONS_WRITTEN,
+ CommitMetrics.LAST_BUCKETS_WRITTEN,
+ CommitMetrics.COMMIT_DURATION,
+ CommitMetrics.LAST_TABLE_FILES_ADDED,
+ CommitMetrics.LAST_TABLE_FILES_DELETED,
+ CommitMetrics.LAST_TABLE_FILES_APPENDED,
+ CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED,
+ CommitMetrics.LAST_CHANGELOG_FILES_APPENDED,
+ CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
+ CommitMetrics.LAST_DELTA_RECORDS_APPENDED,
+ CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED,
+ CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED,
+ CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
+
+ reportOnce(commitMetrics);
+ assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
+ }
+
+ /** Tests that the metrics are updated properly. */
+ @Test
+ public void testMetricsAreUpdated() {
+ Map<String, Metric> registeredGenericMetrics = commitMetrics.getMetricGroup().getMetrics();
+
+ // Check initial values
+ Gauge<Long> lastCommitDuration =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_COMMIT_DURATION);
+ Histogram commitDuration =
+ (Histogram) registeredGenericMetrics.get(CommitMetrics.COMMIT_DURATION);
+ Gauge<Long> lastCommitAttempts =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_COMMIT_ATTEMPTS);
+ Gauge<Long> lastGeneratedSnapshots =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_GENERATED_SNAPSHOTS);
+ Gauge<Long> lastPartitionsWritten =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_PARTITIONS_WRITTEN);
+ Gauge<Long> lastBucketsWritten =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_BUCKETS_WRITTEN);
+ Gauge<Long> lastTableFilesAdded =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_ADDED);
+ Gauge<Long> lastTableFilesDeleted =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_DELETED);
+
+ Gauge<Long> lastTableFilesAppended =
+ (Gauge<Long>) registeredGenericMetrics.get(CommitMetrics.LAST_TABLE_FILES_APPENDED);
+
+ Gauge<Long> lastTableFilesCompacted =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+ CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED);
+
+ Gauge<Long> lastChangelogFilesAppended =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(CommitMetrics.LAST_CHANGELOG_FILES_APPENDED);
+
+ Gauge<Long> lastChangelogFilesCompacted =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+ CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED);
+
+ Gauge<Long> lastDeltaRecordsAppended =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(CommitMetrics.LAST_DELTA_RECORDS_APPENDED);
+
+ Gauge<Long> lastChangelogRecordsAppended =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED);
+
+ Gauge<Long> lastDeltaRecordsCompacted =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+ CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED);
+
+ Gauge<Long> lastChangelogRecordsCompacted =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+ CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
+
+ assertThat(lastCommitDuration.getValue()).isEqualTo(0);
+ assertThat(commitDuration.getCount()).isEqualTo(0);
+ assertThat(commitDuration.getStatistics().size()).isEqualTo(0);
+ assertThat(lastCommitAttempts.getValue()).isEqualTo(0);
+ assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(0);
+ assertThat(lastPartitionsWritten.getValue()).isEqualTo(0);
+ assertThat(lastBucketsWritten.getValue()).isEqualTo(0);
+ assertThat(lastTableFilesAdded.getValue()).isEqualTo(0);
+ assertThat(lastTableFilesDeleted.getValue()).isEqualTo(0);
+ assertThat(lastTableFilesAppended.getValue()).isEqualTo(0);
+ assertThat(lastTableFilesCompacted.getValue()).isEqualTo(0);
+ assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(0);
+ assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0);
+ assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(0);
+ assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(0);
+ assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(0);
+ assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(0);
+
+ // report once
+ reportOnce(commitMetrics);
+
+ // generic metrics value updated
+ assertThat(lastCommitDuration.getValue()).isEqualTo(200);
+ assertThat(commitDuration.getCount()).isEqualTo(1);
+ assertThat(commitDuration.getStatistics().size()).isEqualTo(1);
+ assertThat(commitDuration.getStatistics().getValues()[0]).isEqualTo(200L);
+ assertThat(commitDuration.getStatistics().getMin()).isEqualTo(200);
+ assertThat(commitDuration.getStatistics().getQuantile(0.5)).isCloseTo(200.0, offset(0.001));
+ assertThat(commitDuration.getStatistics().getMean()).isEqualTo(200);
+ assertThat(commitDuration.getStatistics().getMax()).isEqualTo(200);
+ assertThat(commitDuration.getStatistics().getStdDev()).isEqualTo(0);
+ assertThat(lastCommitAttempts.getValue()).isEqualTo(1);
+ assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(2);
+ assertThat(lastPartitionsWritten.getValue()).isEqualTo(3);
+ assertThat(lastBucketsWritten.getValue()).isEqualTo(3);
+ assertThat(lastTableFilesAdded.getValue()).isEqualTo(4);
+ assertThat(lastTableFilesDeleted.getValue()).isEqualTo(1);
+ assertThat(lastTableFilesAppended.getValue()).isEqualTo(2);
+ assertThat(lastTableFilesCompacted.getValue()).isEqualTo(3);
+ assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(2);
+ assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+ assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(503);
+ assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(503);
+ assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(613);
+ assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(512);
+
+ // report again
+ reportAgain(commitMetrics);
+
+ // generic metrics value updated
+ assertThat(lastCommitDuration.getValue()).isEqualTo(500);
+ assertThat(commitDuration.getCount()).isEqualTo(2);
+ assertThat(commitDuration.getStatistics().size()).isEqualTo(2);
+ assertThat(commitDuration.getStatistics().getValues()[1]).isEqualTo(500L);
+ assertThat(commitDuration.getStatistics().getMin()).isEqualTo(200);
+ assertThat(commitDuration.getStatistics().getQuantile(0.5)).isCloseTo(350.0, offset(0.001));
+ assertThat(commitDuration.getStatistics().getMean()).isEqualTo(350);
+ assertThat(commitDuration.getStatistics().getMax()).isEqualTo(500);
+ assertThat(commitDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001));
+ assertThat(lastCommitAttempts.getValue()).isEqualTo(2);
+ assertThat(lastGeneratedSnapshots.getValue()).isEqualTo(1);
+ assertThat(lastPartitionsWritten.getValue()).isEqualTo(2);
+ assertThat(lastBucketsWritten.getValue()).isEqualTo(3);
+ assertThat(lastTableFilesAdded.getValue()).isEqualTo(4);
+ assertThat(lastTableFilesDeleted.getValue()).isEqualTo(1);
+ assertThat(lastTableFilesAppended.getValue()).isEqualTo(2);
+ assertThat(lastTableFilesCompacted.getValue()).isEqualTo(3);
+ assertThat(lastChangelogFilesAppended.getValue()).isEqualTo(2);
+ assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+ assertThat(lastDeltaRecordsAppended.getValue()).isEqualTo(805);
+ assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(213);
+ assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(506);
+ assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(601);
+ }
+
+ private void reportOnce(CommitMetrics commitMetrics) {
+ List<ManifestEntry> appendTableFiles = new ArrayList<>();
+ List<ManifestEntry> appendChangelogFiles = new ArrayList<>();
+ List<ManifestEntry> compactTableFiles = new ArrayList<>();
+ List<ManifestEntry> compactChangelogFiles = new ArrayList<>();
+
+ appendTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 201));
+ appendTableFiles.add(makeEntry(FileKind.ADD, 2, 3, 302));
+ appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 202));
+ appendChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 301));
+ compactTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 203));
+ compactTableFiles.add(makeEntry(FileKind.ADD, 2, 3, 304));
+ compactTableFiles.add(makeEntry(FileKind.DELETE, 3, 5, 106));
+ compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 205));
+ compactChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 307));
+
+ CommitStats commitStats =
+ new CommitStats(
+ appendTableFiles,
+ appendChangelogFiles,
+ compactTableFiles,
+ compactChangelogFiles,
+ 200,
+ 2,
+ 1);
+
+ commitMetrics.reportCommit(commitStats);
+ }
+
+ private void reportAgain(CommitMetrics commitMetrics) {
+ List<ManifestEntry> appendTableFiles = new ArrayList<>();
+ List<ManifestEntry> appendChangelogFiles = new ArrayList<>();
+ List<ManifestEntry> compactTableFiles = new ArrayList<>();
+ List<ManifestEntry> compactChangelogFiles = new ArrayList<>();
+
+ appendTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 400));
+ appendTableFiles.add(makeEntry(FileKind.ADD, 3, 4, 405));
+ appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 102));
+ appendChangelogFiles.add(makeEntry(FileKind.ADD, 3, 4, 111));
+ compactTableFiles.add(makeEntry(FileKind.ADD, 1, 1, 200));
+ compactTableFiles.add(makeEntry(FileKind.ADD, 3, 4, 201));
+ compactTableFiles.add(makeEntry(FileKind.DELETE, 3, 5, 105));
+ compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 300));
+ compactChangelogFiles.add(makeEntry(FileKind.ADD, 3, 4, 301));
+
+ CommitStats commitStats =
+ new CommitStats(
+ appendTableFiles,
+ appendChangelogFiles,
+ compactTableFiles,
+ compactChangelogFiles,
+ 500,
+ 1,
+ 2);
+
+ commitMetrics.reportCommit(commitStats);
+ }
+
+ private CommitMetrics getCommitMetrics() {
+ return new CommitMetrics(TABLE_NAME);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
new file mode 100644
index 000000000..640dba08f
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics.commit;
+
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CommitStats}. */
+public class CommitStatsTest {
+ private static List<ManifestEntry> files = new ArrayList<>();
+ private static List<ManifestEntry> appendDataFiles = new ArrayList<>();
+ private static List<ManifestEntry> appendChangelogFiles = new ArrayList<>();
+ private static List<ManifestEntry> compactDataFiles = new ArrayList<>();
+ private static List<ManifestEntry> compactChangelogFiles = new ArrayList<>();
+
+ @BeforeAll
+ public static void beforeAll() {
+ appendDataFiles.add(makeEntry(FileKind.ADD, 1, 1, 201));
+ appendDataFiles.add(makeEntry(FileKind.ADD, 2, 3, 302));
+ appendChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 202));
+ appendChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 301));
+ compactDataFiles.add(makeEntry(FileKind.ADD, 1, 1, 203));
+ compactDataFiles.add(makeEntry(FileKind.ADD, 2, 3, 304));
+ compactDataFiles.add(makeEntry(FileKind.DELETE, 3, 5, 106));
+ compactChangelogFiles.add(makeEntry(FileKind.ADD, 1, 1, 205));
+ compactChangelogFiles.add(makeEntry(FileKind.ADD, 2, 3, 307));
+ files.addAll(appendDataFiles);
+ files.addAll(appendChangelogFiles);
+ files.addAll(compactDataFiles);
+ files.addAll(compactChangelogFiles);
+ }
+
+ @Test
+ public void testCalcChangedPartitionsAndBuckets() {
+ assertThat(CommitStats.numChangedBuckets(files)).isEqualTo(3);
+ assertThat(CommitStats.numChangedPartitions(files)).isEqualTo(3);
+ assertThat(CommitStats.changedPartBuckets(files).get(row(1))).containsExactly(1);
+ assertThat(CommitStats.changedPartBuckets(files).get(row(2))).containsExactly(3);
+ assertThat(CommitStats.changedPartBuckets(files).get(row(3))).containsExactly(5);
+ assertThat(CommitStats.changedPartitions(files))
+ .containsExactlyInAnyOrder(row(1), row(2), row(3));
+ }
+
+ @Test
+ public void testFailedAppendSnapshot() {
+ CommitStats commitStats =
+ new CommitStats(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 0,
+ 0,
+ 1);
+ assertThat(commitStats.getTableFilesAdded()).isEqualTo(0);
+ assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
+ assertThat(commitStats.getTableFilesAppended()).isEqualTo(0);
+ assertThat(commitStats.getTableFilesCompacted()).isEqualTo(0);
+ assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(0);
+ assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(0);
+ assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(0);
+ assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(0);
+ assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(0);
+ assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(0);
+ assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(0);
+ assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(0);
+ assertThat(commitStats.getNumBucketsWritten()).isEqualTo(0);
+ assertThat(commitStats.getDuration()).isEqualTo(0);
+ assertThat(commitStats.getAttempts()).isEqualTo(1);
+ }
+
+ @Test
+ public void testFailedCompactSnapshot() {
+ CommitStats commitStats =
+ new CommitStats(
+ appendDataFiles,
+ appendChangelogFiles,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 3000,
+ 1,
+ 2);
+ assertThat(commitStats.getTableFilesAdded()).isEqualTo(2);
+ assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
+ assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
+ assertThat(commitStats.getTableFilesCompacted()).isEqualTo(0);
+ assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(2);
+ assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(0);
+ assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(1);
+ assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(503);
+ assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(503);
+ assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(0);
+ assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(0);
+ assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(2);
+ assertThat(commitStats.getNumBucketsWritten()).isEqualTo(2);
+ assertThat(commitStats.getDuration()).isEqualTo(3000);
+ assertThat(commitStats.getAttempts()).isEqualTo(2);
+ }
+
+ @Test
+ public void testSucceedAllSnapshot() {
+ CommitStats commitStats =
+ new CommitStats(
+ appendDataFiles,
+ appendChangelogFiles,
+ compactDataFiles,
+ compactChangelogFiles,
+ 3000,
+ 2,
+ 2);
+ assertThat(commitStats.getTableFilesAdded()).isEqualTo(4);
+ assertThat(commitStats.getTableFilesDeleted()).isEqualTo(1);
+ assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
+ assertThat(commitStats.getTableFilesCompacted()).isEqualTo(3);
+ assertThat(commitStats.getChangelogFilesAppended()).isEqualTo(2);
+ assertThat(commitStats.getChangelogFilesCompacted()).isEqualTo(2);
+ assertThat(commitStats.getGeneratedSnapshots()).isEqualTo(2);
+ assertThat(commitStats.getDeltaRecordsAppended()).isEqualTo(503);
+ assertThat(commitStats.getChangelogRecordsAppended()).isEqualTo(503);
+ assertThat(commitStats.getDeltaRecordsCompacted()).isEqualTo(613);
+ assertThat(commitStats.getChangelogRecordsCompacted()).isEqualTo(512);
+ assertThat(commitStats.getNumPartitionsWritten()).isEqualTo(3);
+ assertThat(commitStats.getNumBucketsWritten()).isEqualTo(3);
+ assertThat(commitStats.getDuration()).isEqualTo(3000);
+ assertThat(commitStats.getAttempts()).isEqualTo(2);
+ }
+}