This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 85220ece Handle empty Iceberg table source in conversion (#756)
85220ece is described below

commit 85220ece8ce87194d54f02f66c3dc6e8ac608b00
Author: Rishi Reddy Bokka <[email protected]>
AuthorDate: Wed Nov 19 14:52:51 2025 -0800

    Handle empty Iceberg table source in conversion (#756)
    
    * Handle empty Iceberg table source in conversion
    
    * applied spotless
---
 .../xtable/iceberg/IcebergConversionSource.java    | 24 +++++++-
 .../iceberg/TestIcebergConversionSource.java       | 70 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 2 deletions(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 7a777ddb..325b50e9 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -108,7 +108,8 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
   @Override
   public InternalTable getTable(Snapshot snapshot) {
     Table iceTable = getSourceTable();
-    Schema iceSchema = iceTable.schemas().get(snapshot.schemaId());
+    Schema iceSchema =
+        (snapshot != null) ? iceTable.schemas().get(snapshot.schemaId()) : iceTable.schema();
     TableOperations iceOps = ((BaseTable) iceTable).operations();
     IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance();
     InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema);
@@ -123,12 +124,19 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
         irPartitionFields.size() > 0
             ? DataLayoutStrategy.HIVE_STYLE_PARTITION
             : DataLayoutStrategy.FLAT;
+
+    Instant latestCommitTime =
+        (snapshot != null)
+            ? Instant.ofEpochMilli(snapshot.timestampMillis())
+            : Instant.ofEpochMilli(
+                ((BaseTable) iceTable).operations().current().lastUpdatedMillis());
+
     return InternalTable.builder()
         .tableFormat(TableFormat.ICEBERG)
         .basePath(iceTable.location())
         .name(iceTable.name())
         .partitioningFields(irPartitionFields)
-        .latestCommitTime(Instant.ofEpochMilli(snapshot.timestampMillis()))
+        .latestCommitTime(latestCommitTime)
         .readSchema(irSchema)
         .layoutStrategy(dataLayoutStrategy)
         .latestMetadataPath(iceOps.current().metadataFileLocation())
@@ -147,6 +155,18 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
     Table iceTable = getSourceTable();
 
     Snapshot currentSnapshot = iceTable.currentSnapshot();
+
+    if (currentSnapshot == null) {
+      // Handle empty table case - return snapshot with schema but no data files
+      InternalTable irTable = getTable(null);
+      return InternalSnapshot.builder()
+          .version("0")
+          .table(irTable)
+          .partitionedDataFiles(Collections.emptyList())
+          .sourceIdentifier("0")
+          .build();
+    }
+
     InternalTable irTable = getTable(currentSnapshot);
 
     TableScan scan =
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index ffe6a217..d1bf70bb 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -116,6 +116,37 @@ class TestIcebergConversionSource {
         internalTable.getPartitioningFields().get(0).getTransformType());
   }
 
+  @Test
+  void testGetTableWithoutSnapshot(@TempDir Path workingDir) throws IOException {
+    Table emptyTable = createTestCatalogTable(workingDir.toString());
+    assertNull(emptyTable.currentSnapshot());
+
+    SourceTable sourceTableConfig = getPerTableConfig(emptyTable);
+
+    IcebergConversionSource conversionSource =
+        sourceProvider.getConversionSourceInstance(sourceTableConfig);
+
+    InternalTable internalTable = conversionSource.getTable(null);
+    assertNotNull(internalTable);
+    assertEquals(TableFormat.ICEBERG, internalTable.getTableFormat());
+    assertEquals(emptyTable.location(), internalTable.getBasePath());
+    assertEquals(
+        ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(),
+        internalTable.getLatestCommitTime().toEpochMilli());
+
+    assertEquals(
+        emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size());
+    validateSchema(internalTable.getReadSchema(), emptyTable.schema());
+
+    assertEquals(1, internalTable.getPartitioningFields().size());
+    InternalField partitionField = internalTable.getPartitioningFields().get(0).getSourceField();
+    assertEquals("cs_sold_date_sk", partitionField.getName());
+    assertEquals(7, partitionField.getFieldId());
+    assertEquals(
+        PartitionTransformType.VALUE,
+        internalTable.getPartitioningFields().get(0).getTransformType());
+  }
+
   @Test
   public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException {
     Table catalogSales = createTestTableWithData(workingDir.toString());
@@ -164,6 +195,45 @@ class TestIcebergConversionSource {
     }
   }
 
+  @Test
+  void testGetCurrentSnapshotForEmptyTable(@TempDir Path workingDir) throws IOException {
+    Table emptyTable = createTestCatalogTable(workingDir.toString());
+    assertNull(emptyTable.currentSnapshot());
+
+    SourceTable sourceTableConfig = getPerTableConfig(emptyTable);
+
+    IcebergDataFileExtractor spyDataFileExtractor = spy(IcebergDataFileExtractor.builder().build());
+    IcebergPartitionValueConverter spyPartitionConverter =
+        spy(IcebergPartitionValueConverter.getInstance());
+
+    IcebergConversionSource conversionSource =
+        IcebergConversionSource.builder()
+            .hadoopConf(hadoopConf)
+            .sourceTableConfig(sourceTableConfig)
+            .dataFileExtractor(spyDataFileExtractor)
+            .partitionConverter(spyPartitionConverter)
+            .build();
+
+    InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+    assertNotNull(internalSnapshot);
+    assertEquals("0", internalSnapshot.getVersion());
+    assertEquals("0", internalSnapshot.getSourceIdentifier());
+    assertTrue(internalSnapshot.getPartitionedDataFiles().isEmpty());
+
+    InternalTable internalTable = internalSnapshot.getTable();
+    assertNotNull(internalTable);
+    assertEquals(emptyTable.location(), internalTable.getBasePath());
+    assertEquals(
+        ((BaseTable) emptyTable).operations().current().lastUpdatedMillis(),
+        internalTable.getLatestCommitTime().toEpochMilli());
+
+    assertEquals(
+        emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size());
+
+    verify(spyPartitionConverter, never()).toXTable(any(), any(), any());
+    verify(spyDataFileExtractor, never()).fromIceberg(any(), any(), any());
+  }
+
   @Test
   public void testGetTableChangeForCommit(@TempDir Path workingDir) throws IOException {
     Table catalogSales = createTestTableWithData(workingDir.toString());

Reply via email to