This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e64181d99b Avro: Support row lineage inheritance for planned avro reader (#13070)
e64181d99b is described below

commit e64181d99bc58d8482d629e7945ce2510287bf74
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue Jul 1 11:42:16 2025 -0600

    Avro: Support row lineage inheritance for planned avro reader (#13070)
---
 .../java/org/apache/iceberg/avro/ValueReaders.java | 182 +++++++++++++++++----
 .../TestRowLevelOperationsWithLineage.java         |  33 +++-
 .../iceberg/spark/data/TestSparkAvroReader.java    |  43 +++--
 3 files changed, 207 insertions(+), 51 deletions(-)
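
For context, the inheritance rule the new planned reader implements can be summarized as: when an Avro data file does not physically store `_row_id` or `_last_updated_sequence_number`, the reader derives the values from file-level metadata passed in via `idToConstant` (the file's first assigned row id and its data sequence number). The snippet below is only an illustrative sketch of that rule, not code from the patch; the helper names and parameters are hypothetical.

// Illustrative sketch only; these helpers are hypothetical and not part of the patch.
static Long inheritRowId(Long rowIdFromFile, Long firstRowId, long posInFile) {
  if (rowIdFromFile != null) {
    return rowIdFromFile; // the file stores an explicit _row_id for this row
  }
  // otherwise inherit: first row id assigned to the file plus the row's position
  return firstRowId != null ? firstRowId + posInFile : null;
}

static Long inheritLastUpdatedSeq(Long seqFromFile, Long fileDataSeqNumber) {
  // a null column value means the row was last updated when this file was written
  return seqFromFile != null ? seqFromFile : fileDataSeqNumber;
}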

diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index bf65e251b3..3ac755bb4a 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -228,50 +228,112 @@ public class ValueReaders {
       Map<Integer, ?> idToConstant,
       BiFunction<Type, Object, Object> convert) {
     Map<Integer, Integer> idToPos = idToPos(expected);
-
     List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
-    List<Schema.Field> fileFields = record.getFields();
-    for (int pos = 0; pos < fileFields.size(); pos += 1) {
+    addFileFieldReadersToPlan(readPlan, record.getFields(), fieldReaders, idToPos, idToConstant);
+    addMissingFileReadersToPlan(readPlan, idToPos, expected, idToConstant, convert);
+    return readPlan;
+  }
+
+  private static void addFileFieldReadersToPlan(
+      List<Pair<Integer, ValueReader<?>>> readPlan,
+      List<Schema.Field> fileFields,
+      List<ValueReader<?>> fieldReaders,
+      Map<Integer, Integer> idToPos,
+      Map<Integer, ?> idToConstant) {
+    for (int pos = 0; pos < fileFields.size(); pos++) {
       Schema.Field field = fileFields.get(pos);
       ValueReader<?> fieldReader = fieldReaders.get(pos);
       Integer fieldId = AvroSchemaUtil.fieldId(field);
       Integer projectionPos = idToPos.remove(fieldId);
+      readPlan.add(fileFieldReader(fieldId, projectionPos, fieldReader, idToConstant));
+    }
+  }
 
-      Object constant = idToConstant.get(fieldId);
-      if (projectionPos != null && constant != null) {
-        readPlan.add(
-            Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
-      } else {
-        readPlan.add(Pair.of(projectionPos, fieldReader));
-      }
+  private static Pair<Integer, ValueReader<?>> fileFieldReader(
+      Integer fieldId,
+      Integer projectionPos,
+      ValueReader<?> fieldReader,
+      Map<Integer, ?> idToConstant) {
+    if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
+      Long firstRowId = (Long) idToConstant.get(fieldId);
+      return Pair.of(projectionPos, ValueReaders.rowIds(firstRowId, fieldReader));
+    } else if (Objects.equals(fieldId, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId())) {
+      Long firstRowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId());
+      Long fileSeqNumber = (Long) idToConstant.get(fieldId);
+      return Pair.of(
+          projectionPos, ValueReaders.lastUpdated(firstRowId, fileSeqNumber, fieldReader));
+    } else {
+      return fieldReader(fieldId, projectionPos, fieldReader, idToConstant);
+    }
+  }
+
+  private static Pair<Integer, ValueReader<?>> fieldReader(
+      Integer fieldId,
+      Integer projectionPos,
+      ValueReader<?> fieldReader,
+      Map<Integer, ?> idToConstant) {
+    Object constant = idToConstant.get(fieldId);
+    if (projectionPos != null && constant != null) {
+      return Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant));
     }
 
-    // handle any expected columns that are not in the data file
+    return Pair.of(projectionPos, fieldReader);
+  }
+
+  // Handles expected fields not found in data file
+  private static void addMissingFileReadersToPlan(
+      List<Pair<Integer, ValueReader<?>>> readPlan,
+      Map<Integer, Integer> idToPos,
+      Types.StructType expected,
+      Map<Integer, ?> idToConstant,
+      BiFunction<Type, Object, Object> convert) {
     for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
       int fieldId = idAndPos.getKey();
       int pos = idAndPos.getValue();
+      readPlan.add(
+          Pair.of(pos, createMissingFieldReader(fieldId, expected, idToConstant, convert)));
+    }
+  }
 
-      Object constant = idToConstant.get(fieldId);
-      Types.NestedField field = expected.field(fieldId);
-      if (constant != null) {
-        readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
-      } else if (field.initialDefault() != null) {
-        readPlan.add(
-            Pair.of(
-                pos, ValueReaders.constant(convert.apply(field.type(), field.initialDefault()))));
-      } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
-        readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
-      } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
-        readPlan.add(Pair.of(pos, ValueReaders.positions()));
-      } else if (field.isOptional()) {
-        readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
-      } else {
-        throw new IllegalArgumentException(
-            String.format("Missing required field: %s", field.name()));
-      }
+  // Creates reader for missing field based on default/constant rules
+  private static ValueReader<?> createMissingFieldReader(
+      int fieldId,
+      Types.StructType expected,
+      Map<Integer, ?> idToConstant,
+      BiFunction<Type, Object, Object> convert) {
+    Object constant = idToConstant.get(fieldId);
+    Types.NestedField field = expected.field(fieldId);
+
+    if (constant != null) {
+      return ValueReaders.constant(constant);
+    } else if (field.initialDefault() != null) {
+      return ValueReaders.constant(convert.apply(field.type(), field.initialDefault()));
+    } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+      return ValueReaders.constant(false);
+    } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+      return ValueReaders.positions();
+    } else if (field.isOptional()) {
+      return ValueReaders.constant(null);
     }
 
-    return readPlan;
+    throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
+  }
+
+  public static ValueReader<Long> rowIds(Long baseRowId, ValueReader<?> idReader) {
+    if (baseRowId != null) {
+      return new RowIdReader(baseRowId, (ValueReader<Long>) idReader);
+    } else {
+      return ValueReaders.constant(null);
+    }
+  }
+
+  public static ValueReader<Long> lastUpdated(
+      Long baseRowId, Long fileSeqNumber, ValueReader<?> seqReader) {
+    if (fileSeqNumber != null && baseRowId != null) {
+      return new LastUpdatedSeqReader(fileSeqNumber, (ValueReader<Long>) seqReader);
+    } else {
+      return ValueReaders.constant(null);
+    }
   }
 
   private static Map<Integer, Integer> idToPos(Types.StructType struct) {
@@ -1235,4 +1297,64 @@ public class ValueReaders {
       this.currentPosition = posSupplier.get() - 1;
     }
   }
+
+  static class RowIdReader implements ValueReader<Long>, SupportsRowPosition {
+    private final long firstRowId;
+    private final ValueReader<Long> idReader;
+    private final ValueReader<Long> posReader;
+
+    RowIdReader(Long firstRowId, ValueReader<Long> idReader) {
+      this.firstRowId = firstRowId;
+      this.idReader = idReader != null ? idReader : ValueReaders.constant(null);
+      this.posReader = positions();
+    }
+
+    @Override
+    public Long read(Decoder ignored, Object reuse) throws IOException {
+      Long idFromFile = idReader.read(ignored, reuse);
+      long pos = posReader.read(ignored, reuse);
+
+      if (idFromFile != null) {
+        return idFromFile;
+      }
+
+      return firstRowId + pos;
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      idReader.skip(decoder);
+      posReader.skip(decoder);
+    }
+
+    @Override
+    public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+      ((SupportsRowPosition) posReader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  static class LastUpdatedSeqReader implements ValueReader<Long> {
+    private final long fileSeqNumber;
+    private final ValueReader<Long> seqReader;
+
+    LastUpdatedSeqReader(long fileSeqNumber, ValueReader<Long> seqReader) {
+      this.fileSeqNumber = fileSeqNumber;
+      this.seqReader = seqReader;
+    }
+
+    @Override
+    public Long read(Decoder ignored, Object reuse) throws IOException {
+      Long rowLastUpdatedSeqNumber = seqReader.read(ignored, reuse);
+      if (rowLastUpdatedSeqNumber != null) {
+        return rowLastUpdatedSeqNumber;
+      }
+
+      return fileSeqNumber;
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      seqReader.skip(decoder);
+    }
+  }
 }
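
Note that the new factory methods degrade gracefully: ValueReaders.rowIds(...) and ValueReaders.lastUpdated(...) return a constant null reader when the first row id or file sequence number is not supplied through idToConstant, so projecting the lineage columns against files without that metadata yields nulls instead of failing. A rough usage illustration follows; the literal values are made up and not taken from the patch or its tests.

// Rough illustration with made-up values (not from the patch): with a base row id
// the reader inherits ids per position, without one it always reads null.
ValueReader<Long> inherited = ValueReaders.rowIds(1000L, null);
// reads _row_id as 1000 + row position when the file stores no id column

ValueReader<Long> absent = ValueReaders.rowIds(null, null);
// no first-row-id metadata available, so every read returns null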
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index b485b79441..a85f6cfa94 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.iceberg.spark.functions.BucketFunction;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -64,7 +65,6 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 
 public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOperationsTestBase {
@@ -119,6 +119,29 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
         DISTRIBUTED,
         3
       },
+      {
+        "spark_catalog",
+        SparkSessionCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            "default-namespace",
+            "default",
+            "clients",
+            "1",
+            "parquet-enabled",
+            "false",
+            "cache-enabled",
+            "false" // Spark will delete tables using v1, leaving the cache 
out of sync
+            ),
+        FileFormat.AVRO,
+        false,
+        WRITE_DISTRIBUTION_MODE_RANGE,
+        false,
+        null,
+        DISTRIBUTED,
+        3
+      },
     };
   }
 
@@ -127,13 +150,6 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
     spark.conf().set("spark.sql.shuffle.partitions", "4");
   }
 
-  @BeforeEach
-  public void beforeEach() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
-    // ToDo: Remove these as row lineage inheritance gets implemented in the other readers
-    assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
-  }
-
   @AfterEach
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
@@ -590,6 +606,7 @@ public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOpe
   }
 
   private Snapshot latestSnapshot(Table table) {
+    table.refresh();
     return branch != null ? table.snapshot(branch) : table.currentSnapshot();
   }
 
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
index 002539a976..5235fcc107 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
@@ -18,18 +18,19 @@
  */
 package org.apache.iceberg.spark.data;
 
-import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
-import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -40,35 +41,51 @@ public class TestSparkAvroReader extends AvroDataTest {
   }
 
   @Override
-  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
-    List<Record> expected = RandomData.generateList(writeSchema, 100, 0L);
-
+  protected void writeAndValidate(
+      Schema writeSchema, Schema expectedSchema, List<org.apache.iceberg.data.Record> records)
+      throws IOException {
     File testFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(testFile.delete()).as("Delete should succeed").isTrue();
 
-    try (FileAppender<Record> writer =
-        Avro.write(Files.localOutput(testFile)).schema(writeSchema).named("test").build()) {
-      for (Record rec : expected) {
-        writer.add(rec);
+    try (DataWriter<Record> dataWriter =
+        Avro.writeData(Files.localOutput(testFile))
+            .schema(writeSchema)
+            .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+            .withSpec(PartitionSpec.unpartitioned())
+            .build()) {
+      for (org.apache.iceberg.data.Record rec : records) {
+        dataWriter.write(rec);
       }
     }
 
     List<InternalRow> rows;
     try (AvroIterable<InternalRow> reader =
         Avro.read(Files.localInput(testFile))
-            .createResolvingReader(SparkPlannedAvroReader::create)
+            .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, ID_TO_CONSTANT))
             .project(expectedSchema)
             .build()) {
       rows = Lists.newArrayList(reader);
     }
 
-    for (int i = 0; i < expected.size(); i += 1) {
-      assertEqualsUnsafe(expectedSchema.asStruct(), expected.get(i), rows.get(i));
+    for (int i = 0; i < records.size(); i += 1) {
+      GenericsHelpers.assertEqualsUnsafe(
+          expectedSchema.asStruct(), records.get(i), rows.get(i), ID_TO_CONSTANT, i);
     }
   }
 
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
+    writeAndValidate(writeSchema, expectedSchema, expected);
+  }
+
   @Override
   protected boolean supportsDefaultValues() {
     return true;
   }
+
+  @Override
+  protected boolean supportsRowLineage() {
+    return true;
+  }
 }
