This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 7a1c5b1a [755] Support column stats for paimon (#767)
7a1c5b1a is described below
commit 7a1c5b1a1b1c834ff088465b08333f289b1d7cfa
Author: Mao <[email protected]>
AuthorDate: Tue Jan 20 05:54:05 2026 +1100
[755] Support column stats for paimon (#767)
* paimon: wip column stats
* paimon column stats: fix tests
* paimon: fixing some tests
* paimon: fixing more tests
* paimon: fix column stats column checks
* paimon: stats: handling timestamp as long
* paimon: add to xtable-utilities
* paimon: lint
* paimon: ticked version to 1.3.1
* paimon: reducing logging
* paimon: annotating assumptions with comments
* paimon: expanded tests for stats extractor
* paimon: stats extractor to its own class
* paimon: handling deleted stats
* paimon: spotless
* paimon: fix tests
* paimon: tests remove wildcard imports
* paimon: assumptions validated
* paimon: code review changes
* paimon: added test case for lack of stats for complex fields
* paimon: extended tests and updated test TODO with github issue reference
* paimon: fix tests
* pin catalyst version
* paimon: fix tests in xtable-utilities
---------
Co-authored-by: Timothy Brown <[email protected]>
---
pom.xml | 8 +-
.../xtable/paimon/PaimonDataFileExtractor.java | 13 +-
.../apache/xtable/paimon/PaimonStatsExtractor.java | 187 +++++++
.../java/org/apache/xtable/TestPaimonTable.java | 64 ++-
.../xtable/paimon/TestPaimonConversionSource.java | 10 +-
.../xtable/paimon/TestPaimonDataFileExtractor.java | 85 ++--
.../xtable/paimon/TestPaimonStatsExtractor.java | 539 +++++++++++++++++++++
.../main/resources/xtable-conversion-defaults.yaml | 4 +-
.../org/apache/xtable/utilities/TestRunSync.java | 9 +-
9 files changed, 836 insertions(+), 83 deletions(-)
diff --git a/pom.xml b/pom.xml
index 855894ec..92845db6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
<spark.version.prefix>3.4</spark.version.prefix>
<iceberg.version>1.4.2</iceberg.version>
<delta.version>2.4.0</delta.version>
- <paimon.version>1.2.0</paimon.version>
+ <paimon.version>1.3.1</paimon.version>
<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
<apache.rat.version>0.16.1</apache.rat.version>
@@ -374,6 +374,12 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
diff --git
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
index 4555b0cf..e452c85b 100644
---
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -19,7 +19,6 @@
package org.apache.xtable.paimon;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -29,7 +28,6 @@ import java.util.Set;
import lombok.extern.log4j.Log4j2;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -39,7 +37,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
@@ -49,6 +46,8 @@ public class PaimonDataFileExtractor {
private final PaimonPartitionExtractor partitionExtractor =
PaimonPartitionExtractor.getInstance();
+ private final PaimonStatsExtractor statsExtractor =
PaimonStatsExtractor.getInstance();
+
private static final PaimonDataFileExtractor INSTANCE = new
PaimonDataFileExtractor();
public static PaimonDataFileExtractor getInstance() {
@@ -84,7 +83,7 @@ public class PaimonDataFileExtractor {
.recordCount(entry.file().rowCount())
.partitionValues(
partitionExtractor.toPartitionValues(table, entry.partition(),
internalSchema))
- .columnStats(toColumnStats(entry.file()))
+ .columnStats(statsExtractor.extractColumnStats(entry.file(),
internalSchema))
.build();
}
@@ -101,12 +100,6 @@ public class PaimonDataFileExtractor {
}
}
- private List<ColumnStat> toColumnStats(DataFileMeta file) {
- // TODO: Implement logic to extract column stats from the file meta
- // https://github.com/apache/incubator-xtable/issues/755
- return Collections.emptyList();
- }
-
/**
* Extracts file changes (added and removed files) from delta manifests for
a given snapshot. This
* method reads only the delta manifests which contain the changes
introduced in this specific
diff --git
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonStatsExtractor.java
new file mode 100644
index 00000000..092061db
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonStatsExtractor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.types.TimestampType;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonStatsExtractor {
+ private static final PaimonStatsExtractor INSTANCE = new
PaimonStatsExtractor();
+
+ public static PaimonStatsExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public List<ColumnStat> extractColumnStats(DataFileMeta file, InternalSchema
internalSchema) {
+ List<ColumnStat> columnStats = new ArrayList<>();
+ Map<String, InternalField> fieldMap =
+ internalSchema.getAllFields().stream()
+ .collect(Collectors.toMap(InternalField::getPath, f -> f));
+
+ // stats for all columns are present in valueStats, we can safely ignore
file.keyStats()
+ SimpleStats valueStats = file.valueStats();
+ if (valueStats != null) {
+ List<String> colNames = file.valueStatsCols();
+ if (colNames == null) {
+ // if column names are not present, then stats are being collected for
all columns
+ colNames =
+ internalSchema.getAllFields().stream()
+ .map(InternalField::getPath)
+ .collect(Collectors.toList());
+ }
+
+ if (colNames.size() != valueStats.minValues().getFieldCount()) {
+ // paranoia check - this should never happen, but if the code reaches
here, then there is a
+ // bug! Please file a bug report
+ throw new ReadException(
+ String.format(
+ "Mismatch between column stats names and values arity:
names=%d, values=%d",
+ colNames.size(), valueStats.minValues().getFieldCount()));
+ }
+
+ collectColumnStats(columnStats, valueStats, colNames, fieldMap,
file.rowCount());
+ }
+
+ return columnStats;
+ }
+
+ private void collectColumnStats(
+ List<ColumnStat> columnStats,
+ SimpleStats stats,
+ List<String> colNames,
+ Map<String, InternalField> fieldMap,
+ long rowCount) {
+
+ BinaryRow minValues = stats.minValues();
+ BinaryRow maxValues = stats.maxValues();
+ BinaryArray nullCounts = stats.nullCounts();
+
+ for (int i = 0; i < colNames.size(); i++) {
+ String colName = colNames.get(i);
+ InternalField field = fieldMap.get(colName);
+ if (field == null) {
+ continue;
+ }
+
+ // Check if we already have stats for this field
+ boolean alreadyExists =
+ columnStats.stream().anyMatch(cs ->
cs.getField().getPath().equals(colName));
+ if (alreadyExists) {
+ continue;
+ }
+
+ InternalType type = field.getSchema().getDataType();
+ Object min = getValue(minValues, i, type, field.getSchema());
+ Object max = getValue(maxValues, i, type, field.getSchema());
+ long nullCount = (nullCounts != null && i < nullCounts.size()) ?
nullCounts.getLong(i) : 0L;
+
+ columnStats.add(
+ ColumnStat.builder()
+ .field(field)
+ .range(min != null && max != null ? Range.vector(min, max) :
null)
+ .numNulls(nullCount)
+ .numValues(rowCount)
+ .build());
+ }
+ }
+
+ private Object getValue(BinaryRow row, int index, InternalType type,
InternalSchema fieldSchema) {
+ if (row.isNullAt(index)) {
+ return null;
+ }
+ switch (type) {
+ case BOOLEAN:
+ return row.getBoolean(index);
+ case INT:
+ case DATE:
+ return row.getInt(index);
+ case LONG:
+ return row.getLong(index);
+ case TIMESTAMP:
+ case TIMESTAMP_NTZ:
+ int tsPrecision;
+ InternalSchema.MetadataValue tsPrecisionEnum =
+ (InternalSchema.MetadataValue)
+
fieldSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION);
+ if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
+ tsPrecision = 3;
+ } else if (tsPrecisionEnum == InternalSchema.MetadataValue.MICROS) {
+ tsPrecision = 6;
+ } else if (tsPrecisionEnum == InternalSchema.MetadataValue.NANOS) {
+ tsPrecision = 9;
+ } else {
+ log.warn(
+ "Field idx={}, name={} does not have
MetadataKey.TIMESTAMP_PRECISION set, defaulting to default precision",
+ index,
+ fieldSchema.getName());
+ tsPrecision = TimestampType.DEFAULT_PRECISION;
+ }
+ Timestamp ts = row.getTimestamp(index, tsPrecision);
+
+ // according to docs for org.apache.xtable.model.stat.Range, timestamp
is stored as millis
+ // or micros - even if precision is higher than micros, return micros
+ if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
+ return ts.getMillisecond();
+ } else {
+ return ts.toMicros();
+ }
+ case FLOAT:
+ return row.getFloat(index);
+ case DOUBLE:
+ return row.getDouble(index);
+ case STRING:
+ case ENUM:
+ return row.getString(index).toString();
+ case DECIMAL:
+ int precision =
+ (int)
fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+ int scale = (int)
fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+ return row.getDecimal(index, precision, scale).toBigDecimal();
+ default:
+ log.warn(
+ "Handling of {}-type stats for column idx={}, name={} is not yet
implemented, skipping stats for this column",
+ type,
+ index,
+ fieldSchema.getName());
+ return null;
+ }
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
index 55102007..1fb9314c 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -70,9 +70,21 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
Path tempDir,
Configuration hadoopConf,
boolean additionalColumns) {
+
+ Schema schema = buildGenericSchema(partitionField, additionalColumns);
+ return createTable(tableName, partitionField, tempDir, hadoopConf,
additionalColumns, schema);
+ }
+
+ public static GenericTable<GenericRow, String> createTable(
+ String tableName,
+ String partitionField,
+ Path tempDir,
+ Configuration hadoopConf,
+ boolean additionalColumns,
+ Schema schema) {
String basePath = initBasePath(tempDir, tableName);
Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
- FileStoreTable paimonTable = createTable(catalog, partitionField,
additionalColumns);
+ FileStoreTable paimonTable = createTable(catalog, tableName, schema);
System.out.println(
"Initialized Paimon test table at base path: "
@@ -90,12 +102,10 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
return CatalogFactory.createCatalog(context);
}
- public static FileStoreTable createTable(
- Catalog catalog, String partitionField, boolean additionalColumns) {
+ public static FileStoreTable createTable(Catalog catalog, String tableName,
Schema schema) {
try {
catalog.createDatabase("test_db", true);
- Identifier identifier = Identifier.create("test_db", "test_table");
- Schema schema = buildSchema(partitionField, additionalColumns);
+ Identifier identifier = Identifier.create("test_db", tableName);
catalog.createTable(identifier, schema, true);
return (FileStoreTable) catalog.getTable(identifier);
} catch (Exception e) {
@@ -103,7 +113,7 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
}
}
- private static Schema buildSchema(String partitionField, boolean
additionalColumns) {
+ private static Schema buildGenericSchema(String partitionField, boolean
additionalColumns) {
Schema.Builder builder =
Schema.newBuilder()
.primaryKey("id")
@@ -116,7 +126,8 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
.column("description", DataTypes.VARCHAR(255))
.option("bucket", "1")
.option("bucket-key", "id")
- .option("full-compaction.delta-commits", "1");
+ .option("full-compaction.delta-commits", "1")
+ .option("metadata.stats-mode", "full");
if (partitionField != null) {
builder
@@ -178,20 +189,12 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
}
private List<GenericRow> insertRecordsToPartition(int numRows, String
partitionValue) {
- BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
- try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
- List<GenericRow> rows = new ArrayList<>(numRows);
- for (int i = 0; i < numRows; i++) {
- GenericRow row = buildGenericRow(i, paimonTable.schema(),
partitionValue);
- writer.write(row);
- rows.add(row);
- }
- commitWrites(batchWriteBuilder, writer);
- compactTable();
- return rows;
- } catch (Exception e) {
- throw new RuntimeException("Failed to insert rows into Paimon table", e);
+ List<GenericRow> rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+ rows.add(buildGenericRow(i, paimonTable.schema(), partitionValue));
}
+ writeRows(paimonTable, rows);
+ return rows;
}
@Override
@@ -224,8 +227,12 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
}
private void compactTable() {
- BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
- SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
+ compactTable(paimonTable);
+ }
+
+ public static void compactTable(FileStoreTable table) {
+ BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+ SnapshotReader snapshotReader = table.newSnapshotReader();
try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);
@@ -236,6 +243,19 @@ public class TestPaimonTable implements
GenericTable<GenericRow, String> {
}
}
+ public static void writeRows(FileStoreTable table, List<GenericRow> rows) {
+ BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+ for (GenericRow row : rows) {
+ writer.write(row);
+ }
+ commitWrites(batchWriteBuilder, writer);
+ compactTable(table);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to write rows into Paimon table", e);
+ }
+ }
+
private static void commitWrites(BatchWriteBuilder batchWriteBuilder,
BatchTableWrite writer)
throws Exception {
BatchTableCommit commit = batchWriteBuilder.newCommit();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
index 5e28e010..71b9022a 100644
---
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -18,7 +18,13 @@
package org.apache.xtable.paimon;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.time.Instant;
@@ -100,7 +106,7 @@ public class TestPaimonConversionSource {
InternalTable result = unpartitionedSource.getTable(snapshot);
assertNotNull(result);
- assertEquals("test_table", result.getName());
+ assertEquals("unpartitioned_table", result.getName());
assertEquals(TableFormat.PAIMON, result.getTableFormat());
assertNotNull(result.getReadSchema());
assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION,
result.getLayoutStrategy());
diff --git
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
index 0f3ed30d..cd528361 100644
---
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -24,8 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.Snapshot;
@@ -34,30 +34,30 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.xtable.TestPaimonTable;
-import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
public class TestPaimonDataFileExtractor {
private static final PaimonDataFileExtractor extractor =
PaimonDataFileExtractor.getInstance();
+ private static final PaimonSchemaExtractor schemaExtractor =
PaimonSchemaExtractor.getInstance();
@TempDir private Path tempDir;
private TestPaimonTable testTable;
private FileStoreTable paimonTable;
- private InternalSchema testSchema;
@Test
void testToInternalDataFilesWithUnpartitionedTable() {
createUnpartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
+ assertEquals(1, schema.getRecordKeyFields().size());
// Insert some data to create files
testTable.insertRows(5);
List<InternalDataFile> result =
extractor.toInternalDataFiles(
- paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
schema);
assertNotNull(result);
assertFalse(result.isEmpty());
@@ -68,18 +68,26 @@ public class TestPaimonDataFileExtractor {
assertTrue(dataFile.getFileSizeBytes() > 0);
assertEquals(5, dataFile.getRecordCount());
assertEquals(0, dataFile.getPartitionValues().size());
+ // check all fields have stats, and stats values (min->max range) are not
null
+ assertEquals(
+ schema.getFields().size(),
+ dataFile.getColumnStats().stream()
+ .filter(stat -> stat.getRange() != null)
+ .collect(Collectors.toList())
+ .size());
}
@Test
void testToInternalDataFilesWithPartitionedTable() {
createPartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert some data to create files
testTable.insertRows(5);
List<InternalDataFile> result =
extractor.toInternalDataFiles(
- paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
schema);
assertNotNull(result);
assertFalse(result.isEmpty());
@@ -90,11 +98,19 @@ public class TestPaimonDataFileExtractor {
assertTrue(dataFile.getFileSizeBytes() > 0);
assertEquals(5, dataFile.getRecordCount());
assertNotNull(dataFile.getPartitionValues());
+ // check all fields have stats, and stats values (min->max range) are not
null
+ assertEquals(
+ schema.getFields().size(),
+ dataFile.getColumnStats().stream()
+ .filter(stat -> stat.getRange() != null)
+ .collect(Collectors.toList())
+ .size());
}
@Test
void testToInternalDataFilesWithTableWithPrimaryKeys() {
createTableWithPrimaryKeys();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert some data to create files
testTable.insertRows(5);
@@ -102,7 +118,7 @@ public class TestPaimonDataFileExtractor {
// Get the latest snapshot
List<InternalDataFile> result =
extractor.toInternalDataFiles(
- paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
schema);
assertNotNull(result);
assertFalse(result.isEmpty());
@@ -111,18 +127,26 @@ public class TestPaimonDataFileExtractor {
assertNotNull(dataFile.getPhysicalPath());
assertTrue(dataFile.getFileSizeBytes() > 0);
assertEquals(5, dataFile.getRecordCount());
+ // check all fields have stats, and stats values (min->max range) are not
null
+ assertEquals(
+ schema.getFields().size(),
+ dataFile.getColumnStats().stream()
+ .filter(stat -> stat.getRange() != null)
+ .collect(Collectors.toList())
+ .size());
}
@Test
void testPhysicalPathFormat() {
createUnpartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert data
testTable.insertRows(2);
List<InternalDataFile> result =
extractor.toInternalDataFiles(
- paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
schema);
assertFalse(result.isEmpty());
@@ -133,25 +157,10 @@ public class TestPaimonDataFileExtractor {
}
}
- @Test
- void testColumnStatsAreEmpty() {
- createUnpartitionedTable();
-
- testTable.insertRows(1);
-
- List<InternalDataFile> result =
- extractor.toInternalDataFiles(
- paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
-
- assertFalse(result.isEmpty());
- for (InternalDataFile dataFile : result) {
- assertEquals(0, dataFile.getColumnStats().size());
- }
- }
-
@Test
void testExtractFilesDiffWithNewFiles() {
createUnpartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert initial data
testTable.insertRows(5);
@@ -163,8 +172,7 @@ public class TestPaimonDataFileExtractor {
Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
assertNotNull(secondSnapshot);
- InternalFilesDiff filesDiff =
- extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+ InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable,
secondSnapshot, schema);
// Verify we have replaced the single file on this setup
assertNotNull(filesDiff);
@@ -181,6 +189,7 @@ public class TestPaimonDataFileExtractor {
@Test
void testExtractFilesDiffWithPartitionedTable() {
createPartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert initial data
testTable.insertRows(5);
@@ -192,8 +201,7 @@ public class TestPaimonDataFileExtractor {
Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
assertNotNull(secondSnapshot);
- InternalFilesDiff filesDiff =
- extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+ InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable,
secondSnapshot, schema);
// Verify we have added files with partition values
assertNotNull(filesDiff);
@@ -207,6 +215,7 @@ public class TestPaimonDataFileExtractor {
@Test
void testExtractFilesDiffWithTableWithPrimaryKeys() {
createTableWithPrimaryKeys();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert initial data
testTable.insertRows(5);
@@ -218,8 +227,7 @@ public class TestPaimonDataFileExtractor {
Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
assertNotNull(secondSnapshot);
- InternalFilesDiff filesDiff =
- extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+ InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable,
secondSnapshot, schema);
// Verify the diff is returned (size may vary based on compaction)
assertNotNull(filesDiff);
@@ -230,14 +238,14 @@ public class TestPaimonDataFileExtractor {
@Test
void testExtractFilesDiffForFirstSnapshot() {
createUnpartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
// Insert data to create first snapshot
testTable.insertRows(5);
Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
assertNotNull(firstSnapshot);
- InternalFilesDiff filesDiff =
- extractor.extractFilesDiff(paimonTable, firstSnapshot, testSchema);
+ InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable,
firstSnapshot, schema);
// First snapshot should only have added files
assertNotNull(filesDiff);
@@ -250,8 +258,6 @@ public class TestPaimonDataFileExtractor {
(TestPaimonTable)
TestPaimonTable.createTable("test_table", null, tempDir, new
Configuration(), false);
paimonTable = testTable.getPaimonTable();
- testSchema =
- InternalSchema.builder().build(); // empty schema won't matter for
non-partitioned tables
}
private void createPartitionedTable() {
@@ -259,15 +265,6 @@ public class TestPaimonDataFileExtractor {
(TestPaimonTable)
TestPaimonTable.createTable("test_table", "level", tempDir, new
Configuration(), false);
paimonTable = testTable.getPaimonTable();
-
- // just the partition field matters for this test
- InternalField partitionField =
- InternalField.builder()
- .name("level")
-
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
- .build();
-
- testSchema =
InternalSchema.builder().fields(Collections.singletonList(partitionField)).build();
}
private void createTableWithPrimaryKeys() {
@@ -275,7 +272,5 @@ public class TestPaimonDataFileExtractor {
(TestPaimonTable)
TestPaimonTable.createTable("test_table", null, tempDir, new
Configuration(), false);
paimonTable = testTable.getPaimonTable();
- testSchema =
- InternalSchema.builder().build(); // empty schema won't matter for
non-partitioned tables
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonStatsExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonStatsExtractor.java
new file mode 100644
index 00000000..a75fef0a
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonStatsExtractor.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.math.BigDecimal;
+import java.nio.file.Path;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@Log4j2
+public class TestPaimonStatsExtractor {
+ private static final PaimonDataFileExtractor extractor =
PaimonDataFileExtractor.getInstance();
+ private static final PaimonSchemaExtractor schemaExtractor =
PaimonSchemaExtractor.getInstance();
+
+ @TempDir private Path tempDir;
+ private TestPaimonTable testTable;
+ private FileStoreTable paimonTable;
+
+ @Test
+ void testColumnStatsUnpartitioned() {
+ createUnpartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
+
+ List<GenericRow> rows = testTable.insertRows(10);
+
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
schema);
+
+ assertFalse(result.isEmpty());
+ InternalDataFile dataFile = result.get(0);
+ List<ColumnStat> stats = dataFile.getColumnStats();
+ assertFalse(stats.isEmpty());
+
+ // Verify "id" stats (INT)
+ int minId = rows.stream().map(r ->
r.getInt(0)).min(Integer::compareTo).get();
+ int maxId = rows.stream().map(r ->
r.getInt(0)).max(Integer::compareTo).get();
+ ColumnStat idStat = getColumnStat(stats, "id");
+ assertEquals(Range.vector(minId, maxId), idStat.getRange());
+ assertEquals(0, idStat.getNumNulls());
+
+ // Verify "name" stats (STRING)
+ String minName = rows.stream().map(r ->
r.getString(1).toString()).min(String::compareTo).get();
+ String maxName = rows.stream().map(r ->
r.getString(1).toString()).max(String::compareTo).get();
+ ColumnStat nameStat = getColumnStat(stats, "name");
+ assertEquals(Range.vector(minName, maxName), nameStat.getRange());
+ assertEquals(0, nameStat.getNumNulls());
+
+ // Verify "value" stats (DOUBLE)
+ double minValue = rows.stream().map(r ->
r.getDouble(2)).min(Double::compareTo).get();
+ double maxValue = rows.stream().map(r ->
r.getDouble(2)).max(Double::compareTo).get();
+ ColumnStat valueStat = getColumnStat(stats, "value");
+ assertEquals(Range.vector(minValue, maxValue), valueStat.getRange());
+ assertEquals(0, valueStat.getNumNulls());
+
+ // Verify "created_at" stats (TIMESTAMP)
+ Timestamp minCreatedAt =
+ rows.stream().map(r -> r.getTimestamp(3,
9)).min(Timestamp::compareTo).get();
+ Timestamp maxCreatedAt =
+ rows.stream().map(r -> r.getTimestamp(3,
9)).max(Timestamp::compareTo).get();
+ ColumnStat createdAtStat = getColumnStat(stats, "created_at");
+ assertEquals(
+ Range.vector(minCreatedAt.toMicros(), maxCreatedAt.toMicros()),
createdAtStat.getRange());
+ assertEquals(0, createdAtStat.getNumNulls());
+
+ // Verify "updated_at" stats (TIMESTAMP)
+ Timestamp minUpdatedAt =
+ rows.stream().map(r -> r.getTimestamp(4,
9)).min(Timestamp::compareTo).get();
+ Timestamp maxUpdatedAt =
+ rows.stream().map(r -> r.getTimestamp(4,
9)).max(Timestamp::compareTo).get();
+ ColumnStat updatedAtStat = getColumnStat(stats, "updated_at");
+ assertEquals(
+ Range.vector(minUpdatedAt.toMicros(), maxUpdatedAt.toMicros()),
updatedAtStat.getRange());
+ assertEquals(0, updatedAtStat.getNumNulls());
+
+ // Verify "is_active" stats (BOOLEAN)
+ ColumnStat isActiveStat = getColumnStat(stats, "is_active");
+ assertEquals(Range.vector(false, true), isActiveStat.getRange());
+ assertEquals(0, isActiveStat.getNumNulls());
+
+ // Verify "description" stats (VARCHAR(255))
+ String minDescription =
+ rows.stream().map(r ->
r.getString(6).toString()).min(String::compareTo).get();
+ String maxDescription =
+ rows.stream().map(r ->
r.getString(6).toString()).max(String::compareTo).get();
+ ColumnStat descriptionStat = getColumnStat(stats, "description");
+ assertEquals(Range.vector(minDescription, maxDescription),
descriptionStat.getRange());
+ assertEquals(0, descriptionStat.getNumNulls());
+ }
+
+ @Test
+ void testColumnStatsPartitionedTable() {
+ createPartitionedTable();
+ InternalSchema schema =
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
+
+ testTable.insertRows(10);
+
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
schema);
+
+ assertFalse(result.isEmpty());
+ InternalDataFile dataFile = result.get(0);
+ List<ColumnStat> stats = dataFile.getColumnStats();
+ assertFalse(stats.isEmpty());
+
+ // check that extracted stats' column orders are still the same
+ // no need to check stats range, these are covered in
testColumnStatsUnpartitioned
+ assertEquals("id", stats.get(0).getField().getName());
+ assertEquals("name", stats.get(1).getField().getName());
+ assertEquals("value", stats.get(2).getField().getName());
+ assertEquals("created_at", stats.get(3).getField().getName());
+ assertEquals("updated_at", stats.get(4).getField().getName());
+ assertEquals("is_active", stats.get(5).getField().getName());
+ assertEquals("description", stats.get(6).getField().getName());
+ assertEquals("level", stats.get(7).getField().getName());
+
+ // check stats range for the partition column (level)
+ assertEquals(Range.scalar(GenericTable.LEVEL_VALUES.get(0)),
stats.get(7).getRange());
+ }
+
+ @Test
+ void testTimestampPrecisionStats() {
+ Schema schema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("ts_millis", DataTypes.TIMESTAMP(3))
+ .column("ts_micros", DataTypes.TIMESTAMP(6))
+ .column("ts_nanos", DataTypes.TIMESTAMP(9))
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1")
+ .build();
+
+ FileStoreTable table =
+ ((TestPaimonTable)
+ TestPaimonTable.createTable(
+ "ts_precision", null, tempDir, new Configuration(), false,
schema))
+ .getPaimonTable();
+
+ Timestamp millisOne =
+ Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0,
123_000_000));
+ Timestamp microsOne =
+ Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0,
123_456_000));
+ Timestamp nanosOne =
+ Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0,
123_456_789));
+
+ Timestamp millisTwo =
+ Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 1,
987_000_000));
+ Timestamp microsTwo =
+ Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 1,
987_654_000));
+ Timestamp nanosTwo =
+ Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 1,
987_654_321));
+
+ TestPaimonTable.writeRows(
+ table,
+ Arrays.asList(
+ GenericRow.of(1, millisOne, microsOne, nanosOne),
+ GenericRow.of(2, millisTwo, microsTwo, nanosTwo)));
+
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(table.schema());
+ List<ColumnStat> stats =
+ extractor
+ .toInternalDataFiles(table,
table.snapshotManager().latestSnapshot(), internalSchema)
+ .get(0)
+ .getColumnStats();
+
+ Range millisRange = getColumnStat(stats, "ts_millis").getRange();
+ assertEquals(Range.vector(millisOne.getMillisecond(),
millisTwo.getMillisecond()), millisRange);
+
+ Range microsRange = getColumnStat(stats, "ts_micros").getRange();
+ assertEquals(Range.vector(microsOne.toMicros(), microsTwo.toMicros()),
microsRange);
+
+ Range nanosRange = getColumnStat(stats, "ts_nanos").getRange();
+ // TODO: Paimon does not fully support stats at nanos precision - this is
null for parquet
+ // format in 1.3.1
+ assertNull(nanosRange);
+ }
+
+ @Test
+ void testDecimalDateAndNullStatsWithLongStrings() {
+ Schema schema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("price", DataTypes.DECIMAL(10, 2))
+ .column("event_date", DataTypes.DATE())
+ .column("notes", DataTypes.STRING())
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1")
+ .option("metadata.stats-mode", "truncate(16)")
+ .build();
+
+ FileStoreTable table =
+ ((TestPaimonTable)
+ TestPaimonTable.createTable(
+ "decimal_date", null, tempDir, new Configuration(), false,
schema))
+ .getPaimonTable();
+
+ String longA = repeatChar('a', 32);
+ String longB = repeatChar('b', 32);
+ String longC = repeatChar('c', 32);
+ int jan1 = (int) LocalDate.of(2024, 1, 1).toEpochDay();
+ int jan2 = (int) LocalDate.of(2024, 1, 2).toEpochDay();
+
+ GenericRow row1 =
+ GenericRow.of(
+ 1,
+ Decimal.fromBigDecimal(new BigDecimal("12.34"), 10, 2),
+ jan1,
+ BinaryString.fromString(longA));
+ GenericRow row2 = GenericRow.of(2, null, jan2,
BinaryString.fromString(longB));
+ GenericRow row3 =
+ GenericRow.of(
+ 3,
+ Decimal.fromBigDecimal(new BigDecimal("-5.50"), 10, 2),
+ null,
+ BinaryString.fromString(longC));
+
+ TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(table.schema());
+ List<ColumnStat> stats =
+ extractor
+ .toInternalDataFiles(table,
table.snapshotManager().latestSnapshot(), internalSchema)
+ .get(0)
+ .getColumnStats();
+
+ ColumnStat priceStat = getColumnStat(stats, "price");
+ assertEquals(
+ Range.vector(new BigDecimal("-5.50"), new BigDecimal("12.34")),
priceStat.getRange());
+ assertEquals(1, priceStat.getNumNulls());
+ assertEquals(3, priceStat.getNumValues());
+
+ ColumnStat dateStat = getColumnStat(stats, "event_date");
+ assertEquals(Range.vector(jan1, jan2), dateStat.getRange());
+ assertEquals(1, dateStat.getNumNulls());
+ assertEquals(3, dateStat.getNumValues());
+
+ ColumnStat notesStat = getColumnStat(stats, "notes");
+ String minNotes = (String) notesStat.getRange().getMinValue();
+ String maxNotes = (String) notesStat.getRange().getMaxValue();
+ assertEquals(repeatChar('a', 16), minNotes);
+ assertEquals(repeatChar('c', 15) + 'd', maxNotes);
+ assertEquals(16, minNotes.length());
+ assertEquals(16, maxNotes.length());
+ assertEquals(0, notesStat.getNumNulls());
+ assertEquals(3, notesStat.getNumValues());
+ }
+
+ @Test
+ void testFieldLevelStats() {
+ Schema schema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("foo", DataTypes.STRING())
+ .column("bar", DataTypes.STRING())
+ .column("boo", DataTypes.STRING())
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1")
+ .option("metadata.stats-mode", "none")
+ .option("fields.id.stats-mode", "truncate(16)")
+ .option("fields.foo.stats-mode", "truncate(16)")
+ .build();
+
+ FileStoreTable table =
+ ((TestPaimonTable)
+ TestPaimonTable.createTable(
+ "field_level_stats", null, tempDir, new Configuration(),
false, schema))
+ .getPaimonTable();
+
+ GenericRow row1 =
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("foo1"),
+ BinaryString.fromString("bar1"),
+ BinaryString.fromString("boo1"));
+ GenericRow row2 =
+ GenericRow.of(
+ 2,
+ BinaryString.fromString("foo2"),
+ BinaryString.fromString("bar2"),
+ BinaryString.fromString("boo2"));
+ GenericRow row3 =
+ GenericRow.of(
+ 3,
+ BinaryString.fromString("foo3"),
+ BinaryString.fromString("bar3"),
+ BinaryString.fromString("boo3"));
+
+ TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(table.schema());
+ List<ColumnStat> stats =
+ extractor
+ .toInternalDataFiles(table,
table.snapshotManager().latestSnapshot(), internalSchema)
+ .get(0)
+ .getColumnStats();
+
+ ColumnStat idStat = getColumnStat(stats, "id");
+ assertEquals(Range.vector(1, 3), idStat.getRange());
+ }
+
+ @Test
+ void testPaimonNoStats() {
+ Schema schema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("foo", DataTypes.STRING())
+ .column("bar", DataTypes.STRING())
+ .column("boo", DataTypes.STRING())
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1")
+ .option("metadata.stats-mode", "none")
+ .build();
+
+ FileStoreTable table =
+ ((TestPaimonTable)
+ TestPaimonTable.createTable(
+ "field_level_stats", null, tempDir, new Configuration(),
false, schema))
+ .getPaimonTable();
+
+ GenericRow row1 =
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("foo1"),
+ BinaryString.fromString("bar1"),
+ BinaryString.fromString("boo1"));
+ GenericRow row2 =
+ GenericRow.of(
+ 2,
+ BinaryString.fromString("foo2"),
+ BinaryString.fromString("bar2"),
+ BinaryString.fromString("boo2"));
+ GenericRow row3 =
+ GenericRow.of(
+ 3,
+ BinaryString.fromString("foo3"),
+ BinaryString.fromString("bar3"),
+ BinaryString.fromString("boo3"));
+
+ TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(table.schema());
+ List<ColumnStat> stats =
+ extractor
+ .toInternalDataFiles(table,
table.snapshotManager().latestSnapshot(), internalSchema)
+ .get(0)
+ .getColumnStats();
+
+ assertEquals(0, stats.size());
+ }
+
+ @Test
+ void testPaimonDropStats() {
+ Schema schema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("foo", DataTypes.STRING())
+ .column("bar", DataTypes.STRING())
+ .column("boo", DataTypes.STRING())
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1")
+ .option("metadata.stats-mode", "full")
+ .option("manifest.delete-file-drop-stats", "true")
+ .build();
+
+ FileStoreTable table =
+ ((TestPaimonTable)
+ TestPaimonTable.createTable(
+ "field_level_stats", null, tempDir, new Configuration(),
false, schema))
+ .getPaimonTable();
+
+ GenericRow row1 =
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("foo1"),
+ BinaryString.fromString("bar1"),
+ BinaryString.fromString("boo1"));
+ GenericRow row2 =
+ GenericRow.of(
+ 2,
+ BinaryString.fromString("foo2"),
+ BinaryString.fromString("bar2"),
+ BinaryString.fromString("boo2"));
+ GenericRow row3 =
+ GenericRow.of(
+ 3,
+ BinaryString.fromString("foo3"),
+ BinaryString.fromString("bar3"),
+ BinaryString.fromString("boo3"));
+
+ TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(table.schema());
+ List<ColumnStat> stats =
+ extractor
+ .toInternalDataFiles(table,
table.snapshotManager().latestSnapshot(), internalSchema)
+ .get(0)
+ .getColumnStats();
+
+ // compaction creates commits that are DELETE and ADD on the same file
+ // with `manifest.delete-file-drop-stats` enabled, this means stats are
empty after compaction
+ // this is a smoke test to ensure exceptions aren't raised for this
scenario
+ // See also: https://github.com/apache/paimon/issues/7026
+ assertEquals(0, stats.size());
+ }
+
+ @Test
+ void testComplexFieldStats() {
+ // Paimon does not collect stats on nested fields or array fields
+ Schema schema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column(
+ "nested",
+ DataTypes.ROW(
+ DataTypes.FIELD(1, "f1", DataTypes.STRING()),
+ DataTypes.FIELD(2, "f2", DataTypes.INT())))
+ .column("array", DataTypes.ARRAY(DataTypes.INT()))
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1")
+ .option("metadata.stats-mode", "full")
+ .build();
+
+ FileStoreTable table =
+ ((TestPaimonTable)
+ TestPaimonTable.createTable(
+ "nested_field_stats", null, tempDir, new Configuration(),
false, schema))
+ .getPaimonTable();
+
+ GenericRow row1 =
+ GenericRow.of(
+ 1, GenericRow.of(BinaryString.fromString("a"), 10), new
GenericArray(new int[] {1, 2}));
+ GenericRow row2 =
+ GenericRow.of(
+ 2, GenericRow.of(BinaryString.fromString("b"), 20), new
GenericArray(new int[] {3, 4}));
+ GenericRow row3 =
+ GenericRow.of(
+ 3, GenericRow.of(BinaryString.fromString("c"), 30), new
GenericArray(new int[] {}));
+
+ TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(table.schema());
+ List<ColumnStat> stats =
+ extractor
+ .toInternalDataFiles(table,
table.snapshotManager().latestSnapshot(), internalSchema)
+ .get(0)
+ .getColumnStats();
+
+ // only the id column has stats, nested fields and array fields do not
have stats
+ assertEquals(1, stats.size());
+ ColumnStat idStat = getColumnStat(stats, "id");
+ assertEquals(Range.vector(1, 3), idStat.getRange());
+ assertEquals(0, idStat.getNumNulls());
+ assertEquals(3, idStat.getNumValues());
+ }
+
+ private void createUnpartitionedTable() {
+ testTable =
+ (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", null, tempDir, new
Configuration(), false);
+ paimonTable = testTable.getPaimonTable();
+ }
+
+ private void createPartitionedTable() {
+ testTable =
+ (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", "level", tempDir, new
Configuration(), false);
+ paimonTable = testTable.getPaimonTable();
+ }
+
+ private ColumnStat getColumnStat(List<ColumnStat> stats, String columnName) {
+ return stats.stream()
+ .filter(stat -> stat.getField().getName().equals(columnName))
+ .findFirst()
+ .orElseThrow(
+ () -> new IllegalArgumentException("Column stat not found for
column: " + columnName));
+ }
+
+ private String repeatChar(char ch, int count) {
+ char[] chars = new char[count];
+ Arrays.fill(chars, ch);
+ return new String(chars);
+ }
+}
diff --git
a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
index e9217a33..e646507b 100644
--- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
+++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
@@ -38,4 +38,6 @@ tableFormatConverters:
spark.app.name: xtable
ICEBERG:
conversionSourceProviderClass:
org.apache.xtable.iceberg.IcebergConversionSourceProvider
- conversionTargetProviderClass:
org.apache.xtable.iceberg.IcebergConversionTarget
\ No newline at end of file
+ conversionTargetProviderClass:
org.apache.xtable.iceberg.IcebergConversionTarget
+ PAIMON:
+ conversionSourceProviderClass:
org.apache.xtable.paimon.PaimonConversionSourceProvider
diff --git
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
index 273854a0..a5967442 100644
---
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
+++
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
@@ -21,6 +21,7 @@ package org.apache.xtable.utilities;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import java.io.IOException;
import java.net.URL;
@@ -104,10 +105,11 @@ class TestRunSync {
public void testTableFormatConverterConfigDefault() throws IOException {
TableFormatConverters converters =
RunSync.loadTableFormatConversionConfigs(null);
Map<String, ConversionConfig> tfConverters =
converters.getTableFormatConverters();
- Assertions.assertEquals(3, tfConverters.size());
+ Assertions.assertEquals(4, tfConverters.size());
Assertions.assertNotNull(tfConverters.get(DELTA));
Assertions.assertNotNull(tfConverters.get(HUDI));
Assertions.assertNotNull(tfConverters.get(ICEBERG));
+ Assertions.assertNotNull(tfConverters.get(PAIMON));
Assertions.assertEquals(
"org.apache.xtable.hudi.HudiConversionSourceProvider",
@@ -127,6 +129,9 @@ class TestRunSync {
Assertions.assertEquals(
"org.apache.xtable.delta.DeltaConversionSourceProvider",
tfConverters.get(DELTA).getConversionSourceProviderClass());
+ Assertions.assertEquals(
+ "org.apache.xtable.paimon.PaimonConversionSourceProvider",
+ tfConverters.get(PAIMON).getConversionSourceProviderClass());
}
@Test
@@ -144,7 +149,7 @@ class TestRunSync {
TableFormatConverters converters =
RunSync.loadTableFormatConversionConfigs(customConverters.getBytes());
Map<String, ConversionConfig> tfConverters =
converters.getTableFormatConverters();
- Assertions.assertEquals(4, tfConverters.size());
+ Assertions.assertEquals(5, tfConverters.size());
Assertions.assertNotNull(tfConverters.get("NEW_FORMAT"));
Assertions.assertEquals(