This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch spark-remote-scan-planning-fix-flakiness
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit a2c169e5c7e8a6c242860ec07abbfb01b2a1de2a
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Dec 19 13:01:40 2025 +0100

    Spark: Order results to fix test flakiness with remote scan planning
---
 .../org/apache/iceberg/spark/sql/TestSelect.java   | 87 +++++++++++++---------
 1 file changed, 53 insertions(+), 34 deletions(-)

diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index cf4ccd62db..8b146d925b 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -212,14 +212,14 @@ public class TestSelect extends CatalogTestBase {
   public void testSnapshotInTableName() {
     // get the snapshot ID of the last write and get the current row set as 
expected
     long snapshotId = 
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
-    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+    List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName);
 
     // create a second snapshot
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
 
     String prefix = "snapshot_id_";
     // read the table at the snapshot
-    List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + 
snapshotId);
+    List<Object[]> actual = sql("SELECT * FROM %s.%s ORDER by id", tableName, 
prefix + snapshotId);
     assertEquals("Snapshot at specific ID, prefix " + prefix, expected, 
actual);
 
     // read the table using DataFrameReader option
@@ -228,7 +228,8 @@ public class TestSelect extends CatalogTestBase {
             .read()
             .format("iceberg")
             .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
-            .load(tableName);
+            .load(tableName)
+            .orderBy("id");
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
     assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF);
   }
@@ -238,14 +239,14 @@ public class TestSelect extends CatalogTestBase {
     // get a timestamp just after the last write and get the current row set 
as expected
     long snapshotTs = 
validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();
     long timestamp = waitUntilAfter(snapshotTs + 2);
-    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+    List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName);
 
     // create a second snapshot
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
 
     String prefix = "at_timestamp_";
     // read the table at the snapshot
-    List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + 
timestamp);
+    List<Object[]> actual = sql("SELECT * FROM %s.%s ORDER by id", tableName, 
prefix + timestamp);
     assertEquals("Snapshot at timestamp, prefix " + prefix, expected, actual);
 
     // read the table using DataFrameReader option
@@ -254,7 +255,8 @@ public class TestSelect extends CatalogTestBase {
             .read()
             .format("iceberg")
             .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
-            .load(tableName);
+            .load(tableName)
+            .orderBy("id");
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
     assertEquals("Snapshot at timestamp " + timestamp, expected, fromDF);
   }
@@ -263,19 +265,20 @@ public class TestSelect extends CatalogTestBase {
   public void testVersionAsOf() {
     // get the snapshot ID of the last write and get the current row set as 
expected
     long snapshotId = 
validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
-    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+    List<Object[]> expected = sql("SELECT * FROM %s ORDER BY id", tableName);
 
     // create a second snapshot
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
 
     // read the table at the snapshot
-    List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF %s", 
tableName, snapshotId);
+    List<Object[]> actual1 =
+        sql("SELECT * FROM %s VERSION AS OF %s ORDER BY id", tableName, 
snapshotId);
     assertEquals("Snapshot at specific ID", expected, actual1);
 
     // read the table at the snapshot
     // HIVE time travel syntax
     List<Object[]> actual2 =
-        sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF %s", tableName, 
snapshotId);
+        sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF %s ORDER BY id", 
tableName, snapshotId);
     assertEquals("Snapshot at specific ID", expected, actual2);
 
     // read the table using DataFrameReader option: versionAsOf
@@ -284,7 +287,8 @@ public class TestSelect extends CatalogTestBase {
             .read()
             .format("iceberg")
             .option(SparkReadOptions.VERSION_AS_OF, snapshotId)
-            .load(tableName);
+            .load(tableName)
+            .orderBy("id");
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
     assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF);
   }
@@ -294,28 +298,35 @@ public class TestSelect extends CatalogTestBase {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createTag("test_tag", snapshotId).commit();
-    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+    List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName);
 
     // create a second snapshot, read the table at the tag
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
-    List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", 
tableName);
+    List<Object[]> actual1 =
+        sql("SELECT * FROM %s VERSION AS OF 'test_tag' ORDER by id", 
tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual1);
 
     // read the table at the tag
     // HIVE time travel syntax
