This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5245c1af [rest] Add commit statistics when commit to REST Server (#5152)
1d5245c1af is described below
commit 1d5245c1af57aee371092a8e66ce0d469eba61f7
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Feb 25 19:20:44 2025 +0800
[rest] Add commit statistics when commit to REST Server (#5152)
---
.../org/apache/paimon/catalog/CatalogUtils.java | 8 +--
.../paimon/catalog/RenamingSnapshotCommit.java | 5 +-
.../org/apache/paimon/catalog/SnapshotCommit.java | 4 +-
.../org/apache/paimon/manifest/PartitionEntry.java | 11 ++++
.../paimon/operation/FileStoreCommitImpl.java | 61 +++++++++++++---------
.../org/apache/paimon/partition/Partition.java | 5 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 5 +-
.../paimon/rest/RESTSnapshotCommitFactory.java | 7 ++-
.../paimon/rest/requests/CommitTableRequest.java | 16 +++++-
.../org/apache/paimon/rest/RESTCatalogServer.java | 4 +-
paimon-open-api/rest-catalog-open-api.yaml | 4 ++
11 files changed, 89 insertions(+), 41 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index de7ec83755..45c50e2bb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -147,13 +147,7 @@ public class CatalogUtils {
table.newReadBuilder().newScan().listPartitionEntries();
List<Partition> partitions = new ArrayList<>(partitionEntries.size());
for (PartitionEntry entry : partitionEntries) {
- partitions.add(
- new Partition(
- computer.generatePartValues(entry.partition()),
- entry.recordCount(),
- entry.fileSizeInBytes(),
- entry.fileCount(),
- entry.lastFileCreationTime()));
+ partitions.add(entry.toPartition(computer));
}
return partitions;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
index cc7e4d06b5..4525838c67 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
@@ -23,10 +23,12 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
@@ -49,7 +51,8 @@ public class RenamingSnapshotCommit implements SnapshotCommit
{
}
@Override
- public boolean commit(Snapshot snapshot, String branch) throws Exception {
+ public boolean commit(Snapshot snapshot, String branch, List<Partition> statistics)
+ throws Exception {
Path newSnapshotPath =
snapshotManager.branch().equals(branch)
? snapshotManager.snapshotPath(snapshot.id())
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
index 1f472464c8..984556735e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
@@ -19,14 +19,16 @@
package org.apache.paimon.catalog;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.utils.SnapshotManager;
import java.io.Serializable;
+import java.util.List;
/** Interface to commit snapshot atomically. */
public interface SnapshotCommit extends AutoCloseable {
- boolean commit(Snapshot snapshot, String branch) throws Exception;
+ boolean commit(Snapshot snapshot, String branch, List<Partition> statistics) throws Exception;
/** Factory to create {@link SnapshotCommit}. */
interface Factory extends Serializable {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index 1aa562444d..92c03f2c15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -21,7 +21,9 @@ package org.apache.paimon.manifest;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import java.util.Collection;
import java.util.HashMap;
@@ -83,6 +85,15 @@ public class PartitionEntry {
Math.max(lastFileCreationTime, entry.lastFileCreationTime));
}
+ public Partition toPartition(InternalRowPartitionComputer computer) {
+ return new Partition(
+ computer.generatePartValues(partition),
+ recordCount,
+ fileSizeInBytes,
+ fileCount,
+ lastFileCreationTime);
+ }
+
public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
return fromDataFile(entry.partition(), entry.kind(), entry.file());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4a59de8dac..82ce3ad5c8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -36,10 +36,12 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -55,6 +57,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
@@ -78,6 +81,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
@@ -137,6 +141,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final BucketMode bucketMode;
private final long commitTimeout;
private final int commitMaxRetries;
+ private final InternalRowPartitionComputer partitionComputer;
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
@@ -198,6 +203,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
this.commitCallbacks = commitCallbacks;
this.commitMaxRetries = commitMaxRetries;
this.commitTimeout = commitTimeout;
+ this.partitionComputer =
+ new InternalRowPartitionComputer(
+ options.partitionDefaultName(),
+ partitionType,
+ partitionType.getFieldNames().toArray(new String[0]),
+ options.legacyPartitionName());
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
@@ -492,7 +503,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
attempts +=
tryCommit(
compactTableFiles,
- Collections.emptyList(),
+ emptyList(),
compactDvIndexFiles,
committable.identifier(),
committable.watermark(),
@@ -508,9 +519,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (this.commitMetrics != null) {
reportCommit(
appendTableFiles,
- Collections.emptyList(),
+ emptyList(),
compactTableFiles,
- Collections.emptyList(),
+ emptyList(),
commitDuration,
generatedSnapshot,
attempts);
@@ -550,23 +561,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
tryOverwrite(
- partitionFilter,
- Collections.emptyList(),
- Collections.emptyList(),
- commitIdentifier,
- null,
- new HashMap<>());
+ partitionFilter, emptyList(), emptyList(), commitIdentifier, null, new HashMap<>());
}
@Override
public void truncateTable(long commitIdentifier) {
- tryOverwrite(
- null,
- Collections.emptyList(),
- Collections.emptyList(),
- commitIdentifier,
- null,
- new HashMap<>());
+ tryOverwrite(null, emptyList(), emptyList(), commitIdentifier, null, new HashMap<>());
}
@Override
@@ -597,9 +597,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
public void commitStatistics(Statistics stats, long commitIdentifier) {
String statsFileName = statsFileHandler.writeStats(stats);
tryCommit(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ emptyList(),
+ emptyList(),
+ emptyList(),
commitIdentifier,
null,
Collections.emptyMap(),
@@ -809,7 +809,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
return tryCommit(
changesWithOverwrite,
- Collections.emptyList(),
+ emptyList(),
indexChangesWithOverwrite,
identifier,
watermark,
@@ -887,6 +887,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Snapshot newSnapshot;
String baseManifestList = null;
String deltaManifestList = null;
+ List<PartitionEntry> deltaStatistics = null;
String changelogManifestList = null;
String oldIndexManifest = null;
String indexManifest = null;
@@ -933,6 +934,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
boolean rewriteIndexManifest = true;
if (retryResult != null) {
+ deltaStatistics = retryResult.deltaStatistics;
deltaManifestList = retryResult.deltaManifestList;
changelogManifestList = retryResult.changelogManifestList;
if (Objects.equals(oldIndexManifest,
retryResult.oldIndexManifest)) {
@@ -944,6 +946,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
} else {
// write new delta files into manifest files
+ deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
deltaManifestList =
manifestList.write(manifestFile.write(deltaFiles));
// write changelog into manifest files
@@ -1009,7 +1012,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
e);
}
- if (commitSnapshotImpl(newSnapshot)) {
+ if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
@@ -1031,6 +1034,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
newSnapshotId, commitUser, identifier,
commitKind.name(), commitTime));
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
return new RetryResult(
+ deltaStatistics,
deltaManifestList,
changelogManifestList,
oldIndexManifest,
@@ -1115,7 +1119,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
String baseManifestList = manifestList.write(mergeAfterManifests);
- String deltaManifestList = manifestList.write(Collections.emptyList());
+ String deltaManifestList = manifestList.write(emptyList());
// prepare snapshot file
Snapshot newSnapshot =
@@ -1137,7 +1141,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
latestSnapshot.watermark(),
latestSnapshot.statistics());
- if (!commitSnapshotImpl(newSnapshot)) {
+ if (!commitSnapshotImpl(newSnapshot, emptyList())) {
return new ManifestCompactResult(
baseManifestList, deltaManifestList, mergeBeforeManifests,
mergeAfterManifests);
} else {
@@ -1145,9 +1149,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
}
- private boolean commitSnapshotImpl(Snapshot newSnapshot) {
+ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> deltaStatistics) {
try {
- return snapshotCommit.commit(newSnapshot, branchName);
+ List<Partition> statistics = new ArrayList<>(deltaStatistics.size());
+ for (PartitionEntry entry : deltaStatistics) {
+ statistics.add(entry.toPartition(partitionComputer));
+ }
+ return snapshotCommit.commit(newSnapshot, branchName, statistics);
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
@@ -1517,6 +1525,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private class RetryResult implements CommitResult {
+ private final List<PartitionEntry> deltaStatistics;
private final String deltaManifestList;
private final String changelogManifestList;
@@ -1527,12 +1536,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private final List<SimpleFileEntry> baseDataFiles;
private RetryResult(
+ List<PartitionEntry> deltaStatistics,
String deltaManifestList,
String changelogManifestList,
String oldIndexManifest,
String newIndexManifest,
Snapshot latestSnapshot,
List<SimpleFileEntry> baseDataFiles) {
+ this.deltaStatistics = deltaStatistics;
this.deltaManifestList = deltaManifestList;
this.changelogManifestList = changelogManifestList;
this.oldIndexManifest = oldIndexManifest;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
index b13082fb44..ef52528daf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
@@ -29,7 +29,10 @@ import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
-/** Entry representing a partition. */
+/**
+ * Statistics of a partition, fields inside may be negative, indicating that some data has been
+ * removed.
+ */
@JsonIgnoreProperties(ignoreUnknown = true)
@Public
public class Partition implements Serializable {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 341379e523..1ac077cd9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -340,8 +340,9 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots {
return Optional.of(response.getSnapshot());
}
- public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
- CommitTableRequest request = new CommitTableRequest(identifier, snapshot);
+ public boolean commitSnapshot(
+ Identifier identifier, Snapshot snapshot, List<Partition> statistics) {
+ CommitTableRequest request = new CommitTableRequest(identifier, snapshot, statistics);
CommitTableResponse response =
client.post(
resourcePaths.commitTable(identifier.getDatabaseName()),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
index 1a027d4163..87e0506036 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
@@ -21,8 +21,11 @@ package org.apache.paimon.rest;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.List;
+
/** Factory to create {@link SnapshotCommit} for REST Catalog. */
public class RESTSnapshotCommitFactory implements SnapshotCommit.Factory {
@@ -39,11 +42,11 @@ public class RESTSnapshotCommitFactory implements
SnapshotCommit.Factory {
RESTCatalog catalog = loader.load();
return new SnapshotCommit() {
@Override
- public boolean commit(Snapshot snapshot, String branch) {
+ public boolean commit(Snapshot snapshot, String branch, List<Partition> statistics) {
Identifier newIdentifier =
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(), branch);
- return catalog.commitSnapshot(newIdentifier, snapshot);
+ return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
index de8474c1d3..cd7135691b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.requests;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.RESTRequest;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -27,12 +28,15 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGet
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
/** Request for committing snapshot to table. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommitTableRequest implements RESTRequest {
private static final String FIELD_IDENTIFIER = "identifier";
private static final String FIELD_SNAPSHOT = "snapshot";
+ private static final String FIELD_STATISTICS = "statistics";
@JsonProperty(FIELD_IDENTIFIER)
private final Identifier identifier;
@@ -40,12 +44,17 @@ public class CommitTableRequest implements RESTRequest {
@JsonProperty(FIELD_SNAPSHOT)
private final Snapshot snapshot;
+ @JsonProperty(FIELD_STATISTICS)
+ private final List<Partition> statistics;
+
@JsonCreator
public CommitTableRequest(
@JsonProperty(FIELD_IDENTIFIER) Identifier identifier,
- @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot) {
+ @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot,
+ @JsonProperty(FIELD_STATISTICS) List<Partition> statistics) {
this.identifier = identifier;
this.snapshot = snapshot;
+ this.statistics = statistics;
}
@JsonGetter(FIELD_IDENTIFIER)
@@ -57,4 +66,9 @@ public class CommitTableRequest implements RESTRequest {
public Snapshot getSnapshot() {
return snapshot;
}
+
+ @JsonGetter(FIELD_STATISTICS)
+ public List<Partition> getStatistics() {
+ return statistics;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 04f514ef2e..a7938fd00f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -70,6 +70,7 @@ import okhttp3.mockwebserver.RecordedRequest;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -406,7 +407,8 @@ public class RESTCatalogServer {
if (branchName == null) {
branchName = "main";
}
- boolean success = commit.commit(requestBody.getSnapshot(), branchName);
+ boolean success =
+ commit.commit(requestBody.getSnapshot(), branchName, Collections.emptyList());
CommitTableResponse response = new CommitTableResponse(success);
return mockResponse(response, 200);
}
diff --git a/paimon-open-api/rest-catalog-open-api.yaml
b/paimon-open-api/rest-catalog-open-api.yaml
index 4b36bcb3fd..ef136c2359 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -1186,6 +1186,10 @@ components:
$ref: '#/components/schemas/Identifier'
snapshot:
$ref: '#/components/schemas/Snapshot'
+ statistics:
+ type: array
+ items:
+ $ref: '#/components/schemas/Partition'
Snapshot:
type: object
properties: