wypoon commented on a change in pull request #1508:
URL: https://github.com/apache/iceberg/pull/1508#discussion_r501457348



##########
File path: api/src/main/java/org/apache/iceberg/Table.java
##########
@@ -51,6 +51,23 @@
    */
   Schema schema();
 
+  /**
+   * Return the {@link Schema schema} for this table at the time of the snapshot
+   * specified by the snapshotId.
+   *
+   * @return this table's schema
+   */
+  Schema schemaForSnapshot(long snapshotId);
+
+  /**
+   * Return the {@link Schema schema} for this table at the time of the most recent
+   * snapshot as of the specified timestampMillis.
+   * Note: This is not necessarily the schema at the time of the specified timestampMillis.
+   *
+   * @return this table's schema
+   */
+  Schema schemaForSnapshotAsOfTime(long timestampMillis);

Review comment:
       I have removed the methods from `Table`, adding them only to 
`BaseTable`. Thus the `Table` API is unchanged.

##########
File path: core/src/main/java/org/apache/iceberg/BaseTable.java
##########
@@ -59,6 +60,45 @@ public Schema schema() {
     return ops.current().schema();
   }
 
+  @Override
+  public Schema schemaForSnapshot(long snapshotId) {
+    TableMetadata current = ops.current();
+    if (current.currentSnapshot() != null &&
+        current.currentSnapshot().snapshotId() == snapshotId) {
+      return current.schema();
+    }
+    Long snapshotTs = null;
+    for (HistoryEntry logEntry : current.snapshotLog()) {
+      if (logEntry.snapshotId() == snapshotId) {
+        snapshotTs = logEntry.timestampMillis();
+      }
+    }
+    Preconditions.checkArgument(snapshotTs != null,
+        "Cannot find a snapshot with id %s", snapshotId);
+    TableMetadata.MetadataLogEntry metadataLogEntry = null;
+    for (TableMetadata.MetadataLogEntry logEntry : current.previousFiles()) {
+      if (logEntry.timestampMillis() <= snapshotTs) {
+        metadataLogEntry = logEntry;
+      }
+    }
+    String metadataFile = metadataLogEntry.file();
+    TableMetadata metadata = TableMetadataParser.read(io(), metadataFile);
+    return metadata.schema();
+  }
+
+  @Override
+  public Schema schemaForSnapshotAsOfTime(long timestampMillis) {

Review comment:
       I have kept `schemaForSnapshotAsOfTime` for convenience. Since the 
`Table` API is unchanged, I think it is ok.
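
A minimal sketch of the lookup that `schemaForSnapshotAsOfTime` implies, based on the Javadoc wording above ("the most recent snapshot as of the specified timestampMillis"). The `LogEntry` and `SnapshotAsOfTime` classes are hypothetical stand-ins written for this illustration, not Iceberg types, and the sketch assumes the snapshot log is in chronological order.

```java
import java.util.List;

/** Hypothetical stand-in for a snapshot-log entry; not Iceberg's HistoryEntry. */
final class LogEntry {
  final long timestampMillis;
  final long snapshotId;

  LogEntry(long timestampMillis, long snapshotId) {
    this.timestampMillis = timestampMillis;
    this.snapshotId = snapshotId;
  }
}

/** Hypothetical helper illustrating the "as of time" resolution. */
final class SnapshotAsOfTime {
  private SnapshotAsOfTime() {
  }

  /**
   * Returns the id of the last snapshot committed at or before timestampMillis.
   * Assumes the log is ordered from oldest to newest.
   */
  static long snapshotIdAsOfTime(List<LogEntry> log, long timestampMillis) {
    Long found = null;
    for (LogEntry entry : log) {
      if (entry.timestampMillis <= timestampMillis) {
        // keep overwriting so the newest qualifying entry wins
        found = entry.snapshotId;
      }
    }
    if (found == null) {
      throw new IllegalArgumentException(
          "Cannot find a snapshot older than " + timestampMillis);
    }
    return found;
  }
}
```

With a helper like this, the convenience method could resolve the snapshot id first and then delegate to `schemaForSnapshot(snapshotId)`, which is why keeping both methods costs little.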

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -165,13 +165,23 @@
           TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
   }
 
+  private Schema getTableSchema() {
+    if (snapshotId != null) {
+      return table.schemaForSnapshot(snapshotId);
+    } else if (asOfTimestamp != null) {
+      return table.schemaForSnapshotAsOfTime(asOfTimestamp);
+    } else {
+      return table.schema();
+    }
+  }
+
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedSchema != null) {
         // the projection should include all columns that will be returned, including those only used in filters
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
+        this.schema = SparkSchemaUtil.prune(getTableSchema(), requestedSchema, filterExpression(), caseSensitive);
       } else {
-        this.schema = table.schema();
+        this.schema = getTableSchema();

Review comment:
       This is the only point I don't understand. As far as I can tell, 
`BaseTableScan` is constructed with the user's requested schema. If the user 
wants to read from an old snapshot, then they should supply a requested 
schema compatible with the snapshot's schema. I couldn't see anything I needed 
to change in `BaseTableScan`. If I'm missing something here, can you please 
point out the parts I need to change?
   
   I added a new test case where a requested schema is used, and that revealed 
a bug in the spark2 `IcebergSource#createReader`. In that method, there is a 
call to `SparkSchemaUtil.convert(Schema, StructType)` whose only purpose seems 
to be to check that the requested `StructType` does not contain fields that are 
missing from the `Schema` (for then the call would cause an exception), and here 
I needed to use the snapshot `Schema` rather than the table `Schema`. Other than 
that, the projection returned by the scan appears to be correct.
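
To illustrate why the choice of schema matters for that check, here is a minimal sketch. It models a schema as a plain set of column names, which is a simplification made for this example; `RequestedSchemaCheck` and `checkRequested` are hypothetical names and do not reproduce what `SparkSchemaUtil.convert` does internally. The only behavior taken from the discussion is that a requested field absent from the schema being checked against results in an exception.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical illustration; a schema is reduced to its set of column names. */
final class RequestedSchemaCheck {
  private RequestedSchemaCheck() {
  }

  /** Throws if any requested column is absent from the given schema columns. */
  static void checkRequested(Set<String> schemaColumns, List<String> requestedColumns) {
    for (String name : requestedColumns) {
      if (!schemaColumns.contains(name)) {
        throw new IllegalArgumentException(
            "Requested column not found in schema: " + name);
      }
    }
  }
}
```

If a column was dropped after the snapshot was written, a request for it passes the check against the snapshot's columns but fails against the current table's columns, which matches the failure the new test case exposed.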

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -165,13 +165,23 @@
           TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
   }
 
+  private Schema getTableSchema() {

Review comment:
       Renamed to `snapshotSchema` as you suggest.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -62,7 +62,7 @@ public SparkTable getTable(StructType schema, Transform[] partitioning, Map<Stri
     Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
 
     // Build Spark table based on Iceberg table, and return it
-    return new SparkTable(icebergTable, schema);
+    return new SparkTable(icebergTable, schema, options);

Review comment:
       I now extract `snapshotId` and `asOfTimestamp` from `options` here, if they 
appear, and pass the values to the `SparkTable` constructor instead of having 
`SparkTable` do that. I honestly think it's six of one and half a dozen of 
the other.
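
For reference, a minimal sketch of that extraction step. It assumes the read options use the keys `snapshot-id` and `as-of-timestamp` and arrive as a `Map<String, String>`; the `TimeTravelOptions` class is a hypothetical holder written for this example, and rejecting both options together is an assumption of the sketch rather than something shown in this diff.

```java
import java.util.Map;

/** Hypothetical holder for the two time-travel read options. */
final class TimeTravelOptions {
  static final String SNAPSHOT_ID = "snapshot-id";
  static final String AS_OF_TIMESTAMP = "as-of-timestamp";

  final Long snapshotId;     // null when the option is absent
  final Long asOfTimestamp;  // null when the option is absent

  private TimeTravelOptions(Long snapshotId, Long asOfTimestamp) {
    this.snapshotId = snapshotId;
    this.asOfTimestamp = asOfTimestamp;
  }

  /** Parses both options; assumes (for this sketch) that at most one may be set. */
  static TimeTravelOptions from(Map<String, String> options) {
    Long snapshotId = parseLong(options, SNAPSHOT_ID);
    Long asOfTimestamp = parseLong(options, AS_OF_TIMESTAMP);
    if (snapshotId != null && asOfTimestamp != null) {
      throw new IllegalArgumentException(
          "Cannot specify both " + SNAPSHOT_ID + " and " + AS_OF_TIMESTAMP);
    }
    return new TimeTravelOptions(snapshotId, asOfTimestamp);
  }

  private static Long parseLong(Map<String, String> options, String key) {
    String value = options.get(key);
    return value == null ? null : Long.valueOf(value);
  }
}
```

Parsing once at the call site keeps the table class free of option handling; doing it inside the table's constructor would work equally well, so the placement is mostly a matter of taste.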






