This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 85220ece Handle empty Iceberg table source in conversion (#756)
85220ece is described below
commit 85220ece8ce87194d54f02f66c3dc6e8ac608b00
Author: Rishi Reddy Bokka <[email protected]>
AuthorDate: Wed Nov 19 14:52:51 2025 -0800
Handle empty Iceberg table source in conversion (#756)
* Handle empty Iceberg table source in conversion
* applied spotless
---
.../xtable/iceberg/IcebergConversionSource.java | 24 +++++++-
.../iceberg/TestIcebergConversionSource.java | 70 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 2 deletions(-)
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 7a777ddb..325b50e9 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -108,7 +108,8 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
@Override
public InternalTable getTable(Snapshot snapshot) {
Table iceTable = getSourceTable();
- Schema iceSchema = iceTable.schemas().get(snapshot.schemaId());
+ Schema iceSchema =
+ (snapshot != null) ? iceTable.schemas().get(snapshot.schemaId()) :
iceTable.schema();
TableOperations iceOps = ((BaseTable) iceTable).operations();
IcebergSchemaExtractor schemaExtractor =
IcebergSchemaExtractor.getInstance();
InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema);
@@ -123,12 +124,19 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
irPartitionFields.size() > 0
? DataLayoutStrategy.HIVE_STYLE_PARTITION
: DataLayoutStrategy.FLAT;
+
+ Instant latestCommitTime =
+ (snapshot != null)
+ ? Instant.ofEpochMilli(snapshot.timestampMillis())
+ : Instant.ofEpochMilli(
+ ((BaseTable)
iceTable).operations().current().lastUpdatedMillis());
+
return InternalTable.builder()
.tableFormat(TableFormat.ICEBERG)
.basePath(iceTable.location())
.name(iceTable.name())
.partitioningFields(irPartitionFields)
- .latestCommitTime(Instant.ofEpochMilli(snapshot.timestampMillis()))
+ .latestCommitTime(latestCommitTime)
.readSchema(irSchema)
.layoutStrategy(dataLayoutStrategy)
.latestMetadataPath(iceOps.current().metadataFileLocation())
@@ -147,6 +155,18 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
Table iceTable = getSourceTable();
Snapshot currentSnapshot = iceTable.currentSnapshot();
+
+ if (currentSnapshot == null) {
+ // Handle empty table case - return snapshot with schema but no data
files
+ InternalTable irTable = getTable(null);
+ return InternalSnapshot.builder()
+ .version("0")
+ .table(irTable)
+ .partitionedDataFiles(Collections.emptyList())
+ .sourceIdentifier("0")
+ .build();
+ }
+
InternalTable irTable = getTable(currentSnapshot);
TableScan scan =
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index ffe6a217..d1bf70bb 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -116,6 +116,37 @@ class TestIcebergConversionSource {
internalTable.getPartitioningFields().get(0).getTransformType());
}
+ @Test
+ void testGetTableWithoutSnapshot(@TempDir Path workingDir) throws
IOException {
+ Table emptyTable = createTestCatalogTable(workingDir.toString());
+ assertNull(emptyTable.currentSnapshot());
+
+ SourceTable sourceTableConfig = getPerTableConfig(emptyTable);
+
+ IcebergConversionSource conversionSource =
+ sourceProvider.getConversionSourceInstance(sourceTableConfig);
+
+ InternalTable internalTable = conversionSource.getTable(null);
+ assertNotNull(internalTable);
+ assertEquals(TableFormat.ICEBERG, internalTable.getTableFormat());
+ assertEquals(emptyTable.location(), internalTable.getBasePath());
+ assertEquals(
+ ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(),
+ internalTable.getLatestCommitTime().toEpochMilli());
+
+ assertEquals(
+ emptyTable.schema().columns().size(),
internalTable.getReadSchema().getFields().size());
+ validateSchema(internalTable.getReadSchema(), emptyTable.schema());
+
+ assertEquals(1, internalTable.getPartitioningFields().size());
+ InternalField partitionField =
internalTable.getPartitioningFields().get(0).getSourceField();
+ assertEquals("cs_sold_date_sk", partitionField.getName());
+ assertEquals(7, partitionField.getFieldId());
+ assertEquals(
+ PartitionTransformType.VALUE,
+ internalTable.getPartitioningFields().get(0).getTransformType());
+ }
+
@Test
public void testGetCurrentSnapshot(@TempDir Path workingDir) throws
IOException {
Table catalogSales = createTestTableWithData(workingDir.toString());
@@ -164,6 +195,45 @@ class TestIcebergConversionSource {
}
}
+ @Test
+ void testGetCurrentSnapshotForEmptyTable(@TempDir Path workingDir) throws
IOException {
+ Table emptyTable = createTestCatalogTable(workingDir.toString());
+ assertNull(emptyTable.currentSnapshot());
+
+ SourceTable sourceTableConfig = getPerTableConfig(emptyTable);
+
+ IcebergDataFileExtractor spyDataFileExtractor =
spy(IcebergDataFileExtractor.builder().build());
+ IcebergPartitionValueConverter spyPartitionConverter =
+ spy(IcebergPartitionValueConverter.getInstance());
+
+ IcebergConversionSource conversionSource =
+ IcebergConversionSource.builder()
+ .hadoopConf(hadoopConf)
+ .sourceTableConfig(sourceTableConfig)
+ .dataFileExtractor(spyDataFileExtractor)
+ .partitionConverter(spyPartitionConverter)
+ .build();
+
+ InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+ assertNotNull(internalSnapshot);
+ assertEquals("0", internalSnapshot.getVersion());
+ assertEquals("0", internalSnapshot.getSourceIdentifier());
+ assertTrue(internalSnapshot.getPartitionedDataFiles().isEmpty());
+
+ InternalTable internalTable = internalSnapshot.getTable();
+ assertNotNull(internalTable);
+ assertEquals(emptyTable.location(), internalTable.getBasePath());
+ assertEquals(
+ ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(),
+ internalTable.getLatestCommitTime().toEpochMilli());
+
+ assertEquals(
+ emptyTable.schema().columns().size(),
internalTable.getReadSchema().getFields().size());
+
+ verify(spyPartitionConverter, never()).toXTable(any(), any(), any());
+ verify(spyDataFileExtractor, never()).fromIceberg(any(), any(), any());
+ }
+
@Test
public void testGetTableChangeForCommit(@TempDir Path workingDir) throws
IOException {
Table catalogSales = createTestTableWithData(workingDir.toString());