This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ddfe3230fb [core] add audit fields into partitions sys table (#6784)
ddfe3230fb is described below
commit ddfe3230fb8cd8e49f42dd58b37608e2588a31a6
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Dec 16 16:01:29 2025 +0800
[core] add audit fields into partitions sys table (#6784)
---
docs/content/concepts/system-tables.md | 13 +-
docs/static/rest-catalog-open-api.yaml | 14 ++
.../org/apache/paimon/partition/Partition.java | 119 ++++++++++++++++-
.../org/apache/paimon/partition/PartitionTest.java | 87 +++++++++++++
.../org/apache/paimon/catalog/CatalogUtils.java | 17 ++-
.../table/source/snapshot/TimeTravelUtil.java | 13 ++
.../paimon/table/system/PartitionsTable.java | 124 ++++++++++++++++--
.../org/apache/paimon/rest/RESTCatalogServer.java | 46 +++++--
.../org/apache/paimon/rest/RESTCatalogTest.java | 55 ++++++++
.../paimon/table/system/PartitionsTableTest.java | 51 +++++++-
.../table/system/RestPartitionsTableTest.java | 142 +++++++++++++++++++++
11 files changed, 642 insertions(+), 39 deletions(-)
diff --git a/docs/content/concepts/system-tables.md
b/docs/content/concepts/system-tables.md
index d5eab69798..fd412f3fa5 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -351,14 +351,17 @@ You can query the partition files of the table.
SELECT * FROM my_table$partitions;
/*
-+---------------+----------------+--------------------+--------------------+------------------------+
-| partition | record_count | file_size_in_bytes| file_count|
last_update_time|
-+---------------+----------------+--------------------+--------------------+------------------------+
-| {1} | 1 | 645 | 1 |
2024-06-24 10:25:57.400|
-+---------------+----------------+--------------------+--------------------+------------------------+
++-----------+--------------+-------------------+------------+---------------------+---------------------+------------+------------+---------+
+| partition | record_count | file_size_in_bytes| file_count | last_update_time    | created_at          | created_by | updated_by | options |
++-----------+--------------+-------------------+------------+---------------------+---------------------+------------+------------+---------+
+| {1}       | 1            | 645               | 1          | 2024-06-24 10:25:57 | 2024-06-24 10:20:00 | admin      | test_user  | {}      |
++-----------+--------------+-------------------+------------+---------------------+---------------------+------------+------------+---------+
*/
```
+**Note**:
+- The `created_by`, `created_at`, `updated_by`, and `options` fields are populated from REST catalog audit information. For non-REST catalogs, or when the table is read with time-travel scan options, these fields will be `NULL`.
+
### Buckets Table
You can query the bucket files of the table.
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index 97f5789ef0..20da3a6366 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -3245,6 +3245,20 @@ components:
format: int64
done:
type: boolean
+ createdAt:
+ type: integer
+ format: int64
+ createdBy:
+ type: string
+ updatedAt:
+ type: integer
+ format: int64
+ updatedBy:
+ type: string
+ options:
+ type: object
+ additionalProperties:
+ type: string
PartitionStatistics:
type: object
properties:
diff --git
a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
index 58b5f35881..427b61464f 100644
--- a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
+++ b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
@@ -23,23 +23,57 @@ import org.apache.paimon.annotation.Public;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
import java.util.Map;
import java.util.Objects;
+import static
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_CREATED_AT;
+import static
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_CREATED_BY;
+import static
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_UPDATED_AT;
+import static
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_UPDATED_BY;
+
/** Represent a partition, including statistics and done flag. */
@JsonIgnoreProperties(ignoreUnknown = true)
@Public
public class Partition extends PartitionStatistics {
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
public static final String FIELD_DONE = "done";
+ public static final String FIELD_OPTIONS = "options";
@JsonProperty(FIELD_DONE)
private final boolean done;
+ @JsonProperty(FIELD_CREATED_AT)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Long createdAt;
+
+ @JsonProperty(FIELD_CREATED_BY)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final String createdBy;
+
+ @JsonProperty(FIELD_UPDATED_AT)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Long updatedAt;
+
+ @JsonProperty(FIELD_UPDATED_BY)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final String updatedBy;
+
+ @JsonProperty(FIELD_OPTIONS)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Map<String, String> options;
+
@JsonCreator
public Partition(
@JsonProperty(FIELD_SPEC) Map<String, String> spec,
@@ -47,9 +81,40 @@ public class Partition extends PartitionStatistics {
@JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
@JsonProperty(FIELD_FILE_COUNT) long fileCount,
@JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long
lastFileCreationTime,
- @JsonProperty(FIELD_DONE) boolean done) {
+ @JsonProperty(FIELD_DONE) boolean done,
+ @JsonProperty(FIELD_CREATED_AT) @Nullable Long createdAt,
+ @JsonProperty(FIELD_CREATED_BY) @Nullable String createdBy,
+ @JsonProperty(FIELD_UPDATED_AT) @Nullable Long updatedAt,
+ @JsonProperty(FIELD_UPDATED_BY) @Nullable String updatedBy,
+ @JsonProperty(FIELD_OPTIONS) @Nullable Map<String, String>
options) {
super(spec, recordCount, fileSizeInBytes, fileCount,
lastFileCreationTime);
this.done = done;
+ this.createdAt = createdAt;
+ this.createdBy = createdBy;
+ this.updatedAt = updatedAt;
+ this.updatedBy = updatedBy;
+ this.options = options;
+ }
+
+ public Partition(
+ Map<String, String> spec,
+ long recordCount,
+ long fileSizeInBytes,
+ long fileCount,
+ long lastFileCreationTime,
+ boolean done) {
+ this(
+ spec,
+ recordCount,
+ fileSizeInBytes,
+ fileCount,
+ lastFileCreationTime,
+ done,
+ null,
+ null,
+ null,
+ null,
+ null);
}
@JsonGetter(FIELD_DONE)
@@ -57,6 +122,36 @@ public class Partition extends PartitionStatistics {
return done;
}
+ @Nullable
+ @JsonGetter(FIELD_CREATED_AT)
+ public Long createdAt() {
+ return createdAt;
+ }
+
+ @Nullable
+ @JsonGetter(FIELD_CREATED_BY)
+ public String createdBy() {
+ return createdBy;
+ }
+
+ @Nullable
+ @JsonGetter(FIELD_UPDATED_AT)
+ public Long updatedAt() {
+ return updatedAt;
+ }
+
+ @Nullable
+ @JsonGetter(FIELD_UPDATED_BY)
+ public String updatedBy() {
+ return updatedBy;
+ }
+
+ @Nullable
+ @JsonGetter(FIELD_OPTIONS)
+ public Map<String, String> options() {
+ return options;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
@@ -66,12 +161,18 @@ public class Partition extends PartitionStatistics {
return false;
}
Partition partition = (Partition) o;
- return done == partition.done;
+ return done == partition.done
+ && Objects.equals(createdAt, partition.createdAt)
+ && Objects.equals(createdBy, partition.createdBy)
+ && Objects.equals(updatedAt, partition.updatedAt)
+ && Objects.equals(updatedBy, partition.updatedBy)
+ && Objects.equals(options, partition.options);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), done);
+ return Objects.hash(
+ super.hashCode(), done, createdAt, createdBy, updatedAt,
updatedBy, options);
}
@Override
@@ -89,6 +190,16 @@ public class Partition extends PartitionStatistics {
+ lastFileCreationTime
+ ", done="
+ done
+ + ", createdAt="
+ + createdAt
+ + ", createdBy="
+ + createdBy
+ + ", updatedAt="
+ + updatedAt
+ + ", updatedBy="
+ + updatedBy
+ + ", options="
+ + options
+ '}';
}
}
diff --git
a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
new file mode 100644
index 0000000000..7ec342d38d
--- /dev/null
+++ b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Partition} JSON serialization. */
+class PartitionTest {
+
+ @Test
+ void testJsonSerializationWithNullValues() {
+ Map<String, String> spec = Collections.singletonMap("pt", "1");
+ Partition partition =
+ new Partition(
+ spec,
+ 100L, // recordCount
+ 1024L, // fileSizeInBytes
+ 2L, // fileCount
+ System.currentTimeMillis(), // lastFileCreationTime
+ false, // done
+ null, // createdAt
+ null, // createdBy
+ null, // updatedAt
+ null, // updatedBy
+ null); // options
+
+ String json = JsonSerdeUtil.toFlatJson(partition);
+
+ assertThat(json).doesNotContain("createdAt");
+ assertThat(json).doesNotContain("createdBy");
+ assertThat(json).doesNotContain("updatedAt");
+ assertThat(json).doesNotContain("updatedBy");
+ assertThat(json).doesNotContain("options");
+
+ assertThat(json).contains("done");
+ assertThat(json).contains("recordCount");
+ }
+
+ @Test
+ void testJsonSerializationWithNonNullValues() {
+ Map<String, String> spec = Collections.singletonMap("pt", "1");
+ Partition partition =
+ new Partition(
+ spec,
+ 100L,
+ 1024L,
+ 2L,
+ System.currentTimeMillis(),
+ true,
+ 1234567890L, // createdAt
+ "user1", // createdBy
+ 1234567900L, // updatedAt
+ "user2", // updatedBy
+ Collections.singletonMap("key", "value")); // options
+
+ String json = JsonSerdeUtil.toFlatJson(partition);
+
+ assertThat(json).contains("createdAt");
+ assertThat(json).contains("createdBy");
+ assertThat(json).contains("updatedAt");
+ assertThat(json).contains("updatedBy");
+ assertThat(json).contains("options");
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 138769f0c9..f970934348 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -38,6 +38,8 @@ import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.iceberg.IcebergTable;
import org.apache.paimon.table.lance.LanceTable;
import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.AllPartitionsTable;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.AllTablesTable;
@@ -197,8 +199,19 @@ public class CatalogUtils {
table.rowType().project(table.partitionKeys()),
table.partitionKeys().toArray(new String[0]),
options.get(PARTITION_GENERATE_LEGACY_NAME));
- List<PartitionEntry> partitionEntries =
- table.newReadBuilder().newScan().listPartitionEntries();
+
+ TableScan scan = table.newReadBuilder().newScan();
+
+ // partitions should still be visible even when all files are level-0 and dv is enabled, see
+ // https://github.com/apache/paimon/pull/6531 for details
+ List<PartitionEntry> partitionEntries;
+ if (scan instanceof InnerTableScan) {
+ partitionEntries =
+ ((InnerTableScan) scan).withLevelFilter(level ->
true).listPartitionEntries();
+ } else {
+ partitionEntries = scan.listPartitionEntries();
+ }
+
List<Partition> partitions = new ArrayList<>(partitionEntries.size());
for (PartitionEntry entry : partitionEntries) {
partitions.add(entry.toPartition(computer));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index bdef244230..8d4087a4e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -133,6 +133,19 @@ public class TimeTravelUtil {
return Optional.of(snapshot);
}
+ public static boolean hasTimeTravelOptions(Options options) {
+ if (options.containsKey(CoreOptions.SCAN_VERSION.key())) {
+ return true;
+ }
+
+ for (String key : SCAN_KEYS) {
+ if (options.containsKey(key)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static void adaptScanVersion(Options options, TagManager
tagManager) {
String version = options.remove(CoreOptions.SCAN_VERSION.key());
if (version == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index d6a96060ec..d3314edfa3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -18,15 +18,23 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +46,16 @@ import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;
@@ -60,6 +71,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -78,7 +90,11 @@ public class PartitionsTable implements ReadonlyTable {
new DataField(1, "record_count", new
BigIntType(false)),
new DataField(2, "file_size_in_bytes", new
BigIntType(false)),
new DataField(3, "file_count", new
BigIntType(false)),
- new DataField(4, "last_update_time",
DataTypes.TIMESTAMP_MILLIS())));
+ new DataField(4, "last_update_time",
DataTypes.TIMESTAMP_MILLIS()),
+ new DataField(5, "created_at",
DataTypes.TIMESTAMP_MILLIS()),
+ new DataField(6, "created_by", DataTypes.STRING()),
+ new DataField(7, "updated_by", DataTypes.STRING()),
+ new DataField(8, "options", DataTypes.STRING())));
private final FileStoreTable storeTable;
@@ -185,8 +201,7 @@ public class PartitionsTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- List<PartitionEntry> partitions =
- fileStoreTable.newScan().withLevelFilter(level ->
true).listPartitionEntries();
+ List<Partition> partitions = listPartitions();
List<DataType> fieldTypes =
fileStoreTable.schema().logicalPartitionType().getFieldTypes();
@@ -201,9 +216,9 @@ public class PartitionsTable implements ReadonlyTable {
Iterator<InternalRow> iterator =
partitions.stream()
.map(
- partitionEntry ->
+ partition ->
toRow(
- partitionEntry,
+ partition,
fileStoreTable.partitionKeys(),
castExecutors,
fieldGetters,
@@ -225,11 +240,13 @@ public class PartitionsTable implements ReadonlyTable {
}
private InternalRow toRow(
- PartitionEntry entry,
+ Partition partition,
List<String> partitionKeys,
List<CastExecutor> castExecutors,
InternalRow.FieldGetter[] fieldGetters,
String defaultPartitionName) {
+ PartitionEntry entry = toPartitionEntry(partition);
+
StringBuilder partitionStringBuilder = new StringBuilder();
for (int i = 0; i < partitionKeys.size(); i++) {
@@ -250,15 +267,96 @@ public class PartitionsTable implements ReadonlyTable {
.append(partitionValueString);
}
+ BinaryString createdByString =
BinaryString.fromString(partition.createdBy());
+ Timestamp createdAtTimestamp = toTimestamp(partition.createdAt());
+ BinaryString updatedByString =
BinaryString.fromString(partition.updatedBy());
+ BinaryString optionsString = null;
+ if (Objects.nonNull(partition.options())) {
+ optionsString =
+
BinaryString.fromString(JsonSerdeUtil.toFlatJson(partition.options()));
+ }
+
return GenericRow.of(
BinaryString.fromString(partitionStringBuilder.toString()),
- entry.recordCount(),
- entry.fileSizeInBytes(),
- entry.fileCount(),
- Timestamp.fromLocalDateTime(
- LocalDateTime.ofInstant(
-
Instant.ofEpochMilli(entry.lastFileCreationTime()),
- ZoneId.systemDefault())));
+ partition.recordCount(),
+ partition.fileSizeInBytes(),
+ partition.fileCount(),
+ toTimestamp(partition.lastFileCreationTime()),
+ createdAtTimestamp,
+ createdByString,
+ updatedByString,
+ optionsString);
+ }
+
+ private PartitionEntry toPartitionEntry(Partition partition) {
+ RowType partitionType =
fileStoreTable.schema().logicalPartitionType();
+ String defaultPartitionName =
fileStoreTable.coreOptions().partitionDefaultName();
+ InternalRowSerializer serializer = new
InternalRowSerializer(partitionType);
+ GenericRow partitionRow =
+ InternalRowPartitionComputer.convertSpecToInternalRow(
+ partition.spec(), partitionType,
defaultPartitionName);
+ BinaryRow binaryPartition =
serializer.toBinaryRow(partitionRow).copy();
+ return new PartitionEntry(
+ binaryPartition,
+ partition.recordCount(),
+ partition.fileSizeInBytes(),
+ partition.fileCount(),
+ partition.lastFileCreationTime());
+ }
+
+ private Timestamp toTimestamp(Long epochMillis) {
+ if (epochMillis == null) {
+ return null;
+ }
+ return Timestamp.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(epochMillis),
ZoneId.systemDefault()));
+ }
+
+ private List<Partition> listPartitions() {
+ CatalogLoader catalogLoader =
fileStoreTable.catalogEnvironment().catalogLoader();
+ if (TimeTravelUtil.hasTimeTravelOptions(new
Options(fileStoreTable.options()))
+ || catalogLoader == null) {
+ return listPartitionEntries();
+ }
+
+ try (Catalog catalog = catalogLoader.load()) {
+ Identifier baseIdentifier =
fileStoreTable.catalogEnvironment().identifier();
+ if (baseIdentifier == null) {
+ return listPartitionEntries();
+ }
+ String branch = fileStoreTable.coreOptions().branch();
+ Identifier identifier;
+ if (branch != null &&
!branch.equals(CoreOptions.BRANCH.defaultValue())) {
+ identifier =
+ new Identifier(
+ baseIdentifier.getDatabaseName(),
+ baseIdentifier.getTableName(),
+ branch);
+ } else {
+ identifier = baseIdentifier;
+ }
+ return catalog.listPartitions(identifier);
+ } catch (Exception e) {
+ return listPartitionEntries();
+ }
+ }
+
+ private List<Partition> listPartitionEntries() {
+ List<PartitionEntry> partitionEntries =
+ fileStoreTable.newScan().withLevelFilter(level ->
true).listPartitionEntries();
+ RowType partitionType =
fileStoreTable.schema().logicalPartitionType();
+ String defaultPartitionName =
fileStoreTable.coreOptions().partitionDefaultName();
+ String[] partitionColumns =
fileStoreTable.partitionKeys().toArray(new String[0]);
+ InternalRowPartitionComputer computer =
+ new InternalRowPartitionComputer(
+ defaultPartitionName,
+ partitionType,
+ partitionColumns,
+
fileStoreTable.coreOptions().legacyPartitionName());
+ return partitionEntries.stream()
+ .map(entry -> entry.toPartition(computer))
+ .collect(Collectors.toList());
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index f7ede176c2..f4cad6e47d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -2342,17 +2342,7 @@ public class RESTCatalogServer {
if (!tablePartitionsStore.containsKey(identifier.getFullName())) {
if (statistics != null) {
List<Partition> newPartitions =
- statistics.stream()
- .map(
- stats ->
- new Partition(
- stats.spec(),
-
stats.recordCount(),
-
stats.fileSizeInBytes(),
- stats.fileCount(),
-
stats.lastFileCreationTime(),
- false))
- .collect(Collectors.toList());
+
statistics.stream().map(this::toPartition).collect(Collectors.toList());
tablePartitionsStore.put(identifier.getFullName(),
newPartitions);
}
} else {
@@ -2377,7 +2367,7 @@ public class RESTCatalogServer {
partitionStatisticsMap.get(
oldPartition.spec());
if (stats == null) {
- return
oldPartition; // 如果没有新的统计信息,保持原样
+ return
oldPartition;
}
return new Partition(
oldPartition.spec(),
@@ -2392,9 +2382,24 @@ public class RESTCatalogServer {
.lastFileCreationTime(),
stats
.lastFileCreationTime()),
-
oldPartition.done());
+
oldPartition.done(),
+
oldPartition.createdAt(),
+
oldPartition.createdBy(),
+
oldPartition.updatedAt(),
+
oldPartition.updatedBy(),
+
oldPartition.options());
})
.collect(Collectors.toList());
+ Set<Map<String, String>> existingSpecs =
+ oldPartitions.stream()
+ .map(Partition::spec)
+ .collect(Collectors.toSet());
+ List<Partition> newPartitions =
+ statistics.stream()
+ .filter(stats ->
!existingSpecs.contains(stats.spec()))
+ .map(this::toPartition)
+ .collect(Collectors.toList());
+ updatedPartitions.addAll(newPartitions);
return updatedPartitions;
});
}
@@ -2595,6 +2600,21 @@ public class RESTCatalogServer {
400);
}
+ private Partition toPartition(PartitionStatistics stats) {
+ return new Partition(
+ stats.spec(),
+ stats.recordCount(),
+ stats.fileSizeInBytes(),
+ stats.fileCount(),
+ stats.lastFileCreationTime(),
+ false,
+ System.currentTimeMillis(),
+ "created",
+ System.currentTimeMillis(),
+ "updated",
+ new HashMap<>());
+ }
+
public List<Map<String, String>> getReceivedHeaders() {
return receivedHeaders;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index c4ac5a6a57..4694c4f9c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -2625,6 +2625,61 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
new
InternalRowSerializer(partitions.rowType()).toBinaryRow(result.get(0)));
}
+ @Test
+ void testReadPartitionsTable() throws Exception {
+ Identifier identifier = Identifier.create("test_table_db",
"partitions_audit_table");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ catalog.createTable(
+ identifier,
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("f1", DataTypes.INT())
+ .primaryKey("pk")
+ .partitionKeys("f1")
+ .option("bucket", "1")
+ .option("metastore.partitioned-table", "true")
+ .build(),
+ true);
+
+ Table table = catalog.getTable(identifier);
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ Table partitionsTable =
+ catalog.getTable(
+ Identifier.create(
+ identifier.getDatabaseName(),
+ identifier.getObjectName() + "$partitions"));
+ ReadBuilder readBuilder = partitionsTable.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ List<InternalRow> result = new ArrayList<>();
+ try (RecordReader<InternalRow> reader = read.createReader(splits)) {
+ reader.forEachRemaining(result::add);
+ }
+
+ assertThat(result).isNotEmpty();
+ for (InternalRow row : result) {
+ if (!row.isNullAt(5)) { // created_at
+ assertThat(row.getTimestamp(5, 3)).isNotNull();
+ }
+ assertThat(row.isNullAt(6)).isFalse(); // created_by
+ assertThat(row.getString(6).toString()).isEqualTo("created");
+
+ assertThat(row.isNullAt(7)).isFalse(); // updated_by
+ assertThat(row.getString(7).toString()).isEqualTo("updated");
+
+ if (!row.isNullAt(8)) {
+ String optionsJson = row.getString(8).toString();
+ assertThat(optionsJson).isNotEmpty();
+ }
+ }
+ }
+
private TestPagedResponse generateTestPagedResponse(
Map<String, String> queryParams,
List<Integer> testData,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
index 86ab7cb69f..8091288c60 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
@@ -50,9 +50,9 @@ public class PartitionsTableTest extends TableTestBase {
private static final String tableName = "MyTable";
- private FileStoreTable table;
+ protected FileStoreTable table;
- private PartitionsTable partitionsTable;
+ protected PartitionsTable partitionsTable;
@BeforeEach
public void before() throws Exception {
@@ -120,4 +120,51 @@ public class PartitionsTableTest extends TableTestBase {
List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
+
+ @Test
+ void testPartitionAuditFieldsNull() throws Exception {
+ List<InternalRow> result = read(partitionsTable, new int[] {0, 5, 6,
7, 8});
+ assertThat(result).hasSize(3);
+
+ for (InternalRow row : result) {
+ assertThat(row.isNullAt(1)).isTrue(); // created_at
+ assertThat(row.isNullAt(2)).isTrue(); // created_by
+ assertThat(row.isNullAt(3)).isTrue(); // updated_by
+ assertThat(row.isNullAt(4)).isTrue(); // options
+ }
+ }
+
+ @Test
+ void testPartitionWithLegacyPartitionName() throws Exception {
+ String testTableName = "TestLegacyTable";
+ Schema testSchema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option("bucket", "1")
+
.option(CoreOptions.PARTITION_GENERATE_LEGACY_NAME.key(), "false")
+ .build();
+
+ Identifier testTableId = identifier(testTableName);
+ catalog.createTable(testTableId, testSchema, true);
+ FileStoreTable testTable = (FileStoreTable)
catalog.getTable(testTableId);
+
+ write(testTable, GenericRow.of(1, 10, 1), GenericRow.of(2, 20, 2));
+
+ Identifier testPartitionsTableId =
+ identifier(testTableName + SYSTEM_TABLE_SPLITTER +
PartitionsTable.PARTITIONS);
+ PartitionsTable testPartitionsTable =
+ (PartitionsTable) catalog.getTable(testPartitionsTableId);
+
+ List<InternalRow> expectedRow = new ArrayList<>();
+ expectedRow.add(GenericRow.of(BinaryString.fromString("pt=10"), 1L));
+ expectedRow.add(GenericRow.of(BinaryString.fromString("pt=20"), 1L));
+
+ List<InternalRow> result = read(testPartitionsTable, new int[] {0, 1});
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/RestPartitionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/RestPartitionsTableTest.java
new file mode 100644
index 0000000000..3dc54d44e7
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/RestPartitionsTableTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.RESTTestFileIO;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test PartitionsTable with REST catalog. */
+class RestPartitionsTableTest extends PartitionsTableTest {
+
+ private static final String TABLE_NAME = "MyTable";
+
+ private RESTCatalogServer restCatalogServer;
+
+ @BeforeEach
+ @Override
+ public void before() throws Exception {
+ String dataPath = warehouse.toString() + "/RestPartitionsTableTest";
+ String restWarehouse = UUID.randomUUID().toString();
+ String initToken = "init_token";
+ BearTokenAuthProvider authProvider = new
BearTokenAuthProvider(initToken);
+
+ Map<String, String> defaultConf =
+ new HashMap<>(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ CatalogOptions.WAREHOUSE.key(),
+ restWarehouse));
+ ConfigResponse config = new ConfigResponse(defaultConf,
ImmutableMap.of());
+ restCatalogServer = new RESTCatalogServer(dataPath, authProvider,
config, restWarehouse);
+ restCatalogServer.start();
+
+ Options options = new Options();
+ options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+ options.set(RESTCatalogOptions.TOKEN, initToken);
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
+ options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+ options.set(CatalogOptions.METASTORE.key(), "rest");
+ options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, dataPath);
+
+ catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+
+ catalog.createDatabase(database, true);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option("bucket", "1")
+ .option("metastore.partitioned-table", "true")
+ .build();
+
+ Identifier tableId = identifier(TABLE_NAME);
+ catalog.createTable(tableId, schema, true);
+ table = (FileStoreTable) catalog.getTable(tableId);
+
+ Identifier filesTableId =
+ identifier(TABLE_NAME + SYSTEM_TABLE_SPLITTER +
PartitionsTable.PARTITIONS);
+ partitionsTable = (PartitionsTable) catalog.getTable(filesTableId);
+
+ write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 3, 5));
+
+ write(table, GenericRow.of(1, 1, 3), GenericRow.of(1, 2, 4));
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (restCatalogServer != null) {
+ restCatalogServer.shutdown();
+ }
+ }
+
+ @Test
+ @Override
+ void testPartitionAuditFieldsNull() throws Exception {
+ List<InternalRow> result = read(partitionsTable, new int[] {0, 5, 6,
7, 8});
+ assertThat(result).isNotEmpty();
+
+ for (InternalRow row : result) {
+ assertThat(row.isNullAt(1)).isFalse(); // created_at
+ assertThat(row.isNullAt(2)).isFalse(); // created_by
+ assertThat(row.getString(2).toString()).isEqualTo("created");
+ assertThat(row.isNullAt(3)).isFalse(); // updated_by
+ assertThat(row.getString(3).toString()).isEqualTo("updated");
+ if (!row.isNullAt(4)) { // options
+ String optionsJson = row.getString(4).toString();
+ assertThat(optionsJson).isNotEmpty();
+ }
+ }
+ }
+}