This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 73134594aa [metrics] Add total file size metrics of active buckets for primary key tables (#5158)
73134594aa is described below
commit 73134594aa3b11c8d9f339ee99452767fde055c0
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 27 16:47:51 2025 +0800
[metrics] Add total file size metrics of active buckets for primary key tables (#5158)
This closes #5158.
---
docs/content/maintenance/metrics.md | 10 +++
.../java/org/apache/paimon/mergetree/Levels.java | 5 ++
.../mergetree/compact/MergeTreeCompactManager.java | 9 ++-
.../paimon/operation/AbstractFileStoreWrite.java | 5 ++
.../operation/metrics/CompactionMetrics.java | 19 +++++
.../operation/metrics/CompactionMetricsTest.java | 94 ++++++++++++++++++++++
6 files changed, 138 insertions(+), 4 deletions(-)
diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index 2c3067267f..80c82268a1 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -279,6 +279,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Gauge</td>
<td>The average output file size for this task's compaction.</td>
</tr>
+ <tr>
+ <td>maxTotalFileSize</td>
+ <td>Gauge</td>
+ <td>The maximum total file size of an active (currently being written) bucket.</td>
+ </tr>
+ <tr>
+ <td>avgTotalFileSize</td>
+ <td>Gauge</td>
+ <td>The average total file size of all active (currently being written) buckets.</td>
+ </tr>
</tbody>
</table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index 350b693dbf..ea46c841ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -133,6 +133,11 @@ public class Levels {
return level0.isEmpty() ? -1 : 0;
}
+ public long totalFileSize() {
+ return level0.stream().mapToLong(DataFileMeta::fileSize).sum()
+ + levels.stream().mapToLong(SortedRun::totalSize).sum();
+ }
+
public List<DataFileMeta> allFiles() {
List<DataFileMeta> files = new ArrayList<>();
List<LevelSortedRun> runs = levelSortedRuns();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 8ae2920130..15629f2e66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -86,7 +86,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
this.dvMaintainer = dvMaintainer;
this.lazyGenDeletionFile = lazyGenDeletionFile;
- MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
+ MetricUtils.safeCall(this::reportMetrics, LOG);
}
@Override
@@ -103,7 +103,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
@Override
public void addNewFile(DataFileMeta file) {
levels.addLevel0File(file);
- MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
+ MetricUtils.safeCall(this::reportMetrics, LOG);
}
@Override
@@ -230,7 +230,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
r.after());
}
levels.update(r.before(), r.after());
- MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
+ MetricUtils.safeCall(this::reportMetrics, LOG);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Levels in compact manager updated. Current
runs are\n{}",
@@ -240,9 +240,10 @@ public class MergeTreeCompactManager extends CompactFutureManager {
return result;
}
- private void reportLevel0FileCount() {
+ private void reportMetrics() {
if (metricsReporter != null) {
metricsReporter.reportLevel0FileCount(levels.level0().size());
+ metricsReporter.reportTotalFileSize(levels.totalFileSize());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 14dfe75a6e..9b7c1a8fba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -555,4 +555,9 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
return writers;
}
+
+ @VisibleForTesting
+ public CompactionMetrics compactionMetrics() {
+ return compactionMetrics;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index a3074daebb..57d8e2bac0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -48,6 +48,9 @@ public class CompactionMetrics {
public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize";
public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize";
public static final String AVG_COMPACTION_OUTPUT_SIZE = "avgCompactionOutputSize";
+ public static final String MAX_TOTAL_FILE_SIZE = "maxTotalFileSize";
+ public static final String AVG_TOTAL_FILE_SIZE = "avgTotalFileSize";
+
private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;
@@ -93,6 +96,9 @@ public class CompactionMetrics {
compactionsCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
compactionsQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
+
+ metricGroup.gauge(MAX_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().max().orElse(-1));
+ metricGroup.gauge(AVG_TOTAL_FILE_SIZE, () -> getTotalFileSizeStream().average().orElse(-1));
}
private LongStream getLevel0FileCountStream() {
@@ -116,6 +122,11 @@ public class CompactionMetrics {
return compactionTimes.stream().mapToDouble(Long::doubleValue);
}
+ @VisibleForTesting
+ public LongStream getTotalFileSizeStream() {
+ return reporters.values().stream().mapToLong(r -> r.totalFileSize);
+ }
+
public void close() {
metricGroup.close();
}
@@ -139,6 +150,8 @@ public class CompactionMetrics {
void reportCompactionOutputSize(long bytes);
+ void reportTotalFileSize(long bytes);
+
void unregister();
}
@@ -148,6 +161,7 @@ public class CompactionMetrics {
private long level0FileCount;
private long compactionInputSize = 0;
private long compactionOutputSize = 0;
+ private long totalFileSize = 0;
private ReporterImpl(PartitionAndBucket key) {
this.key = key;
@@ -181,6 +195,11 @@ public class CompactionMetrics {
this.compactionOutputSize = bytes;
}
+ @Override
+ public void reportTotalFileSize(long bytes) {
+ this.totalFileSize = bytes;
+ }
+
@Override
public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
index d2f5e8963d..ad7c9160c6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -18,19 +18,45 @@
package org.apache.paimon.operation.metrics;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.TestMetricRegistry;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link CompactionMetrics}. */
public class CompactionMetricsTest {
+ @TempDir java.nio.file.Path tempDir;
+
@Test
public void testReportMetrics() {
CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");
@@ -82,6 +108,74 @@ public class CompactionMetricsTest {
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_QUEUED_COUNT)).isEqualTo(1L);
}
+ @Test
+ public void testTotalFileSizeForPrimaryKeyTables() throws Exception {
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path path = new Path(tempDir.toString());
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+
+ int bucketNum = 2;
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, bucketNum);
+ Schema schema =
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ "");
+
+ FileStoreTable table;
+ try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+ paimonCatalog.createDatabase("mydb", false);
+ Identifier paimonIdentifier = Identifier.create("mydb", "mytable");
+ paimonCatalog.createTable(paimonIdentifier, schema, false);
+ table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+ }
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+ write.withMetricRegistry(new TestMetricRegistry());
+
+ int numKeys = 300;
+ int numRounds = 10;
+ int recordsPerRound = 100;
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int commitIdentifier = 0;
+ for (int i = 0; i < numRounds; i++) {
+ for (int j = 0; j < recordsPerRound; j++) {
+ write.write(GenericRow.of(random.nextInt(numKeys), random.nextInt()));
+ }
+
+ commitIdentifier++;
+ commit.commit(
+ commitIdentifier, write.prepareCommit(random.nextBoolean(), commitIdentifier));
+
+ long[] totalFileSizes = new long[bucketNum];
+ for (Split split : table.newScan().plan().splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ totalFileSizes[dataSplit.bucket()] +=
+ dataSplit.dataFiles().stream().mapToLong(DataFileMeta::fileSize).sum();
+ }
+
+ CompactionMetrics metrics =
+ ((AbstractFileStoreWrite<?>) write.getWrite()).compactionMetrics();
+ assertThat(metrics.getTotalFileSizeStream()).hasSize(bucketNum);
+ assertThat(getMetric(metrics, CompactionMetrics.MAX_TOTAL_FILE_SIZE))
+ .isEqualTo(Arrays.stream(totalFileSizes).max().orElse(0));
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_TOTAL_FILE_SIZE))
+ .isEqualTo(Arrays.stream(totalFileSizes).average().orElse(0));
+ }
+
+ write.close();
+ commit.close();
+ }
+
private Object getMetric(CompactionMetrics metrics, String metricName) {
Metric metric = metrics.getMetricGroup().getMetrics().get(metricName);
if (metric instanceof Gauge) {