jackieo168 commented on code in PR #6717:
URL: https://github.com/apache/iceberg/pull/6717#discussion_r1096420864
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -105,6 +108,7 @@ public class SparkCatalog extends BaseCatalog {
   private static final Splitter COMMA = Splitter.on(",");
   private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
+  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
Review Comment:
Do we also want to make similar changes to `SparkCachedTableCatalog`?
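For reference, a minimal sketch of how the same selector could be parsed there, assuming `SparkCachedTableCatalog` follows the pattern-matching style `SparkCatalog` uses above; everything except the `BRANCH` pattern itself is illustrative, not the PR's actual code:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: mirroring SparkCatalog's selector patterns in
// SparkCachedTableCatalog. Only the BRANCH constant matches the diff above;
// the class and helper method are stand-ins for illustration.
class BranchSelectorSketch {
  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");

  // Returns the branch name when the selector matches "branch_<name>", else null.
  static String parseBranch(String selector) {
    Matcher branch = BRANCH.matcher(selector);
    return branch.matches() ? branch.group(1) : null;
  }

  public static void main(String[] args) {
    System.out.println(parseBranch("branch_audit"));   // prints "audit"
    System.out.println(parseBranch("snapshot_id_42")); // prints "null"
  }
}
```

Keeping the two catalogs consistent would let a `branch_<name>` suffix resolve to a branch read in both places, the same way the existing `snapshot_id_` and `at_timestamp_` suffixes already do.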
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java:
##########
@@ -370,4 +370,42 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
   }
+
+  @Test
+  public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> currentSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords =
+        currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    Dataset<Row> deleteSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> deletedSnapshotRecords =
+        deleteSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecordsAfterDeletion = Lists.newArrayList();
+    expectedRecordsAfterDeletion.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecordsAfterDeletion, deletedSnapshotRecords);
Review Comment:
@namrathamyske please see if you can leverage the unit tests added in our fork.
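For anyone following along, here is roughly how the new selector would be exercised from Spark once wired up. This is a hedged usage sketch: it assumes a SparkSession already configured with an Iceberg catalog, a hypothetical table `db.sample` with a branch named `audit`, and that `branch_<name>` identifier suffixes resolve the same way the existing `snapshot_id_<id>` and `at_timestamp_<ms>` suffixes do:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hedged usage sketch of the branch selector; "db.sample" and the session
// setup are hypothetical stand-ins, not from the PR.
public class BranchReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // Multipart-identifier form enabled by the BRANCH pattern in SparkCatalog:
    Dataset<Row> viaIdentifier = spark.read().table("db.sample.branch_audit");

    // Option form, as exercised directly by the test above:
    Dataset<Row> viaOption =
        spark.read().format("iceberg").option("branch", "audit").load("db.sample");

    viaIdentifier.show();
    viaOption.show();
  }
}
```

The identifier form is what the catalog-side pattern unlocks; the option form is the path the unit test already covers.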