This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ddfe3230fb [core] add audit fields into partitions sys table (#6784)
ddfe3230fb is described below

commit ddfe3230fb8cd8e49f42dd58b37608e2588a31a6
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Dec 16 16:01:29 2025 +0800

    [core] add audit fields into partitions sys table (#6784)
---
 docs/content/concepts/system-tables.md             |  13 +-
 docs/static/rest-catalog-open-api.yaml             |  14 ++
 .../org/apache/paimon/partition/Partition.java     | 119 ++++++++++++++++-
 .../org/apache/paimon/partition/PartitionTest.java |  87 +++++++++++++
 .../org/apache/paimon/catalog/CatalogUtils.java    |  17 ++-
 .../table/source/snapshot/TimeTravelUtil.java      |  13 ++
 .../paimon/table/system/PartitionsTable.java       | 124 ++++++++++++++++--
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  46 +++++--
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  55 ++++++++
 .../paimon/table/system/PartitionsTableTest.java   |  51 +++++++-
 .../table/system/RestPartitionsTableTest.java      | 142 +++++++++++++++++++++
 11 files changed, 642 insertions(+), 39 deletions(-)

diff --git a/docs/content/concepts/system-tables.md 
b/docs/content/concepts/system-tables.md
index d5eab69798..fd412f3fa5 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -351,14 +351,17 @@ You can query the partition files of the table.
 SELECT * FROM my_table$partitions;
 
 /*
-+---------------+----------------+--------------------+--------------------+------------------------+
-|  partition    |   record_count |  file_size_in_bytes|          file_count|   
     last_update_time|
-+---------------+----------------+--------------------+--------------------+------------------------+
-|  {1}          |           1    |             645    |                1   | 
2024-06-24 10:25:57.400|
-+---------------+----------------+--------------------+--------------------+------------------------+
++-----------+--------------+-------------------+------------+---------------------+---------------------+------------+------------+---------+
+| partition | record_count | file_size_in_bytes| file_count | last_update_time 
   | created_at          | created_by | updated_by | options |
++-----------+--------------+-------------------+------------+---------------------+---------------------+------------+------------+---------+
+| {1}       |            1 |               645 |          1 | 2024-06-24 
10:25:57 | 2024-06-24 10:20:00 | admin      | test_user  | {}      |
++-----------+--------------+-------------------+------------+---------------------+---------------------+------------+------------+---------+
 */
 ```
 
+**Note**: 
+- The `created_at`, `created_by`, `updated_by`, and `options` fields are 
populated from REST catalog audit information. For non-REST catalogs, or when 
time travel options are set on the table, these fields will be `NULL`.
+
 ### Buckets Table
 
 You can query the bucket files of the table.
diff --git a/docs/static/rest-catalog-open-api.yaml 
b/docs/static/rest-catalog-open-api.yaml
index 97f5789ef0..20da3a6366 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -3245,6 +3245,20 @@ components:
           format: int64
         done:
           type: boolean
+        createdAt:
+          type: integer
+          format: int64
+        createdBy:
+          type: string
+        updatedAt:
+          type: integer
+          format: int64
+        updatedBy:
+          type: string
+        options:
+          type: object
+          additionalProperties:
+            type: string
     PartitionStatistics:
       type: object
       properties:
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java 
b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
index 58b5f35881..427b61464f 100644
--- a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
+++ b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
@@ -23,23 +23,57 @@ import org.apache.paimon.annotation.Public;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_CREATED_AT;
+import static 
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_CREATED_BY;
+import static 
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_UPDATED_AT;
+import static 
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_UPDATED_BY;
+
 /** Represent a partition, including statistics and done flag. */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @Public
 public class Partition extends PartitionStatistics {
 
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
 
     public static final String FIELD_DONE = "done";
+    public static final String FIELD_OPTIONS = "options";
 
     @JsonProperty(FIELD_DONE)
     private final boolean done;
 
+    @JsonProperty(FIELD_CREATED_AT)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Long createdAt;
+
+    @JsonProperty(FIELD_CREATED_BY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final String createdBy;
+
+    @JsonProperty(FIELD_UPDATED_AT)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Long updatedAt;
+
+    @JsonProperty(FIELD_UPDATED_BY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final String updatedBy;
+
+    @JsonProperty(FIELD_OPTIONS)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Map<String, String> options;
+
     @JsonCreator
     public Partition(
             @JsonProperty(FIELD_SPEC) Map<String, String> spec,
@@ -47,9 +81,40 @@ public class Partition extends PartitionStatistics {
             @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
             @JsonProperty(FIELD_FILE_COUNT) long fileCount,
             @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long 
lastFileCreationTime,
-            @JsonProperty(FIELD_DONE) boolean done) {
+            @JsonProperty(FIELD_DONE) boolean done,
+            @JsonProperty(FIELD_CREATED_AT) @Nullable Long createdAt,
+            @JsonProperty(FIELD_CREATED_BY) @Nullable String createdBy,
+            @JsonProperty(FIELD_UPDATED_AT) @Nullable Long updatedAt,
+            @JsonProperty(FIELD_UPDATED_BY) @Nullable String updatedBy,
+            @JsonProperty(FIELD_OPTIONS) @Nullable Map<String, String> 
options) {
         super(spec, recordCount, fileSizeInBytes, fileCount, 
lastFileCreationTime);
         this.done = done;
+        this.createdAt = createdAt;
+        this.createdBy = createdBy;
+        this.updatedAt = updatedAt;
+        this.updatedBy = updatedBy;
+        this.options = options;
+    }
+
+    public Partition(
+            Map<String, String> spec,
+            long recordCount,
+            long fileSizeInBytes,
+            long fileCount,
+            long lastFileCreationTime,
+            boolean done) {
+        this(
+                spec,
+                recordCount,
+                fileSizeInBytes,
+                fileCount,
+                lastFileCreationTime,
+                done,
+                null,
+                null,
+                null,
+                null,
+                null);
     }
 
     @JsonGetter(FIELD_DONE)
@@ -57,6 +122,36 @@ public class Partition extends PartitionStatistics {
         return done;
     }
 
+    @Nullable
+    @JsonGetter(FIELD_CREATED_AT)
+    public Long createdAt() {
+        return createdAt;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_CREATED_BY)
+    public String createdBy() {
+        return createdBy;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_UPDATED_AT)
+    public Long updatedAt() {
+        return updatedAt;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_UPDATED_BY)
+    public String updatedBy() {
+        return updatedBy;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_OPTIONS)
+    public Map<String, String> options() {
+        return options;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
@@ -66,12 +161,18 @@ public class Partition extends PartitionStatistics {
             return false;
         }
         Partition partition = (Partition) o;
-        return done == partition.done;
+        return done == partition.done
+                && Objects.equals(createdAt, partition.createdAt)
+                && Objects.equals(createdBy, partition.createdBy)
+                && Objects.equals(updatedAt, partition.updatedAt)
+                && Objects.equals(updatedBy, partition.updatedBy)
+                && Objects.equals(options, partition.options);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), done);
+        return Objects.hash(
+                super.hashCode(), done, createdAt, createdBy, updatedAt, 
updatedBy, options);
     }
 
     @Override
@@ -89,6 +190,16 @@ public class Partition extends PartitionStatistics {
                 + lastFileCreationTime
                 + ", done="
                 + done
+                + ", createdAt="
+                + createdAt
+                + ", createdBy="
+                + createdBy
+                + ", updatedAt="
+                + updatedAt
+                + ", updatedBy="
+                + updatedBy
+                + ", options="
+                + options
                 + '}';
     }
 }
diff --git 
a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java 
b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
new file mode 100644
index 0000000000..7ec342d38d
--- /dev/null
+++ b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Partition} JSON serialization. */
+class PartitionTest {
+
+    @Test
+    void testJsonSerializationWithNullValues() {
+        Map<String, String> spec = Collections.singletonMap("pt", "1");
+        Partition partition =
+                new Partition(
+                        spec,
+                        100L, // recordCount
+                        1024L, // fileSizeInBytes
+                        2L, // fileCount
+                        System.currentTimeMillis(), // lastFileCreationTime
+                        false, // done
+                        null, // createdAt
+                        null, // createdBy
+                        null, // updatedAt
+                        null, // updatedBy
+                        null); // options
+
+        String json = JsonSerdeUtil.toFlatJson(partition);
+
+        assertThat(json).doesNotContain("createdAt");
+        assertThat(json).doesNotContain("createdBy");
+        assertThat(json).doesNotContain("updatedAt");
+        assertThat(json).doesNotContain("updatedBy");
+        assertThat(json).doesNotContain("options");
+
+        assertThat(json).contains("done");
+        assertThat(json).contains("recordCount");
+    }
+
+    @Test
+    void testJsonSerializationWithNonNullValues() {
+        Map<String, String> spec = Collections.singletonMap("pt", "1");
+        Partition partition =
+                new Partition(
+                        spec,
+                        100L,
+                        1024L,
+                        2L,
+                        System.currentTimeMillis(),
+                        true,
+                        1234567890L, // createdAt
+                        "user1", // createdBy
+                        1234567900L, // updatedAt
+                        "user2", // updatedBy
+                        Collections.singletonMap("key", "value")); // options
+
+        String json = JsonSerdeUtil.toFlatJson(partition);
+
+        assertThat(json).contains("createdAt");
+        assertThat(json).contains("createdBy");
+        assertThat(json).contains("updatedAt");
+        assertThat(json).contains("updatedBy");
+        assertThat(json).contains("options");
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 138769f0c9..f970934348 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -38,6 +38,8 @@ import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.iceberg.IcebergTable;
 import org.apache.paimon.table.lance.LanceTable;
 import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.AllPartitionsTable;
 import org.apache.paimon.table.system.AllTableOptionsTable;
 import org.apache.paimon.table.system.AllTablesTable;
@@ -197,8 +199,19 @@ public class CatalogUtils {
                         table.rowType().project(table.partitionKeys()),
                         table.partitionKeys().toArray(new String[0]),
                         options.get(PARTITION_GENERATE_LEGACY_NAME));
-        List<PartitionEntry> partitionEntries =
-                table.newReadBuilder().newScan().listPartitionEntries();
+
+        TableScan scan = table.newReadBuilder().newScan();
+
+        // partitions should be visible even if all files are level-0 when dv 
is enabled, see
+        // https://github.com/apache/paimon/pull/6531 for details
+        List<PartitionEntry> partitionEntries;
+        if (scan instanceof InnerTableScan) {
+            partitionEntries =
+                    ((InnerTableScan) scan).withLevelFilter(level -> 
true).listPartitionEntries();
+        } else {
+            partitionEntries = scan.listPartitionEntries();
+        }
+
         List<Partition> partitions = new ArrayList<>(partitionEntries.size());
         for (PartitionEntry entry : partitionEntries) {
             partitions.add(entry.toPartition(computer));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index bdef244230..8d4087a4e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -133,6 +133,19 @@ public class TimeTravelUtil {
         return Optional.of(snapshot);
     }
 
+    public static boolean hasTimeTravelOptions(Options options) {
+        if (options.containsKey(CoreOptions.SCAN_VERSION.key())) {
+            return true;
+        }
+
+        for (String key : SCAN_KEYS) {
+            if (options.containsKey(key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static void adaptScanVersion(Options options, TagManager 
tagManager) {
         String version = options.remove(CoreOptions.SCAN_VERSION.key());
         if (version == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index d6a96060ec..d3314edfa3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -18,15 +18,23 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +46,16 @@ import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.InternalRowUtils;
 import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.SerializationUtils;
 
@@ -60,6 +71,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -78,7 +90,11 @@ public class PartitionsTable implements ReadonlyTable {
                             new DataField(1, "record_count", new 
BigIntType(false)),
                             new DataField(2, "file_size_in_bytes", new 
BigIntType(false)),
                             new DataField(3, "file_count", new 
BigIntType(false)),
-                            new DataField(4, "last_update_time", 
DataTypes.TIMESTAMP_MILLIS())));
+                            new DataField(4, "last_update_time", 
DataTypes.TIMESTAMP_MILLIS()),
+                            new DataField(5, "created_at", 
DataTypes.TIMESTAMP_MILLIS()),
+                            new DataField(6, "created_by", DataTypes.STRING()),
+                            new DataField(7, "updated_by", DataTypes.STRING()),
+                            new DataField(8, "options", DataTypes.STRING())));
 
     private final FileStoreTable storeTable;
 
@@ -185,8 +201,7 @@ public class PartitionsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
 
-            List<PartitionEntry> partitions =
-                    fileStoreTable.newScan().withLevelFilter(level -> 
true).listPartitionEntries();
+            List<Partition> partitions = listPartitions();
 
             List<DataType> fieldTypes =
                     
fileStoreTable.schema().logicalPartitionType().getFieldTypes();
@@ -201,9 +216,9 @@ public class PartitionsTable implements ReadonlyTable {
             Iterator<InternalRow> iterator =
                     partitions.stream()
                             .map(
-                                    partitionEntry ->
+                                    partition ->
                                             toRow(
-                                                    partitionEntry,
+                                                    partition,
                                                     
fileStoreTable.partitionKeys(),
                                                     castExecutors,
                                                     fieldGetters,
@@ -225,11 +240,13 @@ public class PartitionsTable implements ReadonlyTable {
         }
 
         private InternalRow toRow(
-                PartitionEntry entry,
+                Partition partition,
                 List<String> partitionKeys,
                 List<CastExecutor> castExecutors,
                 InternalRow.FieldGetter[] fieldGetters,
                 String defaultPartitionName) {
+            PartitionEntry entry = toPartitionEntry(partition);
+
             StringBuilder partitionStringBuilder = new StringBuilder();
 
             for (int i = 0; i < partitionKeys.size(); i++) {
@@ -250,15 +267,96 @@ public class PartitionsTable implements ReadonlyTable {
                         .append(partitionValueString);
             }
 
+            BinaryString createdByString = 
BinaryString.fromString(partition.createdBy());
+            Timestamp createdAtTimestamp = toTimestamp(partition.createdAt());
+            BinaryString updatedByString = 
BinaryString.fromString(partition.updatedBy());
+            BinaryString optionsString = null;
+            if (Objects.nonNull(partition.options())) {
+                optionsString =
+                        
BinaryString.fromString(JsonSerdeUtil.toFlatJson(partition.options()));
+            }
+
             return GenericRow.of(
                     BinaryString.fromString(partitionStringBuilder.toString()),
-                    entry.recordCount(),
-                    entry.fileSizeInBytes(),
-                    entry.fileCount(),
-                    Timestamp.fromLocalDateTime(
-                            LocalDateTime.ofInstant(
-                                    
Instant.ofEpochMilli(entry.lastFileCreationTime()),
-                                    ZoneId.systemDefault())));
+                    partition.recordCount(),
+                    partition.fileSizeInBytes(),
+                    partition.fileCount(),
+                    toTimestamp(partition.lastFileCreationTime()),
+                    createdAtTimestamp,
+                    createdByString,
+                    updatedByString,
+                    optionsString);
+        }
+
+        private PartitionEntry toPartitionEntry(Partition partition) {
+            RowType partitionType = 
fileStoreTable.schema().logicalPartitionType();
+            String defaultPartitionName = 
fileStoreTable.coreOptions().partitionDefaultName();
+            InternalRowSerializer serializer = new 
InternalRowSerializer(partitionType);
+            GenericRow partitionRow =
+                    InternalRowPartitionComputer.convertSpecToInternalRow(
+                            partition.spec(), partitionType, 
defaultPartitionName);
+            BinaryRow binaryPartition = 
serializer.toBinaryRow(partitionRow).copy();
+            return new PartitionEntry(
+                    binaryPartition,
+                    partition.recordCount(),
+                    partition.fileSizeInBytes(),
+                    partition.fileCount(),
+                    partition.lastFileCreationTime());
+        }
+
+        private Timestamp toTimestamp(Long epochMillis) {
+            if (epochMillis == null) {
+                return null;
+            }
+            return Timestamp.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            Instant.ofEpochMilli(epochMillis), 
ZoneId.systemDefault()));
+        }
+
+        private List<Partition> listPartitions() {
+            CatalogLoader catalogLoader = 
fileStoreTable.catalogEnvironment().catalogLoader();
+            if (TimeTravelUtil.hasTimeTravelOptions(new 
Options(fileStoreTable.options()))
+                    || catalogLoader == null) {
+                return listPartitionEntries();
+            }
+
+            try (Catalog catalog = catalogLoader.load()) {
+                Identifier baseIdentifier = 
fileStoreTable.catalogEnvironment().identifier();
+                if (baseIdentifier == null) {
+                    return listPartitionEntries();
+                }
+                String branch = fileStoreTable.coreOptions().branch();
+                Identifier identifier;
+                if (branch != null && 
!branch.equals(CoreOptions.BRANCH.defaultValue())) {
+                    identifier =
+                            new Identifier(
+                                    baseIdentifier.getDatabaseName(),
+                                    baseIdentifier.getTableName(),
+                                    branch);
+                } else {
+                    identifier = baseIdentifier;
+                }
+                return catalog.listPartitions(identifier);
+            } catch (Exception e) {
+                return listPartitionEntries();
+            }
+        }
+
+        private List<Partition> listPartitionEntries() {
+            List<PartitionEntry> partitionEntries =
+                    fileStoreTable.newScan().withLevelFilter(level -> 
true).listPartitionEntries();
+            RowType partitionType = 
fileStoreTable.schema().logicalPartitionType();
+            String defaultPartitionName = 
fileStoreTable.coreOptions().partitionDefaultName();
+            String[] partitionColumns = 
fileStoreTable.partitionKeys().toArray(new String[0]);
+            InternalRowPartitionComputer computer =
+                    new InternalRowPartitionComputer(
+                            defaultPartitionName,
+                            partitionType,
+                            partitionColumns,
+                            
fileStoreTable.coreOptions().legacyPartitionName());
+            return partitionEntries.stream()
+                    .map(entry -> entry.toPartition(computer))
+                    .collect(Collectors.toList());
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index f7ede176c2..f4cad6e47d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -2342,17 +2342,7 @@ public class RESTCatalogServer {
             if (!tablePartitionsStore.containsKey(identifier.getFullName())) {
                 if (statistics != null) {
                     List<Partition> newPartitions =
-                            statistics.stream()
-                                    .map(
-                                            stats ->
-                                                    new Partition(
-                                                            stats.spec(),
-                                                            
stats.recordCount(),
-                                                            
stats.fileSizeInBytes(),
-                                                            stats.fileCount(),
-                                                            
stats.lastFileCreationTime(),
-                                                            false))
-                                    .collect(Collectors.toList());
+                            
statistics.stream().map(this::toPartition).collect(Collectors.toList());
                     tablePartitionsStore.put(identifier.getFullName(), 
newPartitions);
                 }
             } else {
@@ -2377,7 +2367,7 @@ public class RESTCatalogServer {
                                                                 
partitionStatisticsMap.get(
                                                                         
oldPartition.spec());
                                                         if (stats == null) {
-                                                            return 
oldPartition; // 如果没有新的统计信息,保持原样
+                                                            return 
oldPartition;
                                                         }
                                                         return new Partition(
                                                                 
oldPartition.spec(),
@@ -2392,9 +2382,24 @@ public class RESTCatalogServer {
                                                                                
 .lastFileCreationTime(),
                                                                         stats
                                                                                
 .lastFileCreationTime()),
-                                                                
oldPartition.done());
+                                                                
oldPartition.done(),
+                                                                
oldPartition.createdAt(),
+                                                                
oldPartition.createdBy(),
+                                                                
oldPartition.updatedAt(),
+                                                                
oldPartition.updatedBy(),
+                                                                
oldPartition.options());
                                                     })
                                             .collect(Collectors.toList());
+                            Set<Map<String, String>> existingSpecs =
+                                    oldPartitions.stream()
+                                            .map(Partition::spec)
+                                            .collect(Collectors.toSet());
+                            List<Partition> newPartitions =
+                                    statistics.stream()
+                                            .filter(stats -> 
!existingSpecs.contains(stats.spec()))
+                                            .map(this::toPartition)
+                                            .collect(Collectors.toList());
+                            updatedPartitions.addAll(newPartitions);
                             return updatedPartitions;
                         });
             }
@@ -2595,6 +2600,21 @@ public class RESTCatalogServer {
                 400);
     }
 
+    private Partition toPartition(PartitionStatistics stats) {
+        return new Partition(
+                stats.spec(),
+                stats.recordCount(),
+                stats.fileSizeInBytes(),
+                stats.fileCount(),
+                stats.lastFileCreationTime(),
+                false,
+                System.currentTimeMillis(),
+                "created",
+                System.currentTimeMillis(),
+                "updated",
+                new HashMap<>());
+    }
+
     public List<Map<String, String>> getReceivedHeaders() {
         return receivedHeaders;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index c4ac5a6a57..4694c4f9c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -2625,6 +2625,61 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
                 new 
InternalRowSerializer(partitions.rowType()).toBinaryRow(result.get(0)));
     }
 
+    /**
+     * Writes a single row into a partitioned table, then reads the table's {@code $partitions}
+     * system table through the catalog and checks the audit columns of every returned row.
+     */
+    @Test
+    void testReadPartitionsTable() throws Exception {
+        // Positions of the checked columns in the partitions system table.
+        final int createdAtPos = 5;
+        final int createdByPos = 6;
+        final int updatedByPos = 7;
+        final int optionsPos = 8;
+
+        Identifier tableId = Identifier.create("test_table_db", "partitions_audit_table");
+        String databaseName = tableId.getDatabaseName();
+        catalog.createDatabase(databaseName, true);
+        Schema auditSchema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("f1", DataTypes.INT())
+                        .primaryKey("pk")
+                        .partitionKeys("f1")
+                        .option("bucket", "1")
+                        .option("metastore.partitioned-table", "true")
+                        .build();
+        catalog.createTable(tableId, auditSchema, true);
+
+        // Commit one row so that at least one partition exists.
+        BatchWriteBuilder batchBuilder = catalog.getTable(tableId).newBatchWriteBuilder();
+        try (BatchTableWrite writer = batchBuilder.newWrite();
+                BatchTableCommit committer = batchBuilder.newCommit()) {
+            writer.write(GenericRow.of(1, 1));
+            committer.commit(writer.prepareCommit());
+        }
+
+        Identifier partitionsId =
+                Identifier.create(databaseName, tableId.getObjectName() + "$partitions");
+        ReadBuilder scanAndRead = catalog.getTable(partitionsId).newReadBuilder();
+        List<Split> plannedSplits = scanAndRead.newScan().plan().splits();
+        TableRead tableRead = scanAndRead.newRead();
+        List<InternalRow> rows = new ArrayList<>();
+        try (RecordReader<InternalRow> rowReader = tableRead.createReader(plannedSplits)) {
+            rowReader.forEachRemaining(rows::add);
+        }
+
+        assertThat(rows).isNotEmpty();
+        for (InternalRow partitionRow : rows) {
+            // created_at is only checked when the row carries a value.
+            boolean createdAtMissing = partitionRow.isNullAt(createdAtPos);
+            if (!createdAtMissing) {
+                assertThat(partitionRow.getTimestamp(createdAtPos, 3)).isNotNull();
+            }
+
+            assertThat(partitionRow.isNullAt(createdByPos)).isFalse();
+            assertThat(partitionRow.getString(createdByPos).toString()).isEqualTo("created");
+
+            assertThat(partitionRow.isNullAt(updatedByPos)).isFalse();
+            assertThat(partitionRow.getString(updatedByPos).toString()).isEqualTo("updated");
+
+            // The last checked column is only inspected when it is not null.
+            if (partitionRow.isNullAt(optionsPos)) {
+                continue;
+            }
+            String optionsJson = partitionRow.getString(optionsPos).toString();
+            assertThat(optionsJson).isNotEmpty();
+        }
+    }
+
     private TestPagedResponse generateTestPagedResponse(
             Map<String, String> queryParams,
             List<Integer> testData,
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
index 86ab7cb69f..8091288c60 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
@@ -50,9 +50,9 @@ public class PartitionsTableTest extends TableTestBase {
 
     private static final String tableName = "MyTable";
 
-    private FileStoreTable table;
+    protected FileStoreTable table;
 
-    private PartitionsTable partitionsTable;
+    protected PartitionsTable partitionsTable;
 
     @BeforeEach
     public void before() throws Exception {
@@ -120,4 +120,51 @@ public class PartitionsTableTest extends TableTestBase {
         List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
     }
+
+    /**
+     * Reads the first column together with the four audit columns and expects every audit
+     * column to be null in each of the three returned rows.
+     */
+    @Test
+    void testPartitionAuditFieldsNull() throws Exception {
+        int[] projection = {0, 5, 6, 7, 8};
+        List<InternalRow> rows = read(partitionsTable, projection);
+        assertThat(rows).hasSize(3);
+
+        // Projected positions 1..4 hold created_at, created_by, updated_by and options.
+        for (InternalRow row : rows) {
+            for (int pos = 1; pos <= 4; pos++) {
+                assertThat(row.isNullAt(pos)).isTrue();
+            }
+        }
+    }
+
+    /**
+     * Creates a separate table with {@code PARTITION_GENERATE_LEGACY_NAME} set to
+     * {@code "false"}, writes one row into each of two partitions, and compares the first two
+     * columns of its partitions system table with the expected rows.
+     */
+    @Test
+    void testPartitionWithLegacyPartitionName() throws Exception {
+        String legacyTableName = "TestLegacyTable";
+        Identifier dataTableId = identifier(legacyTableName);
+        Identifier systemTableId =
+                identifier(legacyTableName + SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
+
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt");
+        schemaBuilder =
+                schemaBuilder
+                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                        .option("bucket", "1")
+                        .option(CoreOptions.PARTITION_GENERATE_LEGACY_NAME.key(), "false");
+        catalog.createTable(dataTableId, schemaBuilder.build(), true);
+
+        FileStoreTable dataTable = (FileStoreTable) catalog.getTable(dataTableId);
+        write(dataTable, GenericRow.of(1, 10, 1), GenericRow.of(2, 20, 2));
+
+        PartitionsTable systemTable = (PartitionsTable) catalog.getTable(systemTableId);
+        List<InternalRow> actual = read(systemTable, new int[] {0, 1});
+
+        // One entry per written partition: the partition string and the value 1L.
+        List<InternalRow> expected = new ArrayList<>();
+        for (String partitionName : new String[] {"pt=10", "pt=20"}) {
+            expected.add(GenericRow.of(BinaryString.fromString(partitionName), 1L));
+        }
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/RestPartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/RestPartitionsTableTest.java
new file mode 100644
index 0000000000..3dc54d44e7
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/RestPartitionsTableTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Runs the {@link PartitionsTableTest} cases against a catalog that talks to a
+ * {@link RESTCatalogServer} started by this test.
+ */
+class RestPartitionsTableTest extends PartitionsTableTest {
+
+    private static final String TABLE_NAME = "MyTable";
+
+    private RESTCatalogServer restCatalogServer;
+
+    /**
+     * Starts the REST server, creates a catalog pointing at it, creates the partitioned table
+     * together with its partitions system table, and writes two batches of rows.
+     */
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        String dataPath = warehouse.toString() + "/RestPartitionsTableTest";
+        String restWarehouse = UUID.randomUUID().toString();
+        String initToken = "init_token";
+
+        startRestServer(dataPath, restWarehouse, initToken);
+
+        Options clientOptions = restClientOptions(dataPath, restWarehouse, initToken);
+        catalog = CatalogFactory.createCatalog(CatalogContext.create(clientOptions));
+        catalog.createDatabase(database, true);
+
+        Identifier dataTableId = identifier(TABLE_NAME);
+        catalog.createTable(dataTableId, partitionedTableSchema(), true);
+        table = (FileStoreTable) catalog.getTable(dataTableId);
+
+        Identifier partitionsTableId =
+                identifier(TABLE_NAME + SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
+        partitionsTable = (PartitionsTable) catalog.getTable(partitionsTableId);
+
+        write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 3, 5));
+        write(table, GenericRow.of(1, 1, 3), GenericRow.of(1, 2, 4));
+    }
+
+    /** Creates the REST catalog server with bearer-token auth and starts it. */
+    private void startRestServer(String dataPath, String restWarehouse, String token)
+            throws Exception {
+        BearTokenAuthProvider tokenAuth = new BearTokenAuthProvider(token);
+
+        Map<String, String> serverDefaults = new HashMap<>();
+        serverDefaults.put(RESTCatalogInternalOptions.PREFIX.key(), "paimon");
+        serverDefaults.put(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+        ConfigResponse serverConfig = new ConfigResponse(serverDefaults, ImmutableMap.of());
+
+        restCatalogServer =
+                new RESTCatalogServer(dataPath, tokenAuth, serverConfig, restWarehouse);
+        restCatalogServer.start();
+    }
+
+    /** Builds the catalog options that point a client at the started server. */
+    private Options restClientOptions(String dataPath, String restWarehouse, String token) {
+        Options clientOptions = new Options();
+        clientOptions.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        clientOptions.set(RESTCatalogOptions.TOKEN, token);
+        clientOptions.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier());
+        clientOptions.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+        clientOptions.set(CatalogOptions.METASTORE.key(), "rest");
+        clientOptions.set(RESTTestFileIO.DATA_PATH_CONF_KEY, dataPath);
+        return clientOptions;
+    }
+
+    /** Schema of the table under test: three INT columns, partitioned by {@code pt}. */
+    private Schema partitionedTableSchema() {
+        return Schema.newBuilder()
+                .column("pk", DataTypes.INT())
+                .column("pt", DataTypes.INT())
+                .column("col1", DataTypes.INT())
+                .partitionKeys("pt")
+                .primaryKey("pk", "pt")
+                .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                .option("bucket", "1")
+                .option("metastore.partitioned-table", "true")
+                .build();
+    }
+
+    /** Shuts the REST server down if it was created. */
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (restCatalogServer == null) {
+            return;
+        }
+        restCatalogServer.shutdown();
+    }
+
+    /**
+     * Overrides the inherited case: here created_at, created_by and updated_by must be non-null,
+     * with created_by and updated_by equal to {@code "created"} and {@code "updated"}.
+     */
+    @Test
+    @Override
+    void testPartitionAuditFieldsNull() throws Exception {
+        // Positions within the projection {0, 5, 6, 7, 8}.
+        final int createdAtPos = 1;
+        final int createdByPos = 2;
+        final int updatedByPos = 3;
+        final int optionsPos = 4;
+
+        List<InternalRow> rows = read(partitionsTable, new int[] {0, 5, 6, 7, 8});
+        assertThat(rows).isNotEmpty();
+
+        for (InternalRow row : rows) {
+            assertThat(row.isNullAt(createdAtPos)).isFalse();
+            assertThat(row.isNullAt(createdByPos)).isFalse();
+            assertThat(row.getString(createdByPos).toString()).isEqualTo("created");
+            assertThat(row.isNullAt(updatedByPos)).isFalse();
+            assertThat(row.getString(updatedByPos).toString()).isEqualTo("updated");
+
+            // options is only inspected when the row carries a value.
+            if (row.isNullAt(optionsPos)) {
+                continue;
+            }
+            String optionsJson = row.getString(optionsPos).toString();
+            assertThat(optionsJson).isNotEmpty();
+        }
+    }
+}


Reply via email to