This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d690a6c3 [test] refactor commitSnapshot in RestCatalog and fix ut 
(#5302)
24d690a6c3 is described below

commit 24d690a6c363cb84794104c1346d4081259918a5
Author: Xiaohu <[email protected]>
AuthorDate: Tue Mar 18 13:56:27 2025 +0800

    [test] refactor commitSnapshot in RestCatalog and fix ut (#5302)
---
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 102 +++++++++++++++++----
 .../apache/paimon/rest/RESTCatalogTestBase.java    |  17 +++-
 2 files changed, 98 insertions(+), 21 deletions(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index e285a5facd..9654f2647c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -37,6 +37,7 @@ import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.partition.PartitionStatistics;
 import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
@@ -562,16 +563,8 @@ public class RESTCatalogServer {
         if (!tableMetadataStore.containsKey(identifier.getFullName())) {
             throw new Catalog.TableNotExistException(identifier);
         }
-        FileStoreTable table = getFileTable(identifier);
-        RenamingSnapshotCommit commit =
-                new RenamingSnapshotCommit(table.snapshotManager(), 
Lock.empty());
-        String branchName = identifier.getBranchName();
-        if (branchName == null) {
-            branchName = "main";
-        }
         boolean success =
-                commit.commit(requestBody.getSnapshot(), branchName, 
Collections.emptyList());
-        commitSnapshot(identifier, requestBody.getSnapshot(), null);
+                commitSnapshot(identifier, requestBody.getSnapshot(), 
requestBody.getStatistics());
         CommitTableResponse response = new CommitTableResponse(success);
         return mockResponse(response, 200);
     }
@@ -1262,7 +1255,7 @@ public class RESTCatalogServer {
     }
 
     private boolean commitSnapshot(
-            Identifier identifier, Snapshot snapshot, List<Partition> 
statistics)
+            Identifier identifier, Snapshot snapshot, 
List<PartitionStatistics> statistics)
             throws Catalog.TableNotExistException {
         FileStoreTable table = getFileTable(identifier);
         RenamingSnapshotCommit commit =
@@ -1273,6 +1266,7 @@ public class RESTCatalogServer {
         }
         try {
             boolean success = commit.commit(snapshot, branchName, 
Collections.emptyList());
+            // update snapshot and stats
             tableSnapshotStore.compute(
                     identifier.getFullName(),
                     (k, old) -> {
@@ -1281,12 +1275,12 @@ public class RESTCatalogServer {
                         long fileCount = 0;
                         long lastFileCreationTime = 0;
                         if (statistics != null) {
-                            for (Partition partition : statistics) {
-                                recordCount += partition.recordCount();
-                                fileSizeInBytes += partition.fileSizeInBytes();
-                                fileCount += partition.fileCount();
-                                if (partition.lastFileCreationTime() > 
lastFileCreationTime) {
-                                    lastFileCreationTime = 
partition.lastFileCreationTime();
+                            for (PartitionStatistics stats : statistics) {
+                                recordCount += stats.recordCount();
+                                fileSizeInBytes += stats.fileSizeInBytes();
+                                fileCount += stats.fileCount();
+                                if (stats.lastFileCreationTime() > 
lastFileCreationTime) {
+                                    lastFileCreationTime = 
stats.lastFileCreationTime();
                                 }
                             }
                         }
@@ -1305,6 +1299,82 @@ public class RESTCatalogServer {
                                 lastFileCreationTime,
                                 fileSizeInBytes);
                     });
+            // upsert partitions stats
+            if (!tablePartitionsStore.containsKey(identifier.getFullName())) {
+                if (statistics != null) {
+                    List<Partition> newPartitions =
+                            statistics.stream()
+                                    .map(
+                                            stats ->
+                                                    new Partition(
+                                                            stats.spec(),
+                                                            
stats.recordCount(),
+                                                            
stats.fileSizeInBytes(),
+                                                            stats.fileCount(),
+                                                            
stats.lastFileCreationTime(),
+                                                            false))
+                                    .collect(Collectors.toList());
+                    tablePartitionsStore.put(identifier.getFullName(), 
newPartitions);
+                }
+            } else {
+                tablePartitionsStore.compute(
+                        identifier.getFullName(),
+                        (k, oldPartitions) -> {
+                            if (oldPartitions == null || statistics == null) {
+                                return oldPartitions;
+                            }
+                            Map<Map<String, String>, PartitionStatistics> 
partitionStatisticsMap =
+                                    statistics.stream()
+                                            .collect(
+                                                    Collectors.toMap(
+                                                            
PartitionStatistics::spec,
+                                                            y -> y,
+                                                            (a, b) -> a));
+                            List<Partition> updatedPartitions =
+                                    oldPartitions.stream()
+                                            .map(
+                                                    oldPartition -> {
+                                                        PartitionStatistics 
stats =
+                                                                
partitionStatisticsMap.get(
+                                                                        
oldPartition.spec());
+                                                        if (stats == null) {
+                                                            return 
oldPartition; // no new statistics for this partition; keep it unchanged
+                                                        }
+                                                        return new Partition(
+                                                                
oldPartition.spec(),
+                                                                
oldPartition.recordCount()
+                                                                        + 
stats.recordCount(),
+                                                                
oldPartition.fileSizeInBytes()
+                                                                        + 
stats.fileSizeInBytes(),
+                                                                
oldPartition.fileCount()
+                                                                        + 
stats.fileCount(),
+                                                                Math.max(
+                                                                        
oldPartition
+                                                                               
 .lastFileCreationTime(),
+                                                                        stats
+                                                                               
 .lastFileCreationTime()),
+                                                                
oldPartition.done());
+                                                    })
+                                            .collect(Collectors.toList());
+                            return updatedPartitions;
+                        });
+            }
+            // clean up partitions
+            tablePartitionsStore
+                    .entrySet()
+                    .removeIf(
+                            entry -> {
+                                List<Partition> partitions = entry.getValue();
+                                if (partitions == null) {
+                                    return true;
+                                }
+                                partitions.removeIf(
+                                        partition ->
+                                                partition.fileSizeInBytes() <= 0
+                                                        && 
partition.fileCount() <= 0
+                                                        && 
partition.recordCount() <= 0);
+                                return partitions.isEmpty();
+                            });
             return success;
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
index e7d08e2c0b..1a6b80ffe3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
@@ -597,11 +597,18 @@ public abstract class RESTCatalogTestBase extends 
CatalogTestBase {
         Identifier branchIdentifier = new Identifier("test_db", "test_table", 
branchName);
         assertThrows(
                 Catalog.TableNotExistException.class, () -> 
restCatalog.listPartitions(identifier));
-
-        createTable(
+        restCatalog.createDatabase(identifier.getDatabaseName(), true);
+        restCatalog.createTable(
                 identifier,
-                ImmutableMap.of(METASTORE_PARTITIONED_TABLE.key(), "" + true),
-                Lists.newArrayList("col1"));
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "col1", DataTypes.INT()),
+                                new DataField(1, "dt", DataTypes.STRING())),
+                        Arrays.asList("dt"),
+                        Collections.emptyList(),
+                        ImmutableMap.of(METASTORE_PARTITIONED_TABLE.key(), "" 
+ true),
+                        ""),
+                true);
         List<Partition> result = catalog.listPartitions(identifier);
         assertEquals(0, result.size());
         List<Map<String, String>> partitionSpecs =
@@ -1007,7 +1014,7 @@ public abstract class RESTCatalogTestBase extends 
CatalogTestBase {
     @Override
     protected boolean supportPartitions() {
         // TODO support this
-        return false;
+        return true;
     }
 
     @Override

Reply via email to