This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 73134594aa [metrics] Add total file size metrics of active buckets for primary key tables (#5158)
73134594aa is described below

commit 73134594aa3b11c8d9f339ee99452767fde055c0
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 27 16:47:51 2025 +0800

    [metrics] Add total file size metrics of active buckets for primary key tables (#5158)
    
    This closes #5158.
---
 docs/content/maintenance/metrics.md                | 10 +++
 .../java/org/apache/paimon/mergetree/Levels.java   |  5 ++
 .../mergetree/compact/MergeTreeCompactManager.java |  9 ++-
 .../paimon/operation/AbstractFileStoreWrite.java   |  5 ++
 .../operation/metrics/CompactionMetrics.java       | 19 +++++
 .../operation/metrics/CompactionMetricsTest.java   | 94 ++++++++++++++++++++++
 6 files changed, 138 insertions(+), 4 deletions(-)

diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index 2c3067267f..80c82268a1 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -279,6 +279,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
             <td>Gauge</td>
             <td>The average output file size for this task's compaction.</td>
         </tr>
+        <tr>
+            <td>maxTotalFileSize</td>
+            <td>Gauge</td>
+            <td>The maximum total file size of an active (currently being written) bucket.</td>
+        </tr>
+        <tr>
+            <td>avgTotalFileSize</td>
+            <td>Gauge</td>
+            <td>The average total file size of all active (currently being written) buckets.</td>
+        </tr>
     </tbody>
 </table>
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index 350b693dbf..ea46c841ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -133,6 +133,11 @@ public class Levels {
         return level0.isEmpty() ? -1 : 0;
     }
 
+    public long totalFileSize() {
+        return level0.stream().mapToLong(DataFileMeta::fileSize).sum()
+                + levels.stream().mapToLong(SortedRun::totalSize).sum();
+    }
+
     public List<DataFileMeta> allFiles() {
         List<DataFileMeta> files = new ArrayList<>();
         List<LevelSortedRun> runs = levelSortedRuns();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 8ae2920130..15629f2e66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -86,7 +86,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
         this.dvMaintainer = dvMaintainer;
         this.lazyGenDeletionFile = lazyGenDeletionFile;
 
-        MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
+        MetricUtils.safeCall(this::reportMetrics, LOG);
     }
 
     @Override
@@ -103,7 +103,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
     @Override
     public void addNewFile(DataFileMeta file) {
         levels.addLevel0File(file);
-        MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
+        MetricUtils.safeCall(this::reportMetrics, LOG);
     }
 
     @Override
@@ -230,7 +230,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                                 r.after());
                     }
                     levels.update(r.before(), r.after());
-                    MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
+                    MetricUtils.safeCall(this::reportMetrics, LOG);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(
                                 "Levels in compact manager updated. Current 
runs are\n{}",
@@ -240,9 +240,10 @@ public class MergeTreeCompactManager extends CompactFutureManager {
         return result;
     }
 
-    private void reportLevel0FileCount() {
+    private void reportMetrics() {
         if (metricsReporter != null) {
             metricsReporter.reportLevel0FileCount(levels.level0().size());
+            metricsReporter.reportTotalFileSize(levels.totalFileSize());
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 14dfe75a6e..9b7c1a8fba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -555,4 +555,9 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
         return writers;
     }
+
+    @VisibleForTesting
+    public CompactionMetrics compactionMetrics() {
+        return compactionMetrics;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index a3074daebb..57d8e2bac0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -48,6 +48,9 @@ public class CompactionMetrics {
     public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize";
     public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize";
     public static final String AVG_COMPACTION_OUTPUT_SIZE = "avgCompactionOutputSize";
+    public static final String MAX_TOTAL_FILE_SIZE = "maxTotalFileSize";
+    public static final String AVG_TOTAL_FILE_SIZE = "avgTotalFileSize";
+
     private static final long BUSY_MEASURE_MILLIS = 60_000;
     private static final int COMPACTION_TIME_WINDOW = 100;
 
@@ -93,6 +96,9 @@ public class CompactionMetrics {
 
         compactionsCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
         compactionsQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
+
+        metricGroup.gauge(MAX_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().max().orElse(-1));
+        metricGroup.gauge(AVG_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().average().orElse(-1));
     }
 
     private LongStream getLevel0FileCountStream() {
@@ -116,6 +122,11 @@ public class CompactionMetrics {
         return compactionTimes.stream().mapToDouble(Long::doubleValue);
     }
 
+    @VisibleForTesting
+    public LongStream getTotalFileSizeStream() {
+        return reporters.values().stream().mapToLong(r -> r.totalFileSize);
+    }
+
     public void close() {
         metricGroup.close();
     }
@@ -139,6 +150,8 @@ public class CompactionMetrics {
 
         void reportCompactionOutputSize(long bytes);
 
+        void reportTotalFileSize(long bytes);
+
         void unregister();
     }
 
@@ -148,6 +161,7 @@ public class CompactionMetrics {
         private long level0FileCount;
         private long compactionInputSize = 0;
         private long compactionOutputSize = 0;
+        private long totalFileSize = 0;
 
         private ReporterImpl(PartitionAndBucket key) {
             this.key = key;
@@ -181,6 +195,11 @@ public class CompactionMetrics {
             this.compactionOutputSize = bytes;
         }
 
+        @Override
+        public void reportTotalFileSize(long bytes) {
+            this.totalFileSize = bytes;
+        }
+
         @Override
         public void reportLevel0FileCount(long count) {
             this.level0FileCount = count;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
index d2f5e8963d..ad7c9160c6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -18,19 +18,45 @@
 
 package org.apache.paimon.operation.metrics;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.metrics.Counter;
 import org.apache.paimon.metrics.Gauge;
 import org.apache.paimon.metrics.Metric;
 import org.apache.paimon.metrics.TestMetricRegistry;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link CompactionMetrics}. */
 public class CompactionMetricsTest {
 
+    @TempDir java.nio.file.Path tempDir;
+
     @Test
     public void testReportMetrics() {
         CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");
@@ -82,6 +108,74 @@ public class CompactionMetricsTest {
         assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_QUEUED_COUNT)).isEqualTo(1L);
     }
 
+    @Test
+    public void testTotalFileSizeForPrimaryKeyTables() throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+
+        int bucketNum = 2;
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, bucketNum);
+        Schema schema =
+                new Schema(
+                        rowType.getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        options.toMap(),
+                        "");
+
+        FileStoreTable table;
+        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+            paimonCatalog.createDatabase("mydb", false);
+            Identifier paimonIdentifier = Identifier.create("mydb", "mytable");
+            paimonCatalog.createTable(paimonIdentifier, schema, false);
+            table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+        }
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.withMetricRegistry(new TestMetricRegistry());
+
+        int numKeys = 300;
+        int numRounds = 10;
+        int recordsPerRound = 100;
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int commitIdentifier = 0;
+        for (int i = 0; i < numRounds; i++) {
+            for (int j = 0; j < recordsPerRound; j++) {
+                write.write(GenericRow.of(random.nextInt(numKeys), random.nextInt()));
+            }
+
+            commitIdentifier++;
+            commit.commit(
+                    commitIdentifier, write.prepareCommit(random.nextBoolean(), commitIdentifier));
+
+            long[] totalFileSizes = new long[bucketNum];
+            for (Split split : table.newScan().plan().splits()) {
+                DataSplit dataSplit = (DataSplit) split;
+                totalFileSizes[dataSplit.bucket()] +=
+                        dataSplit.dataFiles().stream().mapToLong(DataFileMeta::fileSize).sum();
+            }
+
+            CompactionMetrics metrics =
+                    ((AbstractFileStoreWrite<?>) write.getWrite()).compactionMetrics();
+            assertThat(metrics.getTotalFileSizeStream()).hasSize(bucketNum);
+            assertThat(getMetric(metrics, CompactionMetrics.MAX_TOTAL_FILE_SIZE))
+                    .isEqualTo(Arrays.stream(totalFileSizes).max().orElse(0));
+            assertThat(getMetric(metrics, CompactionMetrics.AVG_TOTAL_FILE_SIZE))
+                    .isEqualTo(Arrays.stream(totalFileSizes).average().orElse(0));
+        }
+
+        write.close();
+        commit.close();
+    }
+
     private Object getMetric(CompactionMetrics metrics, String metricName) {
         Metric metric = metrics.getMetricGroup().getMetrics().get(metricName);
         if (metric instanceof Gauge) {
