This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 867835e282 Core: Use time-travel schema when resolving partition spec in scan (#13301)
867835e282 is described below

commit 867835e2829f5ee5ecbd84ae8098d484cf9eed0e
Author: chenjian2664 <[email protected]>
AuthorDate: Mon Oct 27 21:39:14 2025 +0800

    Core: Use time-travel schema when resolving partition spec in scan (#13301)
---
 .../apache/iceberg/BaseDistributedDataScan.java    |   2 +-
 .../src/main/java/org/apache/iceberg/DataScan.java |   2 +-
 .../java/org/apache/iceberg/DataTableScan.java     |   2 +-
 .../main/java/org/apache/iceberg/SnapshotScan.java |  22 ++++
 .../iceberg/TestScansAndSchemaEvolution.java       | 142 ++++++++++++++++++++-
 5 files changed, 163 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
index 89c7f0b606..c69f71f2fd 100644
--- a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -300,7 +300,7 @@ abstract class BaseDistributedDataScan
     }
 
     return builder
-        .specsById(table().specs())
+        .specsById(specs())
         .filterData(filter())
         .caseSensitive(isCaseSensitive())
         .scanMetrics(scanMetrics())
diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java b/core/src/main/java/org/apache/iceberg/DataScan.java
index 1c48042f52..1acbbbf682 100644
--- a/core/src/main/java/org/apache/iceberg/DataScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataScan.java
@@ -53,7 +53,7 @@ abstract class DataScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
             .caseSensitive(isCaseSensitive())
             .select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
             .filterData(filter())
-            .specsById(table().specs())
+            .specsById(specs())
             .scanMetrics(scanMetrics())
             .ignoreDeleted()
             .columnsToKeepStats(columnsToKeepStats());
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 8463112b7a..4d23dd525e 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -74,7 +74,7 @@ public class DataTableScan extends BaseTableScan {
             .caseSensitive(isCaseSensitive())
             .select(scanColumns())
             .filterData(filter())
-            .specsById(table().specs())
+            .specsById(specs())
             .scanMetrics(scanMetrics())
             .ignoreDeleted()
             .columnsToKeepStats(columnsToKeepStats());
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index a98a8c9f13..8a836b634e 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.metrics.Timer;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
@@ -79,6 +80,27 @@ public abstract class SnapshotScan<ThisT, T extends ScanTask, G extends ScanTask
     return scanMetrics;
   }
 
+  protected Map<Integer, PartitionSpec> specs() {
+    Map<Integer, PartitionSpec> specs = table().specs();
+    // requires latest schema
+    if (!useSnapshotSchema()
+        || snapshotId() == null
+        || table().currentSnapshot() == null
+        || snapshotId().equals(table().currentSnapshot().snapshotId())) {
+      return specs;
+    }
+
+    // this is a time travel request
+    Schema snapshotSchema = tableSchema();
+    ImmutableMap.Builder<Integer, PartitionSpec> newSpecs =
+        ImmutableMap.builderWithExpectedSize(specs.size());
+    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
+      newSpecs.put(entry.getKey(), entry.getValue().toUnbound().bind(snapshotSchema));
+    }
+
+    return newSpecs.build();
+  }
+
   public ThisT useSnapshot(long scanSnapshotId) {
     Preconditions.checkArgument(
         snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 790f7c7152..3df370fe6f 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.RandomAvroData;
@@ -65,21 +66,26 @@ public class TestScansAndSchemaEvolution {
   @TempDir private File temp;
 
   private DataFile createDataFile(String partValue) throws IOException {
-    List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);
+    return createDataFile(partValue, SCHEMA, SPEC);
+  }
+
+  private DataFile createDataFile(String partValue, Schema schema, PartitionSpec spec)
+      throws IOException {
+    List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 0L);
 
     OutputFile dataFile =
         new InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
     try (FileAppender<GenericData.Record> writer =
-        Avro.write(dataFile).schema(SCHEMA).named("test").build()) {
+        Avro.write(dataFile).schema(schema).named("test").build()) {
       for (GenericData.Record rec : expected) {
         rec.put("part", partValue); // create just one partition
         writer.add(rec);
       }
     }
 
-    PartitionData partition = new PartitionData(SPEC.partitionType());
+    PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, partValue);
-    return DataFiles.builder(SPEC)
+    return DataFiles.builder(spec)
         .withInputFile(dataFile.toInputFile())
         .withPartition(partition)
         .withRecordCount(100)
@@ -99,6 +105,7 @@ public class TestScansAndSchemaEvolution {
     DataFile fileTwo = createDataFile("two");
 
     table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
 
     List<FileScanTask> tasks =
         Lists.newArrayList(table.newScan().filter(Expressions.equal("part", "one")).planFiles());
@@ -111,6 +118,133 @@ public class TestScansAndSchemaEvolution {
     tasks = Lists.newArrayList(table.newScan().filter(Expressions.equal("p", "one")).planFiles());
 
     assertThat(tasks).hasSize(1);
+
+    // create a new commit
+    table.newAppend().appendFile(createDataFile("three")).commit();
+
+    // use filter with previous partition name
+    tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("part", "one"))
+                .planFiles());
+
+    assertThat(tasks).hasSize(1);
+  }
+
+  @TestTemplate
+  public void testPartitionSourceDrop() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSpec().addField("id").commit();
+
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table.newScan().filter(Expressions.not(Expressions.isNull("id"))).planFiles());
+
+    assertThat(tasks).hasSize(2);
+
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+
+    // remove one field from spec and drop the column
+    table.updateSpec().removeField("id").commit();
+    table.updateSchema().deleteColumn("id").commit();
+
+    List<FileScanTask> tasksAtFirstSnapshotId =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.not(Expressions.isNull("id")))
+                .planFiles());
+
+    assertThat(
+            tasksAtFirstSnapshotId.stream()
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList()))
+        .isEqualTo(
+            tasks.stream()
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList()));
+  }
+
+  @TestTemplate
+  public void testColumnRename() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSchema().renameColumn("data", "renamed_data").commit();
+
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // generate a new commit
+    DataFile fileFour = createDataFile("four", table.schema(), table.spec());
+    table.newAppend().appendFile(fileFour).commit();
+
+    // running successfully with the new filter on previous column name
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(2);
+
+    // running successfully with the new filter on renamed column name
+    tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(secondSnapshotId)
+                .filter(Expressions.equal("renamed_data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(3);
+  }
+
+  @TestTemplate
+  public void testColumnDrop() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    // make sure a new commit is generated after dropping the column
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+
+    // running successfully with the new filter on previous column name
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(2);
   }
 
   @TestTemplate

Reply via email to