This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8aa86db08f Spark 3.4: Backport tests from #13070 to 3.4 (#13440)
8aa86db08f is described below
commit 8aa86db08fef9a23036075492b71bbcc2138afc4
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue Jul 1 14:14:34 2025 -0600
Spark 3.4: Backport tests from #13070 to 3.4 (#13440)
---
.../TestRowLevelOperationsWithLineage.java | 33 ++++++++++----
.../iceberg/spark/data/TestSparkAvroReader.java | 53 +++++++++++++++-------
2 files changed, 61 insertions(+), 25 deletions(-)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index 8882609ccf..ff7ff82ce1 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.functions.BucketFunction;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
@@ -64,7 +65,6 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOperationsTestBase {
@@ -119,6 +119,29 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
DISTRIBUTED,
3
},
+ {
+ "spark_catalog",
+ SparkSessionCatalog.class.getName(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ "default-namespace",
+ "default",
+ "clients",
+ "1",
+ "parquet-enabled",
+ "false",
+ "cache-enabled",
+ "false" // Spark will delete tables using v1, leaving the cache
out of sync
+ ),
+ FileFormat.AVRO,
+ false,
+ WRITE_DISTRIBUTION_MODE_RANGE,
+ false,
+ null,
+ DISTRIBUTED,
+ 3
+ },
};
}
@@ -127,13 +150,6 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
spark.conf().set("spark.sql.shuffle.partitions", "4");
}
- @BeforeEach
- public void beforeEach() {
- assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
- // ToDo: Remove these as row lineage inheritance gets implemented in the other readers
- assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
- }
-
@AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
@@ -531,6 +547,7 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
}
private Snapshot latestSnapshot(Table table) {
+ table.refresh();
return branch != null ? table.snapshot(branch) : table.currentSnapshot();
}
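[Editor's note] The new parameter set above runs the lineage suite through Spark's built-in session catalog via SparkSessionCatalog, and the removed @BeforeEach assumptions are what let the Avro format be exercised at all. For context, a minimal sketch of an equivalent session setup; the local-mode builder, class wrapper, and extensions config are illustrative assumptions, while the catalog property keys and values mirror the ImmutableMap in the diff:

import org.apache.spark.sql.SparkSession;

public class SessionCatalogSketch {
  public static void main(String[] args) {
    // SparkSessionCatalog wraps spark_catalog so Iceberg and non-Iceberg
    // tables resolve through the same catalog name.
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]") // assumption: a local test-style session
            .enableHiveSupport()
            .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog.spark_catalog",
                "org.apache.iceberg.spark.SparkSessionCatalog")
            .config("spark.sql.catalog.spark_catalog.type", "hive")
            .config("spark.sql.catalog.spark_catalog.default-namespace", "default")
            .config("spark.sql.catalog.spark_catalog.clients", "1")
            .config("spark.sql.catalog.spark_catalog.parquet-enabled", "false")
            // cache-enabled=false because Spark may drop tables through the v1
            // path, leaving a cached Iceberg table out of sync (see the comment
            // in the parameter map above).
            .config("spark.sql.catalog.spark_catalog.cache-enabled", "false")
            .getOrCreate();
    spark.stop();
  }
}

The table.refresh() added to latestSnapshot() is related plumbing: it reloads the table metadata so that snapshots committed through the Spark session (a different Table instance) are visible before the assertion reads the current snapshot or branch.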
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
index 922af5b26a..5235fcc107 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
@@ -18,17 +18,19 @@
*/
package org.apache.iceberg.spark.data;
-import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+import static org.assertj.core.api.Assertions.assertThat;
+import java.io.File;
import java.io.IOException;
import java.util.List;
-import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
-import org.apache.iceberg.inmemory.InMemoryOutputFile;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -39,34 +41,51 @@ public class TestSparkAvroReader extends AvroDataTest {
}
@Override
- protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
- List<Record> expected = RandomData.generateList(writeSchema, 100, 0L);
-
- OutputFile outputFile = new InMemoryOutputFile();
+ protected void writeAndValidate(
+ Schema writeSchema, Schema expectedSchema, List<org.apache.iceberg.data.Record> records)
+ throws IOException {
+ File testFile = File.createTempFile("junit", null, temp.toFile());
+ assertThat(testFile.delete()).as("Delete should succeed").isTrue();
- try (FileAppender<Record> writer =
- Avro.write(outputFile).schema(writeSchema).named("test").build()) {
- for (Record rec : expected) {
- writer.add(rec);
+ try (DataWriter<Record> dataWriter =
+ Avro.writeData(Files.localOutput(testFile))
+ .schema(writeSchema)
+ .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+ .withSpec(PartitionSpec.unpartitioned())
+ .build()) {
+ for (org.apache.iceberg.data.Record rec : records) {
+ dataWriter.write(rec);
}
}
List<InternalRow> rows;
try (AvroIterable<InternalRow> reader =
- Avro.read(outputFile.toInputFile())
- .createResolvingReader(SparkPlannedAvroReader::create)
+ Avro.read(Files.localInput(testFile))
+ .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, ID_TO_CONSTANT))
.project(expectedSchema)
.build()) {
rows = Lists.newArrayList(reader);
}
- for (int i = 0; i < expected.size(); i += 1) {
- assertEqualsUnsafe(expectedSchema.asStruct(), expected.get(i), rows.get(i));
+ for (int i = 0; i < records.size(); i += 1) {
+ GenericsHelpers.assertEqualsUnsafe(
+ expectedSchema.asStruct(), records.get(i), rows.get(i), ID_TO_CONSTANT, i);
}
}
+ @Override
+ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+ List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
+ writeAndValidate(writeSchema, expectedSchema, expected);
+ }
+
@Override
protected boolean supportsDefaultValues() {
return true;
}
+
+ @Override
+ protected boolean supportsRowLineage() {
+ return true;
+ }
}
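[Editor's note] For reference, a self-contained sketch of the round trip the rewritten writeAndValidate performs: write generic records with Iceberg's Avro data writer, then read them back as Spark InternalRows. The one-column schema, row count, and class name are illustrative assumptions; ID_TO_CONSTANT is test-class state, so this sketch uses the single-argument SparkPlannedAvroReader.create form shown in the removed code:

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;

public class AvroRoundTripSketch {
  public static void main(String[] args) throws IOException {
    // Assumption: a trivial one-column schema stands in for the test's schemas.
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    List<Record> records = RandomGenericData.generate(schema, 10, 0L);

    File testFile = File.createTempFile("sketch", ".avro");
    testFile.delete(); // the writer expects to create the file itself

    // Write generic records through Iceberg's Avro data writer, as the test does.
    try (DataWriter<Record> writer =
        Avro.writeData(Files.localOutput(testFile))
            .schema(schema)
            .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
            .withSpec(PartitionSpec.unpartitioned())
            .build()) {
      for (Record rec : records) {
        writer.write(rec);
      }
    }

    // Read the rows back as Spark InternalRows with the planned Avro reader.
    try (AvroIterable<InternalRow> reader =
        Avro.read(Files.localInput(testFile))
            .createResolvingReader(SparkPlannedAvroReader::create)
            .project(schema)
            .build()) {
      List<InternalRow> rows = Lists.newArrayList(reader);
      System.out.println("read " + rows.size() + " rows back");
    }
  }
}

In the test itself the reader is built with SparkPlannedAvroReader.create(schema, ID_TO_CONSTANT), which additionally resolves constant fields that are not stored in the data file, such as the row-lineage columns these tests assert on via GenericsHelpers.assertEqualsUnsafe.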