-    List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 
'test_tag'", tableName);
+    List<Object[]> actual2 =
+        sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag' ORDER by 
id", tableName);
     assertEquals("Snapshot at specific tag reference name", expected, actual2);
 
     // Spark session catalog does not support extended table names
     if (!"spark_catalog".equals(catalogName)) {
       // read the table using the "tag_" prefix in the table name
-      List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
+      List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag ORDER by 
id", tableName);
       assertEquals("Snapshot at specific tag reference name, prefix", 
expected, actual3);
     }
 
     // read the table using DataFrameReader option: tag
     Dataset<Row> df =
-        spark.read().format("iceberg").option(SparkReadOptions.TAG, 
"test_tag").load(tableName);
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.TAG, "test_tag")
+            .load(tableName)
+            .orderBy("id");
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
     assertEquals("Snapshot at specific tag reference name", expected, fromDF);
   }
@@ -326,7 +337,7 @@ public class TestSelect extends CatalogTestBase {
     long snapshotId1 = table.currentSnapshot().snapshotId();
 
     // create a second snapshot, read the table at the snapshot
-    List<Object[]> actual = sql("SELECT * FROM %s", tableName);
+    List<Object[]> actual = sql("SELECT * FROM %s ORDER by id", tableName);
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
 
     table.refresh();
@@ -337,11 +348,11 @@ public class TestSelect extends CatalogTestBase {
     // this means if a tag name matches a snapshot ID, it will always choose 
snapshotID to travel
     // to.
     List<Object[]> travelWithStringResult =
-        sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, snapshotId1);
+        sql("SELECT * FROM %s VERSION AS OF '%s' ORDER by id", tableName, 
snapshotId1);
     assertEquals("Snapshot at specific tag reference name", actual, 
travelWithStringResult);
 
     List<Object[]> travelWithLongResult =
-        sql("SELECT * FROM %s VERSION AS OF %s", tableName, snapshotId1);
+        sql("SELECT * FROM %s VERSION AS OF %s ORDER by id", tableName, 
snapshotId1);
     assertEquals("Snapshot at specific tag reference name", actual, 
travelWithLongResult);
   }
 
@@ -350,23 +361,24 @@ public class TestSelect extends CatalogTestBase {
     Table table = validationCatalog.loadTable(tableIdent);
     long snapshotId = table.currentSnapshot().snapshotId();
     table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
-    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+    List<Object[]> expected = sql("SELECT * FROM %s ORDER by id", tableName);
 
     // create a second snapshot, read the table at the branch
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
-    List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 
'test_branch'", tableName);
+    List<Object[]> actual1 =
+        sql("SELECT * FROM %s VERSION AS OF 'test_branch' ORDER by id", 
tableName);
     assertEquals("Snapshot at specific branch reference name", expected, 
actual1);
 
     // read the table at the branch
     // HIVE time travel syntax
     List<Object[]> actual2 =
-        sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", 
tableName);
+        sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch' ORDER by 
id", tableName);
     assertEquals("Snapshot at specific branch reference name", expected, 
actual2);
 
     // Spark session catalog does not support extended table names
     if (!"spark_catalog".equals(catalogName)) {
       // read the table using the "branch_" prefix in the table name
-      List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", 
tableName);
+      List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch ORDER 
by id", tableName);
       assertEquals("Snapshot at specific branch reference name, prefix", 
expected, actual3);
     }
 
@@ -376,7 +388,8 @@ public class TestSelect extends CatalogTestBase {
             .read()
             .format("iceberg")
             .option(SparkReadOptions.BRANCH, "test_branch")
-            .load(tableName);
+            .load(tableName)
+            .orderBy("id");
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
     assertEquals("Snapshot at specific branch reference name", expected, 
fromDF);
   }
