This is an automated email from the ASF dual-hosted git repository. etudenhoefner pushed a commit to branch spark-remote-scan-planning-fix-flakiness in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit a2c169e5c7e8a6c242860ec07abbfb01b2a1de2a Author: Eduard Tudenhoefner <[email protected]> AuthorDate: Fri Dec 19 13:01:40 2025 +0100 Spark: Order results to fix test flakiness with remote scan planning --- .../org/apache/iceberg/spark/sql/TestSelect.java | 87 +++++++++++++--------- 1 file changed, 53 insertions(+), 34 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index cf4ccd62db..8b146d925b 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -212,14 +212,14 @@ public class TestSelect extends CatalogTestBase { public void testSnapshotInTableName() { // get the snapshot ID of the last write and get the current row set as expected long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); - List<Object[]> expected = sql("SELECT * FROM %s", tableName); + List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName); // create a second snapshot sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); String prefix = "snapshot_id_"; // read the table at the snapshot - List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId); + List<Object[]> actual = sql("SELECT * FROM %s.%s ORDER by id", tableName, prefix + snapshotId); assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual); // read the table using DataFrameReader option @@ -228,7 +228,8 @@ public class TestSelect extends CatalogTestBase { .read() .format("iceberg") .option(SparkReadOptions.SNAPSHOT_ID, snapshotId) - .load(tableName); + .load(tableName) + .orderBy("id"); List<Object[]> fromDF = rowsToJava(df.collectAsList()); assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF); } @@ -238,14 +239,14 @@ public class TestSelect extends CatalogTestBase { // get a timestamp just after the last write and get the current row set as expected long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis(); long timestamp = waitUntilAfter(snapshotTs + 2); - List<Object[]> expected = sql("SELECT * FROM %s", tableName); + List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName); // create a second snapshot sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); String prefix = "at_timestamp_"; // read the table at the snapshot - List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + timestamp); + List<Object[]> actual = sql("SELECT * FROM %s.%s ORDER by id", tableName, prefix + timestamp); assertEquals("Snapshot at timestamp, prefix " + prefix, expected, actual); // read the table using DataFrameReader option @@ -254,7 +255,8 @@ public class TestSelect extends CatalogTestBase { .read() .format("iceberg") .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) - .load(tableName); + .load(tableName) + .orderBy("id"); List<Object[]> fromDF = rowsToJava(df.collectAsList()); assertEquals("Snapshot at timestamp " + timestamp, expected, fromDF); } @@ -263,19 +265,20 @@ public class TestSelect extends CatalogTestBase { public void testVersionAsOf() { // get the snapshot ID of the last write and get the current row set as expected long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); - List<Object[]> expected = sql("SELECT * FROM %s", tableName); + List<Object[]> expected = sql("SELECT * FROM %s ORDER BY id", tableName); // create a second snapshot sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); // read the table at the snapshot - List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF %s", tableName, snapshotId); + List<Object[]> actual1 = + sql("SELECT * FROM %s VERSION AS OF %s ORDER BY id", tableName, snapshotId); assertEquals("Snapshot at specific ID", expected, actual1); // read the table at the snapshot // HIVE time travel syntax List<Object[]> actual2 = - sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF %s", tableName, snapshotId); + sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF %s ORDER BY id", tableName, snapshotId); assertEquals("Snapshot at specific ID", expected, actual2); // read the table using DataFrameReader option: versionAsOf @@ -284,7 +287,8 @@ public class TestSelect extends CatalogTestBase { .read() .format("iceberg") .option(SparkReadOptions.VERSION_AS_OF, snapshotId) - .load(tableName); + .load(tableName) + .orderBy("id"); List<Object[]> fromDF = rowsToJava(df.collectAsList()); assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF); } @@ -294,28 +298,35 @@ public class TestSelect extends CatalogTestBase { Table table = validationCatalog.loadTable(tableIdent); long snapshotId = table.currentSnapshot().snapshotId(); table.manageSnapshots().createTag("test_tag", snapshotId).commit(); - List<Object[]> expected = sql("SELECT * FROM %s", tableName); + List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName); // create a second snapshot, read the table at the tag sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); - List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName); + List<Object[]> actual1 = + sql("SELECT * FROM %s VERSION AS OF 'test_tag' ORDER by id", tableName); assertEquals("Snapshot at specific tag reference name", expected, actual1); // read the table at the tag // HIVE time travel syntax - List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName); + List<Object[]> actual2 = + sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag' ORDER by id", tableName); assertEquals("Snapshot at specific tag reference name", expected, actual2); // Spark session catalog does not support extended table names if (!"spark_catalog".equals(catalogName)) { // read the table using the "tag_" prefix in the table name - List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName); + List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag ORDER by id", tableName); assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3); } // read the table using DataFrameReader option: tag Dataset<Row> df = - spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName); + spark + .read() + .format("iceberg") + .option(SparkReadOptions.TAG, "test_tag") + .load(tableName) + .orderBy("id"); List<Object[]> fromDF = rowsToJava(df.collectAsList()); assertEquals("Snapshot at specific tag reference name", expected, fromDF); } @@ -326,7 +337,7 @@ public class TestSelect extends CatalogTestBase { long snapshotId1 = table.currentSnapshot().snapshotId(); // create a second snapshot, read the table at the snapshot - List<Object[]> actual = sql("SELECT * FROM %s", tableName); + List<Object[]> actual = sql("SELECT * FROM %s ORDER by id", tableName); sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); table.refresh(); @@ -337,11 +348,11 @@ public class TestSelect extends CatalogTestBase { // this means if a tag name matches a snapshot ID, it will always choose snapshotID to travel // to. List<Object[]> travelWithStringResult = - sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, snapshotId1); + sql("SELECT * FROM %s VERSION AS OF '%s' ORDER by id", tableName, snapshotId1); assertEquals("Snapshot at specific tag reference name", actual, travelWithStringResult); List<Object[]> travelWithLongResult = - sql("SELECT * FROM %s VERSION AS OF %s", tableName, snapshotId1); + sql("SELECT * FROM %s VERSION AS OF %s ORDER by id", tableName, snapshotId1); assertEquals("Snapshot at specific tag reference name", actual, travelWithLongResult); } @@ -350,23 +361,24 @@ public class TestSelect extends CatalogTestBase { Table table = validationCatalog.loadTable(tableIdent); long snapshotId = table.currentSnapshot().snapshotId(); table.manageSnapshots().createBranch("test_branch", snapshotId).commit(); - List<Object[]> expected = sql("SELECT * FROM %s", tableName); + List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName); // create a second snapshot, read the table at the branch sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); - List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName); + List<Object[]> actual1 = + sql("SELECT * FROM %s VERSION AS OF 'test_branch' ORDER by id", tableName); assertEquals("Snapshot at specific branch reference name", expected, actual1); // read the table at the branch // HIVE time travel syntax List<Object[]> actual2 = - sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName); + sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch' ORDER by id", tableName); assertEquals("Snapshot at specific branch reference name", expected, actual2); // Spark session catalog does not support extended table names if (!"spark_catalog".equals(catalogName)) { // read the table using the "branch_" prefix in the table name - List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName); + List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch ORDER by id", tableName); assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3); } @@ -376,7 +388,8 @@ public class TestSelect extends CatalogTestBase { .read() .format("iceberg") .option(SparkReadOptions.BRANCH, "test_branch") - .load(tableName); + .load(tableName) + .orderBy("id"); List<Object[]> fromDF = rowsToJava(df.collectAsList()); assertEquals("Snapshot at specific branch reference name", expected, fromDF); } @@ -400,16 +413,16 @@ public class TestSelect extends CatalogTestBase { // time-travel query using snapshot id should return the snapshot's schema long branchSnapshotId = table.refs().get(branchName).snapshotId(); - assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName, branchSnapshotId)) + assertThat(sql("SELECT * FROM %s VERSION AS OF %s ORDER by id", tableName, branchSnapshotId)) .containsExactlyElementsOf(expected); // querying the head of the branch should return the table's schema - assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName)) + assertThat(sql("SELECT * FROM %s VERSION AS OF '%s' ORDER by id", tableName, branchName)) .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null)); if (!"spark_catalog".equals(catalogName)) { // querying the head of the branch using 'branch_' should return the table's schema - assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName)) + assertThat(sql("SELECT * FROM %s.branch_%s ORDER by id", tableName, branchName)) .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null)); } @@ -419,7 +432,7 @@ public class TestSelect extends CatalogTestBase { tableName, branchName); // querying the head of the branch returns the table's schema - assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName)) + assertThat(sql("SELECT * FROM %s VERSION AS OF '%s' ORDER by id", tableName, branchName)) .containsExactlyInAnyOrder( row(1L, "a", null), row(2L, "b", null), @@ -429,7 +442,12 @@ public class TestSelect extends CatalogTestBase { // using DataFrameReader with the 'branch' option should return the table's schema Dataset<Row> df = - spark.read().format("iceberg").option(SparkReadOptions.BRANCH, branchName).load(tableName); + spark + .read() + .format("iceberg") + .option(SparkReadOptions.BRANCH, branchName) + .load(tableName) + .orderBy("id"); assertThat(rowsToJava(df.collectAsList())) .containsExactlyInAnyOrder( row(1L, "a", null), @@ -456,30 +474,30 @@ public class TestSelect extends CatalogTestBase { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String formattedDate = sdf.format(new Date(timestamp)); - List<Object[]> expected = sql("SELECT * FROM %s", tableName); + List<Object[]> expected = sql("SELECT * FROM %s ORDER BY id", tableName); // create a second snapshot sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); // read the table at the timestamp in long format i.e 1656507980463. List<Object[]> actualWithLongFormat = - sql("SELECT * FROM %s TIMESTAMP AS OF %s", tableName, timestampInSeconds); + sql("SELECT * FROM %s TIMESTAMP AS OF %s ORDER BY id", tableName, timestampInSeconds); assertEquals("Snapshot at timestamp", expected, actualWithLongFormat); // read the table at the timestamp in date format i.e 2022-06-29 18:40:37 List<Object[]> actualWithDateFormat = - sql("SELECT * FROM %s TIMESTAMP AS OF '%s'", tableName, formattedDate); + sql("SELECT * FROM %s TIMESTAMP AS OF '%s' ORDER BY id", tableName, formattedDate); assertEquals("Snapshot at timestamp", expected, actualWithDateFormat); // HIVE time travel syntax // read the table at the timestamp in long format i.e 1656507980463. List<Object[]> actualWithLongFormatInHiveSyntax = - sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF %s", tableName, timestampInSeconds); + sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF %s ORDER BY id", tableName, timestampInSeconds); assertEquals("Snapshot at specific ID", expected, actualWithLongFormatInHiveSyntax); // read the table at the timestamp in date format i.e 2022-06-29 18:40:37 List<Object[]> actualWithDateFormatInHiveSyntax = - sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s'", tableName, formattedDate); + sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s' ORDER BY id", tableName, formattedDate); assertEquals("Snapshot at specific ID", expected, actualWithDateFormatInHiveSyntax); // read the table using DataFrameReader option @@ -488,7 +506,8 @@ public class TestSelect extends CatalogTestBase { .read() .format("iceberg") .option(SparkReadOptions.TIMESTAMP_AS_OF, formattedDate) - .load(tableName); + .load(tableName) + .orderBy("id"); List<Object[]> fromDF = rowsToJava(df.collectAsList()); assertEquals("Snapshot at timestamp " + timestamp, expected, fromDF); }
