rdblue commented on a change in pull request #1508:
URL: https://github.com/apache/iceberg/pull/1508#discussion_r711798009
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,24 +103,35 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
SparkSession spark = SparkSession.active();
setupDefaultSparkCatalog(spark);
String path = options.get("path");
+ Long snapshotId = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
+ Long asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);
CatalogManager catalogManager = spark.sessionState().catalogManager();
if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
- new PathIdentifier(path));
+ new PathIdentifier(path, snapshotId, asOfTimestamp));
}
+ // Get the CatalogAndIdentifier and swap out the Identifier for a snapshot-aware TableIdentifier
+ // if snapshotId or asOfTimestamp is set.
+
final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
"path or identifier", spark, path);
if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
!(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
// catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+ Identifier ident = catalogAndIdentifier.identifier();
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
- catalogAndIdentifier.identifier());
- } else {
+ TableIdentifier.of(ident.namespace(), ident.name(), snapshotId, asOfTimestamp));
Review comment:
Okay, thanks.
##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -166,13 +167,17 @@
this.readTimestampWithoutZone = SparkUtil.canHandleTimestampWithoutZone(options.asMap(), sessionConf);
}
+ protected Schema snapshotSchema() {
+ return SnapshotUtil.schemaFor(table, snapshotId, asOfTimestamp);
Review comment:
https://github.com/wypoon/iceberg/pull/1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]