@@ -400,16 +413,16 @@ public class TestSelect extends CatalogTestBase {
 
     // time-travel query using snapshot id should return the snapshot's schema
     long branchSnapshotId = table.refs().get(branchName).snapshotId();
-    assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName, 
branchSnapshotId))
+    assertThat(sql("SELECT * FROM %s VERSION AS OF %s ORDER by id", tableName, 
branchSnapshotId))
         .containsExactlyElementsOf(expected);
 
     // querying the head of the branch should return the table's schema
-    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, 
branchName))
+    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s' ORDER by id", 
tableName, branchName))
         .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", 
null));
 
     if (!"spark_catalog".equals(catalogName)) {
       // querying the head of the branch using 'branch_' should return the 
table's schema
-      assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName))
+      assertThat(sql("SELECT * FROM %s.branch_%s ORDER by id", tableName, 
branchName))
           .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, 
"c", null));
     }
 
@@ -419,7 +432,7 @@ public class TestSelect extends CatalogTestBase {
         tableName, branchName);
 
     // querying the head of the branch returns the table's schema
-    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, 
branchName))
+    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s' ORDER by id", 
tableName, branchName))
         .containsExactlyInAnyOrder(
             row(1L, "a", null),
             row(2L, "b", null),
@@ -429,7 +442,12 @@ public class TestSelect extends CatalogTestBase {
 
     // using DataFrameReader with the 'branch' option should return the 
table's schema
     Dataset<Row> df =
-        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, 
branchName).load(tableName);
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.BRANCH, branchName)
+            .load(tableName)
+            .orderBy("id");
     assertThat(rowsToJava(df.collectAsList()))
         .containsExactlyInAnyOrder(
             row(1L, "a", null),
@@ -456,30 +474,30 @@ public class TestSelect extends CatalogTestBase {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     String formattedDate = sdf.format(new Date(timestamp));
 
-    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+    List<Object[]> expected = sql("SELECT * FROM %s ORDER BY id", tableName);
 
     // create a second snapshot
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
 
     // read the table at the timestamp in long format i.e 1656507980463.
     List<Object[]> actualWithLongFormat =
-        sql("SELECT * FROM %s TIMESTAMP AS OF %s", tableName, 
timestampInSeconds);
+        sql("SELECT * FROM %s TIMESTAMP AS OF %s ORDER BY id", tableName, 
timestampInSeconds);
     assertEquals("Snapshot at timestamp", expected, actualWithLongFormat);
 
     // read the table at the timestamp in date format i.e 2022-06-29 18:40:37
     List<Object[]> actualWithDateFormat =
-        sql("SELECT * FROM %s TIMESTAMP AS OF '%s'", tableName, formattedDate);
+        sql("SELECT * FROM %s TIMESTAMP AS OF '%s' ORDER BY id", tableName, 
formattedDate);
     assertEquals("Snapshot at timestamp", expected, actualWithDateFormat);
 
     // HIVE time travel syntax
     // read the table at the timestamp in long format i.e 1656507980463.
     List<Object[]> actualWithLongFormatInHiveSyntax =
-        sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF %s", tableName, 
timestampInSeconds);
+        sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF %s ORDER BY id", 
tableName, timestampInSeconds);
     assertEquals("Snapshot at specific ID", expected, 
actualWithLongFormatInHiveSyntax);
 
     // read the table at the timestamp in date format i.e 2022-06-29 18:40:37
     List<Object[]> actualWithDateFormatInHiveSyntax =
-        sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s'", tableName, 
formattedDate);
+        sql("SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s' ORDER BY id", 
tableName, formattedDate);
     assertEquals("Snapshot at specific ID", expected, 
actualWithDateFormatInHiveSyntax);
 
     // read the table using DataFrameReader option
@@ -488,7 +506,8 @@ public class TestSelect extends CatalogTestBase {
             .read()
             .format("iceberg")
             .option(SparkReadOptions.TIMESTAMP_AS_OF, formattedDate)
-            .load(tableName);
+            .load(tableName)
+            .orderBy("id");
     List<Object[]> fromDF = rowsToJava(df.collectAsList());
     assertEquals("Snapshot at timestamp " + timestamp, expected, fromDF);
   }

Reply via email to