This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dfa121bd92 [iceberg] Enhance Iceberg snapshot summary (#6370)
dfa121bd92 is described below
commit dfa121bd92144d97a419feb4951b3d1b67764b31
Author: Jiajia Li <[email protected]>
AuthorDate: Fri Oct 10 10:02:16 2025 +0800
[iceberg] Enhance Iceberg snapshot summary (#6370)
---
.../paimon/iceberg/IcebergCommitCallback.java | 2 +-
.../iceberg/metadata/IcebergSnapshotSummary.java | 37 ++++++++++++++++------
.../action/cdc/format/canal/CanalRecordParser.java | 4 +--
.../format/parquet/reader/ParquetReaderUtil.java | 12 +++----
.../apache/paimon/iceberg/IcebergMetadataTest.java | 34 ++++++++++++++++++++
5 files changed, 70 insertions(+), 19 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index e8f85c662f..ffd99a30bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -348,7 +348,7 @@ public class IcebergCommitCallback implements CommitCallback, TagCallback {
null);
// Tags can only be included in Iceberg if they point to an Iceberg snapshot that
- // exists. Otherwise an Iceberg client fails to parse the metadata and all reads fail.
+ // exists. Otherwise, an Iceberg client fails to parse the metadata and all reads fail.
// Only the latest snapshot ID is added to Iceberg in this code path. Since this snapshot
// has just been committed to Paimon, it is not possible for any Paimon tag to reference it
// yet.
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
index c620c48098..c8f64d4107 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
@@ -19,10 +19,11 @@
package org.apache.paimon.iceberg.metadata;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
/**
@@ -38,22 +39,38 @@ public class IcebergSnapshotSummary {
public static final IcebergSnapshotSummary APPEND = new IcebergSnapshotSummary("append");
public static final IcebergSnapshotSummary OVERWRITE = new IcebergSnapshotSummary("overwrite");
- @JsonProperty(FIELD_OPERATION)
- private final String operation;
+ private final Map<String, String> summary;
@JsonCreator
- public IcebergSnapshotSummary(@JsonProperty(FIELD_OPERATION) String operation) {
- this.operation = operation;
+ public IcebergSnapshotSummary(Map<String, String> summary) {
+ this.summary = summary != null ? new HashMap<>(summary) : new HashMap<>();
+ }
+
+ public IcebergSnapshotSummary(String operation) {
+ this.summary = new HashMap<>();
+ this.summary.put(FIELD_OPERATION, operation);
+ }
+
+ @JsonValue
+ public Map<String, String> getSummary() {
+ return new HashMap<>(summary);
}
- @JsonGetter(FIELD_OPERATION)
public String operation() {
- return operation;
+ return summary.get(FIELD_OPERATION);
+ }
+
+ public String get(String key) {
+ return summary.get(key);
+ }
+
+ public void put(String key, String value) {
+ summary.put(key, value);
}
@Override
public int hashCode() {
- return Objects.hash(operation);
+ return Objects.hash(summary);
}
@Override
@@ -66,6 +83,6 @@ public class IcebergSnapshotSummary {
}
IcebergSnapshotSummary that = (IcebergSnapshotSummary) o;
- return Objects.equals(operation, that.operation);
+ return Objects.equals(summary, that.summary);
}
}
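Note on the IcebergSnapshotSummary change above: the single @JsonProperty("operation") string is replaced by a Map<String, String> exposed through @JsonValue, so the summary can carry extra entries (record and file counts, etc.) next to the operation. Below is a minimal usage sketch of the new API as it appears in this diff; the metric keys are illustrative and simply mirror the assertions in the new IcebergMetadataTest further down, not something this commit guarantees the writer emits.

    // Sketch only: exercises the constructors and accessors added in this commit.
    IcebergSnapshotSummary summary = new IcebergSnapshotSummary("append");
    summary.put("total-records", "10");      // illustrative keys, mirroring the
    summary.put("added-data-files", "1");    // assertions in the test below

    String op = summary.operation();         // still resolves "operation" -> "append"

    // Because getSummary() is annotated with @JsonValue, Jackson serializes the
    // object as a flat JSON map, e.g.
    // {"operation":"append","total-records":"10","added-data-files":"1"},
    // and the @JsonCreator map constructor reads the same shape back.
    Map<String, String> asMap = summary.getSummary();
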
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 5864396564..5942ad0276 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -169,8 +169,8 @@ public class CanalRecordParser extends AbstractJsonRecordParser {
typeInfo.f0, typeInfo.f1, typeInfo.f2, typeMapping);
schemaBuilder.column(originalName, paimonDataType);
- String filedValue = Objects.toString(recordMap.get(originalName), null);
- String newValue = transformValue(filedValue, typeInfo.f0, originalType);
+ String fieldValue = Objects.toString(recordMap.get(originalName), null);
+ String newValue = transformValue(fieldValue, typeInfo.f0, originalType);
rowData.put(originalName, newValue);
}
} else {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
index f686698034..e2ebb47b90 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
@@ -168,17 +168,17 @@ public class ParquetReaderUtil {
int repetitionLevel = columnIO.getRepetitionLevel();
int definitionLevel = columnIO.getDefinitionLevel();
DataType type = dataField.type();
- String filedName = dataField.name();
+ String fieldName = dataField.name();
if (type instanceof RowType) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
RowType rowType = (RowType) type;
ImmutableList.Builder<ParquetField> fieldsBuilder = ImmutableList.builder();
List<String> fieldNames = rowType.getFieldNames();
- List<DataField> childrens = rowType.getFields();
- for (int i = 0; i < childrens.size(); i++) {
+ List<DataField> children = rowType.getFields();
+ for (int i = 0; i < children.size(); i++) {
fieldsBuilder.add(
constructField(
- childrens.get(i),
+ children.get(i),
lookupColumnByName(groupColumnIO, fieldNames.get(i))));
}
@@ -281,8 +281,8 @@ public class ParquetReaderUtil {
ColumnIO elementTypeColumnIO;
if (columnIO instanceof GroupColumnIO) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
- if (!StringUtils.isNullOrWhitespaceOnly(filedName)) {
- while (!Objects.equals(groupColumnIO.getName(), filedName)) {
+ if (!StringUtils.isNullOrWhitespaceOnly(fieldName)) {
+ while (!Objects.equals(groupColumnIO.getName(), fieldName)) {
groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
}
elementTypeColumnIO = groupColumnIO;
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
index 3fc1aad2cd..91a1b5acf2 100644
--- a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
@@ -483,6 +483,40 @@ class IcebergMetadataTest {
assertThat(paimonIcebergMetadata2.snapshots().get(1).sequenceNumber()).isEqualTo(2L);
}
+ @Test
+ @DisplayName("Test reading metadata with snapshot summary")
+ void testReadMetadataWithSnapshotSummary() throws Exception {
+ // Create a basic Iceberg table
+ Table icebergTable = createBasicIcebergTable("snapshot_summary_table");
+
+ // Perform append operation
+ icebergTable
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/append-data.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(10)
+ .build())
+ .commit();
+
+ // Read metadata after append operation
+ IcebergMetadata paimonIcebergMetadata = readIcebergMetadata("snapshot_summary_table");
+
+ // Verify snapshots
+ assertThat(paimonIcebergMetadata.snapshots()).hasSize(1);
+ IcebergSnapshot snapshot = paimonIcebergMetadata.snapshots().get(0);
+ assertThat(snapshot.snapshotId()).isNotNull();
+ assertThat(snapshot.parentSnapshotId()).isNull();
+
+ // Verify snapshot summary contains operation information
+ assertThat(snapshot.summary().operation()).isEqualTo("append");
+ assertThat(snapshot.summary().getSummary().get("operation")).isEqualTo("append");
+ assertThat(snapshot.summary().getSummary().get("total-records")).isEqualTo("10");
+ assertThat(snapshot.summary().getSummary().get("added-data-files")).isEqualTo("1");
+ assertThat(snapshot.summary().getSummary().get("added-files-size")).isEqualTo("100");
+ }
+
/** Helper method to create a basic Iceberg table with simple schema. */
private Table createBasicIcebergTable(String tableName) {
TableIdentifier identifier = TableIdentifier.of("testdb", tableName);