97harsh commented on code in PR #14964:
URL: https://github.com/apache/iceberg/pull/14964#discussion_r2667474433
##########
spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -1105,4 +1105,185 @@ private List<Object[]> currentData() {
private List<Object[]> currentData(String table) {
return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2,
c3").collectAsList());
}
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranch() {
+ createTable();
+ insertData(10);
+
+ String branchName = "testBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ // Insert more data to the branch (multiple inserts to create multiple
small files)
+ for (int i = 0; i < 5; i++) {
+ sql("INSERT INTO %s.branch_%s VALUES (1, 'a', 'b'), (2, 'c', 'd')",
tableName, branchName);
+ }
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch with options to force rewrite
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s',
options => map('min-input-files','2'))",
+ catalogName, tableName, branchName);
+
+ // Verify output
+ assertThat(output).hasSize(1);
+ assertThat(output.get(0)).hasSize(5);
+
+ // Check if files were actually rewritten
+ int filesRewritten = (Integer) output.get(0)[0];
+ int filesAdded = (Integer) output.get(0)[1];
+
+ // Verify files were rewritten (we created multiple small files, so they
should be compacted)
+ assertThat(filesRewritten)
+ .as("Files should be rewritten when multiple small files exist")
+ .isGreaterThan(0);
+
+ // Verify branch snapshot changed
+ table.refresh();
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testRewriteDataFilesOnBranchWithFilter() {
+ createPartitionTable();
+ insertData(10);
+
+ String branchName = "filteredBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ // Insert more data to the branch (insert multiple times to create
multiple files)
+ for (int i = 0; i < 5; i++) {
+ sql(
+ "INSERT INTO %s.branch_%s VALUES (10, 'a', 'b'), (20, 'c', 'd'),
(30, 'e', 'f')",
+ tableName, branchName);
+ }
+
+ // Get snapshot IDs before rewrite
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotId = table.currentSnapshot().snapshotId();
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+
+ // Call rewrite_data_files on the branch with filter
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s',
where => 'c1 >= 10')",
+ catalogName, tableName, branchName);
+
+ // Verify output
+ assertThat(output).hasSize(1);
+ assertThat(output.get(0)).hasSize(5);
+
+ // Check if files were actually rewritten
+ int filesRewritten = (Integer) output.get(0)[0];
+ int filesAdded = (Integer) output.get(0)[1];
+
+ // Verify branch snapshot changed only if files were rewritten
+ table.refresh();
+ if (filesRewritten > 0 || filesAdded > 0) {
+ assertThat(table.refs().get(branchName).snapshotId())
+ .as("Branch snapshot should be updated when files are rewritten")
+ .isNotEqualTo(branchSnapshotId);
+ }
+
+ // Verify main snapshot unchanged
+ assertThat(table.currentSnapshot().snapshotId())
+ .as("Main snapshot should remain unchanged")
+ .isEqualTo(mainSnapshotId);
+ }
+
+ @TestTemplate
+ public void testBranchCompactionDoesNotAffectMain() {
+ createTable();
+
+ // Insert 10 rows on main branch (creates 1 file)
+ insertData(10);
+
+ // Capture main branch state BEFORE creating branch
+ Table table = validationCatalog.loadTable(tableIdent);
+ long mainSnapshotIdBeforeBranch = table.currentSnapshot().snapshotId();
+
+ // Create branch from current main state
+ String branchName = "compactionBranch";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+
+ // CRITICAL: Add more data to MAIN to make it diverge from branch
+ // This ensures main's currentSnapshot != branch's snapshot
+ for (int i = 0; i < 5; i++) {
+ sql("INSERT INTO %s VALUES (%d, 'main-diverge', 'data')", tableName, i +
1000);
+ }
+
+ // Refresh to get new main snapshot after divergence
+ table.refresh();
+ long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
+
+ // Now insert multiple small batches to the BRANCH ONLY (creates many
small files)
+ for (int i = 0; i < 10; i++) {
+ sql("INSERT INTO %s.branch_%s VALUES (%d, 'branch', 'data')", tableName,
branchName, i + 100);
+ }
+
+ // Refresh table and get branch snapshot before compaction
+ table.refresh();
+ long branchSnapshotBeforeCompaction =
table.refs().get(branchName).snapshotId();
+
+ // Verify that branch and main have diverged
+ assertThat(branchSnapshotBeforeCompaction)
+ .as("Branch and main should have different snapshots")
+ .isNotEqualTo(mainSnapshotAfterDivergence);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s',
options => map('min-input-files','2'))",
+ catalogName, tableName, branchName);
+
+ // THIS ASSERTION SHOULD FAIL WITH CURRENT BUGGY CODE:
+ // Since the planner uses main's snapshot (which has no small files to
compact),
+ // it will find 0 files to rewrite, even though the branch has 10 small
files!
+ int filesRewritten = (Integer) output.get(0)[0];
+ int filesAdded = (Integer) output.get(0)[1];
+
+ assertThat(filesRewritten)
+ .as("Branch compaction should rewrite the 10 small files on the
branch, not main's files")
+ .isGreaterThan(0);
Review Comment:
Yes, added the numbers in there. Thank you!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]