This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fdb4a934a0 [core] PartitionStatistics support totalBuckets (#6964)
fdb4a934a0 is described below
commit fdb4a934a007982c95019b315d1dc73d7eb34a27
Author: jianguotian <[email protected]>
AuthorDate: Wed Jan 14 18:07:35 2026 +0800
[core] PartitionStatistics support totalBuckets (#6964)
---
docs/static/rest-catalog-open-api.yaml | 6 +++
.../org/apache/paimon/partition/Partition.java | 7 +++-
.../paimon/partition/PartitionStatistics.java | 21 +++++++++-
.../paimon/partition/PartitionStatisticsTest.java | 44 ++++++++++++++++++++
.../org/apache/paimon/partition/PartitionTest.java | 4 ++
.../java/org/apache/paimon/catalog/Catalog.java | 1 +
.../org/apache/paimon/manifest/PartitionEntry.java | 43 +++++++++++++++----
.../paimon/table/format/FormatTableScan.java | 2 +-
.../paimon/table/system/PartitionsTable.java | 3 +-
.../paimon/utils/PartitionStatisticsReporter.java | 10 ++++-
.../paimon/partition/PartitionExpireTableTest.java | 2 +-
.../org/apache/paimon/rest/MockRESTMessage.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 2 +
.../utils/PartitionStatisticsReporterTest.java | 2 +-
.../org/apache/paimon/flink/RESTCatalogITCase.java | 48 ++++++++++++++++++++++
.../java/org/apache/paimon/hive/HiveCatalog.java | 6 +++
.../org/apache/paimon/hive/HiveCatalogTest.java | 3 +-
paimon-python/pypaimon/snapshot/snapshot_commit.py | 7 +++-
paimon-python/pypaimon/write/file_store_commit.py | 6 ++-
19 files changed, 197 insertions(+), 22 deletions(-)
diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml
index 88e7396514..22aa310ed3 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -3257,6 +3257,9 @@ components:
lastFileCreationTime:
type: integer
format: int64
+ totalBuckets:
+ type: integer
+ format: int32
done:
type: boolean
createdAt:
@@ -3290,6 +3293,9 @@ components:
lastFileCreationTime:
type: integer
format: int64
+ totalBuckets:
+ type: integer
+ format: int32
#######################################
# Examples of different values #
#######################################
diff --git
a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
index 427b61464f..c9fa4e0997 100644
--- a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
+++ b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java
@@ -81,13 +81,14 @@ public class Partition extends PartitionStatistics {
@JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
@JsonProperty(FIELD_FILE_COUNT) long fileCount,
@JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long
lastFileCreationTime,
+ @JsonProperty(FIELD_TOTAL_BUCKETS) int totalBuckets,
@JsonProperty(FIELD_DONE) boolean done,
@JsonProperty(FIELD_CREATED_AT) @Nullable Long createdAt,
@JsonProperty(FIELD_CREATED_BY) @Nullable String createdBy,
@JsonProperty(FIELD_UPDATED_AT) @Nullable Long updatedAt,
@JsonProperty(FIELD_UPDATED_BY) @Nullable String updatedBy,
@JsonProperty(FIELD_OPTIONS) @Nullable Map<String, String>
options) {
- super(spec, recordCount, fileSizeInBytes, fileCount,
lastFileCreationTime);
+ super(spec, recordCount, fileSizeInBytes, fileCount,
lastFileCreationTime, totalBuckets);
this.done = done;
this.createdAt = createdAt;
this.createdBy = createdBy;
@@ -102,6 +103,7 @@ public class Partition extends PartitionStatistics {
long fileSizeInBytes,
long fileCount,
long lastFileCreationTime,
+ int totalBuckets,
boolean done) {
this(
spec,
@@ -109,6 +111,7 @@ public class Partition extends PartitionStatistics {
fileSizeInBytes,
fileCount,
lastFileCreationTime,
+ totalBuckets,
done,
null,
null,
@@ -188,6 +191,8 @@ public class Partition extends PartitionStatistics {
+ fileCount
+ ", lastFileCreationTime="
+ lastFileCreationTime
+ + ", totalBuckets="
+ + totalBuckets
+ ", done="
+ done
+ ", createdAt="
diff --git
a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java
b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java
index 44b61c19d5..ab87f02ed6 100644
---
a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java
+++
b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java
@@ -44,6 +44,7 @@ public class PartitionStatistics implements Serializable {
public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
public static final String FIELD_FILE_COUNT = "fileCount";
public static final String FIELD_LAST_FILE_CREATION_TIME =
"lastFileCreationTime";
+ public static final String FIELD_TOTAL_BUCKETS = "totalBuckets";
@JsonProperty(FIELD_SPEC)
protected final Map<String, String> spec;
@@ -60,18 +61,25 @@ public class PartitionStatistics implements Serializable {
@JsonProperty(FIELD_LAST_FILE_CREATION_TIME)
protected final long lastFileCreationTime;
+ // defaults to 0 if this field is absent in the serialized data (e.g., from an older Paimon
+ // version)
+ @JsonProperty(FIELD_TOTAL_BUCKETS)
+ protected final int totalBuckets;
+
@JsonCreator
public PartitionStatistics(
@JsonProperty(FIELD_SPEC) Map<String, String> spec,
@JsonProperty(FIELD_RECORD_COUNT) long recordCount,
@JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
@JsonProperty(FIELD_FILE_COUNT) long fileCount,
- @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long
lastFileCreationTime) {
+ @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long
lastFileCreationTime,
+ @JsonProperty(FIELD_TOTAL_BUCKETS) int totalBuckets) {
this.spec = spec;
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
this.fileCount = fileCount;
this.lastFileCreationTime = lastFileCreationTime;
+ this.totalBuckets = totalBuckets;
}
@JsonGetter(FIELD_SPEC)
@@ -99,6 +107,11 @@ public class PartitionStatistics implements Serializable {
return lastFileCreationTime;
}
+ @JsonGetter(FIELD_TOTAL_BUCKETS)
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -112,12 +125,14 @@ public class PartitionStatistics implements Serializable {
&& fileSizeInBytes == that.fileSizeInBytes
&& fileCount == that.fileCount
&& lastFileCreationTime == that.lastFileCreationTime
+ && totalBuckets == that.totalBuckets
&& Objects.equals(spec, that.spec);
}
@Override
public int hashCode() {
- return Objects.hash(spec, recordCount, fileSizeInBytes, fileCount,
lastFileCreationTime);
+ return Objects.hash(
+ spec, recordCount, fileSizeInBytes, fileCount,
lastFileCreationTime, totalBuckets);
}
@Override
@@ -133,6 +148,8 @@ public class PartitionStatistics implements Serializable {
+ fileCount
+ ", lastFileCreationTime="
+ lastFileCreationTime
+ + ", totalBuckets="
+ + totalBuckets
+ '}';
}
}
diff --git
a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java
b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java
new file mode 100644
index 0000000000..d9fd8e8bb1
--- /dev/null
+++
b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PartitionStatistics}. */
+public class PartitionStatisticsTest {
+
+ @Test
+ void testLegacyPartitionStatisticsDeserialization() {
+ String legacyPartitionStatisticsJson =
+ "{\"spec\":{\"pt\":\"1\"},\"recordCount\":100,\"fileSizeInBytes\":1024,\"fileCount\":2,\"lastFileCreationTime\":123456789}";
+ PartitionStatistics stats =
+ JsonSerdeUtil.fromJson(legacyPartitionStatisticsJson,
PartitionStatistics.class);
+
+ assertThat(stats.spec()).containsEntry("pt", "1");
+ assertThat(stats.recordCount()).isEqualTo(100);
+ assertThat(stats.fileSizeInBytes()).isEqualTo(1024);
+ assertThat(stats.fileCount()).isEqualTo(2);
+ assertThat(stats.lastFileCreationTime()).isEqualTo(123456789L);
+ assertThat(stats.totalBuckets()).isEqualTo(0);
+ }
+}
diff --git
a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
index 7ec342d38d..b589a75cf1 100644
--- a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
+++ b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java
@@ -40,6 +40,7 @@ class PartitionTest {
1024L, // fileSizeInBytes
2L, // fileCount
System.currentTimeMillis(), // lastFileCreationTime
+ 10, // totalBuckets
false, // done
null, // createdAt
null, // createdBy
@@ -57,6 +58,7 @@ class PartitionTest {
assertThat(json).contains("done");
assertThat(json).contains("recordCount");
+ assertThat(json).contains("totalBuckets");
}
@Test
@@ -69,6 +71,7 @@ class PartitionTest {
1024L,
2L,
System.currentTimeMillis(),
+ 10, // totalBuckets
true,
1234567890L, // createdAt
"user1", // createdBy
@@ -78,6 +81,7 @@ class PartitionTest {
String json = JsonSerdeUtil.toFlatJson(partition);
+ assertThat(json).contains("totalBuckets");
assertThat(json).contains("createdAt");
assertThat(json).contains("createdBy");
assertThat(json).contains("updatedAt");
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index f4e7913c24..175a74cf75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -1074,6 +1074,7 @@ public interface Catalog extends AutoCloseable {
String NUM_FILES_PROP = "numFiles";
String TOTAL_SIZE_PROP = "totalSize";
String LAST_UPDATE_TIME_PROP = "lastUpdateTime";
+ String TOTAL_BUCKETS = "totalBuckets";
// ======================= Exceptions ===============================
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index d3417bb71c..aaee93602a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import static org.apache.paimon.manifest.FileKind.ADD;
import static org.apache.paimon.manifest.FileKind.DELETE;
@@ -43,18 +44,21 @@ public class PartitionEntry {
private final long fileSizeInBytes;
private final long fileCount;
private final long lastFileCreationTime;
+ private final int totalBuckets;
public PartitionEntry(
BinaryRow partition,
long recordCount,
long fileSizeInBytes,
long fileCount,
- long lastFileCreationTime) {
+ long lastFileCreationTime,
+ int totalBuckets) {
this.partition = partition;
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
this.fileCount = fileCount;
this.lastFileCreationTime = lastFileCreationTime;
+ this.totalBuckets = totalBuckets;
}
public BinaryRow partition() {
@@ -77,13 +81,18 @@ public class PartitionEntry {
return lastFileCreationTime;
}
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
public PartitionEntry merge(PartitionEntry entry) {
return new PartitionEntry(
partition,
recordCount + entry.recordCount,
fileSizeInBytes + entry.fileSizeInBytes,
fileCount + entry.fileCount,
- Math.max(lastFileCreationTime, entry.lastFileCreationTime));
+ Math.max(lastFileCreationTime, entry.lastFileCreationTime),
+ entry.totalBuckets);
}
public Partition toPartition(InternalRowPartitionComputer computer) {
@@ -93,6 +102,7 @@ public class PartitionEntry {
fileSizeInBytes,
fileCount,
lastFileCreationTime,
+ totalBuckets,
false);
}
@@ -102,15 +112,16 @@ public class PartitionEntry {
recordCount,
fileSizeInBytes,
fileCount,
- lastFileCreationTime);
+ lastFileCreationTime,
+ totalBuckets);
}
public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
- return fromDataFile(entry.partition(), entry.kind(), entry.file());
+ return fromDataFile(entry.partition(), entry.kind(), entry.file(),
entry.totalBuckets());
}
public static PartitionEntry fromDataFile(
- BinaryRow partition, FileKind kind, DataFileMeta file) {
+ BinaryRow partition, FileKind kind, DataFileMeta file, int
totalBuckets) {
long recordCount = file.rowCount();
long fileSizeInBytes = file.fileSize();
long fileCount = 1;
@@ -120,7 +131,12 @@ public class PartitionEntry {
fileCount = -fileCount;
}
return new PartitionEntry(
- partition, recordCount, fileSizeInBytes, fileCount,
file.creationTimeEpochMillis());
+ partition,
+ recordCount,
+ fileSizeInBytes,
+ fileCount,
+ file.creationTimeEpochMillis(),
+ totalBuckets);
}
public static Collection<PartitionEntry> merge(Collection<ManifestEntry>
fileEntries) {
@@ -139,7 +155,12 @@ public class PartitionEntry {
for (DataSplit split : splits) {
BinaryRow partition = split.partition();
for (DataFileMeta file : split.dataFiles()) {
- PartitionEntry partitionEntry = fromDataFile(partition, ADD,
file);
+ PartitionEntry partitionEntry =
+ fromDataFile(
+ partition,
+ ADD,
+ file,
+
Optional.ofNullable(split.totalBuckets()).orElse(0));
partitions.compute(
partition,
(part, old) -> old == null ? partitionEntry :
old.merge(partitionEntry));
@@ -170,12 +191,18 @@ public class PartitionEntry {
&& fileSizeInBytes == that.fileSizeInBytes
&& fileCount == that.fileCount
&& lastFileCreationTime == that.lastFileCreationTime
+ && totalBuckets == that.totalBuckets
&& Objects.equals(partition, that.partition);
}
@Override
public int hashCode() {
return Objects.hash(
- partition, recordCount, fileSizeInBytes, fileCount,
lastFileCreationTime);
+ partition,
+ recordCount,
+ fileSizeInBytes,
+ fileCount,
+ lastFileCreationTime,
+ totalBuckets);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 932f8e07c0..9a2461d592 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -109,7 +109,7 @@ public class FormatTableScan implements InnerTableScan {
List<PartitionEntry> partitionEntries = new ArrayList<>();
for (Pair<LinkedHashMap<String, String>, Path> partition2Path :
partition2Paths) {
BinaryRow row = toPartitionRow(partition2Path.getKey());
- partitionEntries.add(new PartitionEntry(row, -1L, -1L, -1L, -1L));
+ partitionEntries.add(new PartitionEntry(row, -1L, -1L, -1L, -1L,
-1));
}
return partitionEntries;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index d3314edfa3..22d2c85283 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -301,7 +301,8 @@ public class PartitionsTable implements ReadonlyTable {
partition.recordCount(),
partition.fileSizeInBytes(),
partition.fileCount(),
- partition.lastFileCreationTime());
+ partition.lastFileCreationTime(),
+ partition.totalBuckets());
}
private Timestamp toTimestamp(Long epochMillis) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
index c27e1768fd..da8d47ba8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Optional;
import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
@@ -73,6 +74,7 @@ public class PartitionStatisticsReporter implements Closeable {
long rowCount = 0;
long totalSize = 0;
long fileCount = 0;
+ int totalBuckets = 0;
for (DataSplit split : splits) {
List<DataFileMeta> fileMetas = split.dataFiles();
fileCount += fileMetas.size();
@@ -80,11 +82,17 @@ public class PartitionStatisticsReporter implements Closeable {
rowCount += fileMeta.rowCount();
totalSize += fileMeta.fileSize();
}
+ totalBuckets =
Optional.ofNullable(split.totalBuckets()).orElse(0);
}
PartitionStatistics partitionStats =
new PartitionStatistics(
- partitionSpec, rowCount, totalSize, fileCount,
modifyTimeMillis);
+ partitionSpec,
+ rowCount,
+ totalSize,
+ fileCount,
+ modifyTimeMillis,
+ totalBuckets);
LOG.info("alter partition {} with statistic {}.", partitionSpec,
partitionStats);
partitionHandler.alterPartitions(Collections.singletonList(partitionStats));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
index b78506a825..47ba63d9fd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
@@ -51,7 +51,7 @@ class PartitionExpireTableTest extends TableTestBase {
Table table = catalog.getTable(identifier());
String path = table.options().get("path");
- PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1),
1, 1, 1, 1);
+ PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1),
1, 1, 1, 1, 1);
TABLE_EXPIRE_PARTITIONS.put(path, Collections.singletonList(expire));
write(table, GenericRow.of(1, 1));
write(table, GenericRow.of(2, 2));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index d202d41ea1..f75e43dec2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -156,7 +156,7 @@ public class MockRESTMessage {
public static ListPartitionsResponse listPartitionsResponse() {
Map<String, String> spec = new HashMap<>();
spec.put("f0", "1");
- Partition partition = new Partition(spec, 1, 1, 1, 1, false);
+ Partition partition = new Partition(spec, 1, 1, 1, 1, 1, false);
return new ListPartitionsResponse(ImmutableList.of(partition));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index ff4d932501..8716d4ea7a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -2415,6 +2415,7 @@ public class RESTCatalogServer {
.lastFileCreationTime(),
stats
.lastFileCreationTime()),
+
stats.totalBuckets(),
oldPartition.done(),
oldPartition.createdAt(),
oldPartition.createdBy(),
@@ -2640,6 +2641,7 @@ public class RESTCatalogServer {
stats.fileSizeInBytes(),
stats.fileCount(),
stats.lastFileCreationTime(),
+ stats.totalBuckets(),
false,
System.currentTimeMillis(),
"created",
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index 5ffb50e95f..7517fac6f1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -135,7 +135,7 @@ public class PartitionStatisticsReporterTest {
assertThat(partitionParams).containsKey("c1=a/");
assertThat(partitionParams.get("c1=a/").toString())
.isEqualTo(
- "{spec={c1=a}, recordCount=2, fileSizeInBytes=705,
fileCount=1, lastFileCreationTime=1729598544974}");
+ "{spec={c1=a}, recordCount=2, fileSizeInBytes=705,
fileCount=1, lastFileCreationTime=1729598544974, totalBuckets=-1}");
action.close();
assertThat(closed).isTrue();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index d79aa713c9..034816d6a0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.RESTToken;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -151,4 +152,51 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
sql(String.format("DROP FUNCTION %s.%s", DATABASE_NAME, functionName));
assertThat(catalog.functionExists(functionObjectPath)).isFalse();
}
+
+ @Test
+ public void testTotalBucketsStatistics() throws Exception {
+ String fixedBucketTableName = "fixed_bucket_tbl";
+ batchSql(
+ String.format(
+ "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED
BY (p) WITH ('bucket'='2', 'bucket-key'='a')",
+ DATABASE_NAME, fixedBucketTableName));
+ batchSql(
+ String.format(
+ "INSERT INTO %s.%s VALUES (1, 10, 1), (2, 20, 1)",
+ DATABASE_NAME, fixedBucketTableName));
+ validateTotalBuckets(DATABASE_NAME, fixedBucketTableName, 2);
+
+ String dynamicBucketTableName = "dynamic_bucket_tbl";
+ sql(
+ String.format(
+ "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED
BY (p) WITH ('bucket'='-1')",
+ DATABASE_NAME, dynamicBucketTableName));
+ sql(
+ String.format(
+ "INSERT INTO %s.dynamic_bucket_tbl VALUES (1, 10, 1),
(2, 20, 1)",
+ DATABASE_NAME));
+ validateTotalBuckets(DATABASE_NAME, "dynamic_bucket_tbl", -1);
+
+ String postponeBucketTableName = "postpone_bucket_tbl";
+ batchSql(
+ String.format(
+ "CREATE TABLE %s.%s (a INT, b INT, p INT, PRIMARY KEY
(p, a) NOT ENFORCED) PARTITIONED BY (p) WITH ('bucket'='-2')",
+ DATABASE_NAME, postponeBucketTableName));
+ batchSql(
+ String.format(
+ "INSERT INTO %s.%s VALUES (1, 10, 1), (2, 20, 1)",
+ DATABASE_NAME, postponeBucketTableName));
+ validateTotalBuckets(DATABASE_NAME, postponeBucketTableName, 1);
+ }
+
+ private void validateTotalBuckets(
+ String databaseName, String tableName, Integer
expectedTotalBuckets) throws Exception {
+ Catalog flinkCatalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ try (org.apache.paimon.catalog.Catalog catalog = ((FlinkCatalog)
flinkCatalog).catalog()) {
+ List<Partition> partitions =
+ catalog.listPartitions(Identifier.create(databaseName,
tableName));
+ assertThat(partitions).isNotEmpty();
+
assertThat(partitions.get(0).totalBuckets()).isEqualTo(expectedTotalBuckets);
+ }
+ }
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 27e8c067dd..1eca498714 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -488,6 +488,8 @@ public class HiveCatalog extends AbstractCatalog {
String modifyTimeSeconds =
String.valueOf(partition.lastFileCreationTime() / 1000);
statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+ statistic.put(TOTAL_BUCKETS,
String.valueOf(partition.totalBuckets()));
+
// just for being compatible with hive metastore
statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
@@ -567,6 +569,9 @@ public class HiveCatalog extends AbstractCatalog {
parameters.getOrDefault(
LAST_UPDATE_TIME_PROP,
System.currentTimeMillis() + ""));
+ int totalBuckets =
+ Integer.parseInt(
+
parameters.getOrDefault(TOTAL_BUCKETS, "0"));
return new
org.apache.paimon.partition.Partition(
Collections.singletonMap(
tagToPartitionField,
part.getValues().get(0)),
@@ -574,6 +579,7 @@ public class HiveCatalog extends AbstractCatalog {
fileSizeInBytes,
fileCount,
lastFileCreationTime,
+ totalBuckets,
false);
})
.collect(Collectors.toList());
diff --git
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index 5e19606716..de04b0c838 100644
---
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -530,7 +530,7 @@ public class HiveCatalogTest extends CatalogTestBase {
long fileCreationTime = System.currentTimeMillis();
PartitionStatistics partition =
new PartitionStatistics(
- Collections.singletonMap("dt", "20250101"), 1, 2, 3,
fileCreationTime);
+ Collections.singletonMap("dt", "20250101"), 1, 2, 3,
fileCreationTime, 4);
catalog.alterPartitions(alterIdentifier,
Collections.singletonList(partition));
Partition partitionFromServer =
catalog.listPartitions(alterIdentifier).get(0);
checkPartition(
@@ -540,6 +540,7 @@ public class HiveCatalogTest extends CatalogTestBase {
2,
3,
fileCreationTime,
+ 4,
false),
partitionFromServer);
diff --git a/paimon-python/pypaimon/snapshot/snapshot_commit.py
b/paimon-python/pypaimon/snapshot/snapshot_commit.py
index 50727b6ce3..156036165a 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_commit.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_commit.py
@@ -39,11 +39,12 @@ class PartitionStatistics:
file_size_in_bytes: int = json_field("fileSizeInBytes", default=0)
file_count: int = json_field("fileCount", default=0)
last_file_creation_time: int = json_field("lastFileCreationTime",
default_factory=lambda: int(time.time() * 1000))
+ total_buckets: int = json_field("totalBuckets", default=0)
@classmethod
def create(cls, partition_spec: Dict[str, str] = None, record_count: int =
0,
file_count: int = 0, file_size_in_bytes: int = 0,
- last_file_creation_time: int = None) -> 'PartitionStatistics':
+ last_file_creation_time: int = None, total_buckets: int = 0) ->
'PartitionStatistics':
"""
Factory method to create PartitionStatistics with backward compatibility.
@@ -53,6 +54,7 @@ class PartitionStatistics:
file_count: Number of files
file_size_in_bytes: Total file size in bytes
last_file_creation_time: Last file creation time in milliseconds
+ total_buckets: Total number of buckets in the partition
Returns:
PartitionStatistics instance
@@ -62,7 +64,8 @@ class PartitionStatistics:
record_count=record_count,
file_count=file_count,
file_size_in_bytes=file_size_in_bytes,
- last_file_creation_time=last_file_creation_time or int(time.time()
* 1000)
+ last_file_creation_time=last_file_creation_time or int(time.time()
* 1000),
+ total_buckets=total_buckets
)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 6c65c5173a..da27c499e8 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -450,7 +450,8 @@ class FileStoreCommit:
'record_count': 0,
'file_count': 0,
'file_size_in_bytes': 0,
- 'last_file_creation_time': 0
+ 'last_file_creation_time': 0,
+ 'total_buckets': entry.total_buckets
}
# Following Java implementation: PartitionEntry.fromDataFile()
@@ -485,7 +486,8 @@ class FileStoreCommit:
record_count=stats['record_count'],
file_count=stats['file_count'],
file_size_in_bytes=stats['file_size_in_bytes'],
- last_file_creation_time=stats['last_file_creation_time']
+ last_file_creation_time=stats['last_file_creation_time'],
+ total_buckets=stats['total_buckets']
)
for stats in partition_stats.values()
]