This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8aa86db08f Spark 3.4: Backport tests from #13070 to 3.4 (#13440)
8aa86db08f is described below

commit 8aa86db08fef9a23036075492b71bbcc2138afc4
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue Jul 1 14:14:34 2025 -0600

    Spark 3.4: Backport tests from #13070 to 3.4 (#13440)
---
 .../TestRowLevelOperationsWithLineage.java         | 33 ++++++++++----
 .../iceberg/spark/data/TestSparkAvroReader.java    | 53 +++++++++++++++-------
 2 files changed, 61 insertions(+), 25 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index 8882609ccf..ff7ff82ce1 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -56,6 +56,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.iceberg.spark.functions.BucketFunction;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -64,7 +65,6 @@ import 
org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 
 public abstract class TestRowLevelOperationsWithLineage extends 
SparkRowLevelOperationsTestBase {
@@ -119,6 +119,29 @@ public abstract class TestRowLevelOperationsWithLineage 
extends SparkRowLevelOpe
         DISTRIBUTED,
         3
       },
+      {
+        "spark_catalog",
+        SparkSessionCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            "default-namespace",
+            "default",
+            "clients",
+            "1",
+            "parquet-enabled",
+            "false",
+            "cache-enabled",
+            "false" // Spark will delete tables using v1, leaving the cache 
out of sync
+            ),
+        FileFormat.AVRO,
+        false,
+        WRITE_DISTRIBUTION_MODE_RANGE,
+        false,
+        null,
+        DISTRIBUTED,
+        3
+      },
     };
   }
 
@@ -127,13 +150,6 @@ public abstract class TestRowLevelOperationsWithLineage 
extends SparkRowLevelOpe
     spark.conf().set("spark.sql.shuffle.partitions", "4");
   }
 
-  @BeforeEach
-  public void beforeEach() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
-    // ToDo: Remove these as row lineage inheritance gets implemented in the 
other readers
-    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
-  }
-
   @AfterEach
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
@@ -531,6 +547,7 @@ public abstract class TestRowLevelOperationsWithLineage 
extends SparkRowLevelOpe
   }
 
   private Snapshot latestSnapshot(Table table) {
+    table.refresh();
     return branch != null ? table.snapshot(branch) : table.currentSnapshot();
   }
 
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
index 922af5b26a..5235fcc107 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
@@ -18,17 +18,19 @@
  */
 package org.apache.iceberg.spark.data;
 
-import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
-import org.apache.iceberg.inmemory.InMemoryOutputFile;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -39,34 +41,51 @@ public class TestSparkAvroReader extends AvroDataTest {
   }
 
   @Override
-  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) 
throws IOException {
-    List<Record> expected = RandomData.generateList(writeSchema, 100, 0L);
-
-    OutputFile outputFile = new InMemoryOutputFile();
+  protected void writeAndValidate(
+      Schema writeSchema, Schema expectedSchema, 
List<org.apache.iceberg.data.Record> records)
+      throws IOException {
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).as("Delete should succeed").isTrue();
 
-    try (FileAppender<Record> writer =
-        Avro.write(outputFile).schema(writeSchema).named("test").build()) {
-      for (Record rec : expected) {
-        writer.add(rec);
+    try (DataWriter<Record> dataWriter =
+        Avro.writeData(Files.localOutput(testFile))
+            .schema(writeSchema)
+            .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+            .withSpec(PartitionSpec.unpartitioned())
+            .build()) {
+      for (org.apache.iceberg.data.Record rec : records) {
+        dataWriter.write(rec);
       }
     }
 
     List<InternalRow> rows;
     try (AvroIterable<InternalRow> reader =
-        Avro.read(outputFile.toInputFile())
-            .createResolvingReader(SparkPlannedAvroReader::create)
+        Avro.read(Files.localInput(testFile))
+            .createResolvingReader(schema -> 
SparkPlannedAvroReader.create(schema, ID_TO_CONSTANT))
             .project(expectedSchema)
             .build()) {
       rows = Lists.newArrayList(reader);
     }
 
-    for (int i = 0; i < expected.size(); i += 1) {
-      assertEqualsUnsafe(expectedSchema.asStruct(), expected.get(i), 
rows.get(i));
+    for (int i = 0; i < records.size(); i += 1) {
+      GenericsHelpers.assertEqualsUnsafe(
+          expectedSchema.asStruct(), records.get(i), rows.get(i), 
ID_TO_CONSTANT, i);
     }
   }
 
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) 
throws IOException {
+    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
+    writeAndValidate(writeSchema, expectedSchema, expected);
+  }
+
   @Override
   protected boolean supportsDefaultValues() {
     return true;
   }
+
+  @Override
+  protected boolean supportsRowLineage() {
+    return true;
+  }
 }

Reply via email to