the-other-tim-brown commented on code in PR #760:
URL: https://github.com/apache/incubator-xtable/pull/760#discussion_r2536030415
##########
xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java:
##########
@@ -84,6 +84,9 @@ private InternalSchema toInternalSchema(
int openParenIndex = typeName.indexOf("(");
String trimmedTypeName = openParenIndex > 0 ? typeName.substring(0, openParenIndex) : typeName;
switch (trimmedTypeName) {
+ case "short":
Review Comment:
Can you update the unit tests to cover this case?
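Something along these lines should be enough (a rough sketch only, assuming JUnit 5, Spark's `DataTypes.ShortType`, and the existing `DeltaSchemaExtractor.getInstance().toInternalSchema(StructType)` entry point; the expected internal type should match whatever the new `case "short":` maps to):
```java
@Test
void convertsDeltaShortType() {
  // Delta/Spark schema with a single SHORT column
  StructType deltaSchema = new StructType().add("short_field", DataTypes.ShortType, true);

  InternalSchema internalSchema =
      DeltaSchemaExtractor.getInstance().toInternalSchema(deltaSchema);

  InternalField shortField = internalSchema.getFields().get(0);
  assertEquals("short_field", shortField.getName());
  // Assumption: short is widened to INT internally like other small integer types;
  // replace with the exact type the new "short" case produces.
  assertEquals(InternalType.INT, shortField.getSchema().getDataType());
}
```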
##########
xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java:
##########
@@ -229,6 +286,114 @@ public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fiel
}
}
+ /**
+ * Reads column statistics directly from a Parquet file footer. This method is used as a fallback
+ * when Delta checkpoint statistics are NULL or unavailable.
+ *
+ * <p>This operation is expensive as it requires:
+ *
+ * <ul>
+ * <li>Opening each Parquet file individually (I/O overhead)
+ * <li>Reading the file footer metadata
+ * <li>Parsing column chunk metadata for all columns
+ * <li>Converting Parquet statistics to internal format
+ * </ul>
+ *
+ * <p>For cloud storage (S3, GCS, ADLS), this can add significant latency due to network overhead.
+ * The method performs several safety checks to prevent errors:
+ *
+ * <ul>
+ * <li>Filters out statistics with NULL min/max ranges (prevents NullPointerException)
+ * <li>Skips DECIMAL and complex types (prevents ClassCastException)
+ * <li>Validates Binary-to-primitive type conversions
+ * </ul>
+ *
+ * <p>Record Count: The record count is read from Parquet row group metadata, which is always
+ * reliable regardless of column statistics availability.
+ *
+ * @param addFile the Delta AddFile action containing the file path
+ * @param snapshot the Delta snapshot providing table base path
+ * @param fields the schema fields for which to extract statistics
+ * @return FileStats with extracted statistics, or empty stats if reading fails
+ */
+ private FileStats readStatsFromParquetFooter(
+ AddFile addFile, Snapshot snapshot, List<InternalField> fields) {
+ try {
+ // Construct absolute path to the Parquet data file
+ // Handle both absolute paths and relative paths from table base
+ String tableBasePath = snapshot.deltaLog().dataPath().toString();
+ String filePath = addFile.path();
+ String fullPath =
+ filePath.startsWith(tableBasePath) ? filePath : tableBasePath + "/" + filePath;
+
+ // Read Parquet file footer metadata using Hadoop FileSystem API
+ Configuration conf = new Configuration();
+ Path parquetPath = new Path(fullPath);
+
+ ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, parquetPath);
+ List<ColumnStat> parquetStats = ParquetStatsExtractor.getColumnStatsForaFile(footer);
+
+ // Extract record count from Parquet row groups metadata
+ // This is always reliable and doesn't depend on column statistics
+ long numRecords = footer.getBlocks().stream().mapToLong(block -> block.getRowCount()).sum();
+
+ // Build lookup map for efficient field matching by path
+ Map<String, ColumnStat> pathToStat =
+ parquetStats.stream()
+ .collect(
+ Collectors.toMap(
+ stat -> stat.getField().getPath(),
+ Function.identity(),
+ (stat1, stat2) -> stat1)); // Keep first occurrence on collision
+
+ // Map Parquet stats to requested Delta schema fields
+ // Filter out statistics with NULL ranges to prevent downstream NullPointerException
+ List<ColumnStat> mappedStats =
+ fields.stream()
+ .filter(field -> pathToStat.containsKey(field.getPath()))
+ .map(
+ field -> {
+ ColumnStat parquetStat = pathToStat.get(field.getPath());
+ // Rebuild ColumnStat with correct Delta field reference
+ // while preserving Parquet statistics values
+ return ColumnStat.builder()
+ .field(field)
+ .numValues(parquetStat.getNumValues())
+ .numNulls(parquetStat.getNumNulls())
+ .totalSize(parquetStat.getTotalSize())
+ .range(parquetStat.getRange())
+ .build();
+ })
+ .filter(
+ stat ->
+ stat.getRange() != null
+ && stat.getRange().getMinValue() != null
+ && stat.getRange().getMaxValue() != null)
+ .collect(Collectors.toList());
+
+ log.debug(
+ "Successfully extracted {} column stats from Parquet footer for
file: {}",
+ mappedStats.size(),
+ addFile.path());
+
+ return FileStats.builder().columnStats(mappedStats).numRecords(numRecords).build();
+
+ } catch (Exception e) {
+ // Log warning but continue conversion - the file will be added without statistics
+ // This is preferable to failing the entire conversion
+ log.warn(
+ "Failed to read stats from Parquet footer for file {}: {}. "
+ + "File will be included without column statistics.",
+ addFile.path(),
+ e.getMessage());
Review Comment:
```suggestion
"Failed to read stats from Parquet footer for file {}. "
+ "File will be included without column statistics.",
addFile.path(),
e);
```
This will log the full exception stack trace, providing more detail on the failure and making it easier to debug.
##########
xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java:
##########
@@ -229,6 +286,114 @@ public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fiel
}
}
+ /**
+ * Reads column statistics directly from a Parquet file footer. This method is used as a fallback
+ * when Delta checkpoint statistics are NULL or unavailable.
+ *
+ * <p>This operation is expensive as it requires:
+ *
+ * <ul>
+ * <li>Opening each Parquet file individually (I/O overhead)
+ * <li>Reading the file footer metadata
+ * <li>Parsing column chunk metadata for all columns
+ * <li>Converting Parquet statistics to internal format
+ * </ul>
+ *
+ * <p>For cloud storage (S3, GCS, ADLS), this can add significant latency due to network overhead.
+ * The method performs several safety checks to prevent errors:
+ *
+ * <ul>
+ * <li>Filters out statistics with NULL min/max ranges (prevents NullPointerException)
+ * <li>Skips DECIMAL and complex types (prevents ClassCastException)
+ * <li>Validates Binary-to-primitive type conversions
+ * </ul>
+ *
+ * <p>Record Count: The record count is read from Parquet row group metadata, which is always
+ * reliable regardless of column statistics availability.
+ *
+ * @param addFile the Delta AddFile action containing the file path
+ * @param snapshot the Delta snapshot providing table base path
+ * @param fields the schema fields for which to extract statistics
+ * @return FileStats with extracted statistics, or empty stats if reading fails
+ */
+ private FileStats readStatsFromParquetFooter(
Review Comment:
Let's also make sure there is a test case to cover this flow. It can be a
basic one; the detailed coverage for translating from Parquet stats to our
internal stats representation will be covered in
https://github.com/apache/incubator-xtable/issues/748
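A rough sketch of what the basic case could look like (`sparkSession`, `tempDir`, and the `fields` list are assumed test fixtures; the call below should be adjusted to the actual public signature introduced in this PR, and the table property is just one way to end up with AddFiles that carry no per-column stats):
```java
@Test
void fallsBackToParquetFooterWhenDeltaStatsAreMissing() {
  String tablePath = tempDir + "/footer_stats_test";
  sparkSession.sql(
      "CREATE TABLE footer_stats_test (id INT, name STRING) USING DELTA "
          + "TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '0') "
          + "LOCATION '" + tablePath + "'");
  sparkSession.sql("INSERT INTO footer_stats_test VALUES (1, 'a'), (2, 'b')");

  DeltaLog deltaLog = DeltaLog.forTable(sparkSession, tablePath);
  Snapshot snapshot = deltaLog.snapshot();
  AddFile addFile = snapshot.allFiles().collectAsList().get(0);

  FileStats fileStats =
      DeltaStatsExtractor.getInstance().getColumnStatsForFile(addFile, snapshot, fields);

  // Record count must come from the Parquet row-group metadata in this scenario
  assertEquals(2L, fileStats.getNumRecords());
  assertFalse(fileStats.getColumnStats().isEmpty());
}
```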
##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java:
##########
@@ -107,13 +153,95 @@ private void applyDiff(
private DataFile getDataFile(
PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) {
+ // Convert Iceberg schema to InternalSchema for stats extraction
+ InternalSchema internalSchema = IcebergSchemaExtractor.getInstance().fromIceberg(schema);
+
+ // Get existing stats and check if they are complete
+ List<ColumnStat> existingStats = dataFile.getColumnStats();
+ long recordCount = dataFile.getRecordCount();
+ List<ColumnStat> columnStats;
+
+ // For Parquet files, ALWAYS read from footer to match native Iceberg behavior
Review Comment:
We avoid this because it introduces overhead, and the source stats typically come from the
files themselves already. Iceberg writers generate these stats while writing, rather than
reading them back from the footer, to avoid exactly this same overhead.
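In other words, the stats already attached to `InternalDataFile` should be enough to populate the Iceberg metrics directly, roughly like the sketch below (assumes XTable's existing `IcebergColumnStatsConverter` and the `InternalDataFile` getters; partition handling is omitted for brevity and the method name is illustrative):
```java
private DataFile toIcebergDataFile(
    PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) {
  // Reuse the column stats the source already attached to the InternalDataFile
  // instead of re-opening every Parquet footer during sync.
  Metrics metrics =
      IcebergColumnStatsConverter.getInstance()
          .toIceberg(schema, dataFile.getRecordCount(), dataFile.getColumnStats());
  return DataFiles.builder(partitionSpec)
      .withPath(dataFile.getPhysicalPath())
      .withFileSizeInBytes(dataFile.getFileSizeBytes())
      .withFormat(FileFormat.PARQUET)
      .withMetrics(metrics) // min/max, null counts, value counts carried over from the source
      .build();
}
```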
##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java:
##########
@@ -68,24 +89,49 @@ public void applySnapshot(
}
public void applyDiff(
+ Table table,
Transaction transaction,
InternalFilesDiff internalFilesDiff,
Schema schema,
PartitionSpec partitionSpec,
TableSyncMetadata metadata) {
+ // Get existing files in Iceberg to filter out duplicates
+ // This handles cases where source (e.g., Delta with blind append OPTIMIZE)
+ // sends AddFile actions for files that are already synced
+ // Note: Must scan the base table (not transaction.table()) as transaction tables don't support
+ // scans
+ Map<String, DataFile> existingFiles = new HashMap<>();
+
+ // Optimize: Check if table has a snapshot before scanning
+ // For empty tables, this avoids expensive manifest file reads from cloud storage (GCS, S3, etc.)
+ Snapshot currentSnapshot = table.currentSnapshot();
+ if (currentSnapshot != null) {
+ try (CloseableIterable<FileScanTask> iterator = table.newScan().planFiles()) {
+ StreamSupport.stream(iterator.spliterator(), false)
+ .map(FileScanTask::file)
+ .forEach(file -> existingFiles.put(file.path().toString(), file));
+ } catch (Exception e) {
+ throw new ReadException("Failed to read existing Iceberg files during incremental sync", e);
+ }
+ } else {
+ log.debug("Table has no snapshot, skipping file scan (table is empty)");
+ }
+
+ // Filter out files that already exist in Iceberg
Review Comment:
This is not required, the `applyDiff` is only for incremental sync. It
assumes that the table is already synced once.