This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e64181d99b Avro: Support row lineage inheritance for planned avro
reader (#13070)
e64181d99b is described below
commit e64181d99bc58d8482d629e7945ce2510287bf74
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue Jul 1 11:42:16 2025 -0600
Avro: Support row lineage inheritance for planned avro reader (#13070)
---
.../java/org/apache/iceberg/avro/ValueReaders.java | 182 +++++++++++++++++----
.../TestRowLevelOperationsWithLineage.java | 33 +++-
.../iceberg/spark/data/TestSparkAvroReader.java | 43 +++--
3 files changed, 207 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index bf65e251b3..3ac755bb4a 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -228,50 +228,112 @@ public class ValueReaders {
Map<Integer, ?> idToConstant,
BiFunction<Type, Object, Object> convert) {
Map<Integer, Integer> idToPos = idToPos(expected);
-
List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
- List<Schema.Field> fileFields = record.getFields();
- for (int pos = 0; pos < fileFields.size(); pos += 1) {
+ addFileFieldReadersToPlan(readPlan, record.getFields(), fieldReaders,
idToPos, idToConstant);
+ addMissingFileReadersToPlan(readPlan, idToPos, expected, idToConstant,
convert);
+ return readPlan;
+ }
+ // Adds one read-plan entry per field of the file schema, in file order.
+ private static void addFileFieldReadersToPlan(
+ List<Pair<Integer, ValueReader<?>>> readPlan,
+ List<Schema.Field> fileFields,
+ List<ValueReader<?>> fieldReaders, // indexed by the same position as fileFields
+ Map<Integer, Integer> idToPos, // mutated: the id of every field found in the file is removed
+ Map<Integer, ?> idToConstant) {
+ for (int pos = 0; pos < fileFields.size(); pos++) { // projectionPos below is null when idToPos has no entry for the field id
Schema.Field field = fileFields.get(pos);
ValueReader<?> fieldReader = fieldReaders.get(pos);
Integer fieldId = AvroSchemaUtil.fieldId(field);
Integer projectionPos = idToPos.remove(fieldId);
+ readPlan.add(fileFieldReader(fieldId, projectionPos, fieldReader,
idToConstant));
+ }
+ }
- Object constant = idToConstant.get(fieldId);
- if (projectionPos != null && constant != null) {
- readPlan.add(
- Pair.of(projectionPos,
ValueReaders.replaceWithConstant(fieldReader, constant)));
- } else {
- readPlan.add(Pair.of(projectionPos, fieldReader));
- }
+ private static Pair<Integer, ValueReader<?>> fileFieldReader( // chooses the reader for a field that exists in the file
+ Integer fieldId,
+ Integer projectionPos,
+ ValueReader<?> fieldReader,
+ Map<Integer, ?> idToConstant) {
+ if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
+ Long firstRowId = (Long) idToConstant.get(fieldId); // the constant keyed by the ROW_ID field id is used as the base row id
+ return Pair.of(projectionPos, ValueReaders.rowIds(firstRowId,
fieldReader));
+ } else if (Objects.equals(fieldId,
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId())) {
+ Long firstRowId = (Long)
idToConstant.get(MetadataColumns.ROW_ID.fieldId());
+ Long fileSeqNumber = (Long) idToConstant.get(fieldId); // constant keyed by this column's own field id
+ return Pair.of(
+ projectionPos, ValueReaders.lastUpdated(firstRowId, fileSeqNumber,
fieldReader));
+ } else {
+ return fieldReader(fieldId, projectionPos, fieldReader, idToConstant); // every other field: constant-or-file handling
+ }
+ }
+ // Pairs a file field's reader with its projection position, substituting a constant when one is registered.
+ private static Pair<Integer, ValueReader<?>> fieldReader(
+ Integer fieldId,
+ Integer projectionPos,
+ ValueReader<?> fieldReader,
+ Map<Integer, ?> idToConstant) {
+ Object constant = idToConstant.get(fieldId);
+ if (projectionPos != null && constant != null) { // a constant is applied only when the field has a projection position
+ return Pair.of(projectionPos,
ValueReaders.replaceWithConstant(fieldReader, constant));
}
- // handle any expected columns that are not in the data file
+ return Pair.of(projectionPos, fieldReader); // projectionPos may be null on this path
+ }
+
+ // Handles expected fields not found in data file: adds one synthesized reader per entry of idToPos
+ private static void addMissingFileReadersToPlan(
+ List<Pair<Integer, ValueReader<?>>> readPlan,
+ Map<Integer, Integer> idToPos, // field id -> position used as the first element of each plan pair
+ Types.StructType expected,
+ Map<Integer, ?> idToConstant,
+ BiFunction<Type, Object, Object> convert) {
for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
int fieldId = idAndPos.getKey();
int pos = idAndPos.getValue();
+ readPlan.add(
+ Pair.of(pos, createMissingFieldReader(fieldId, expected,
idToConstant, convert)));
+ }
+ }
- Object constant = idToConstant.get(fieldId);
- Types.NestedField field = expected.field(fieldId);
- if (constant != null) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
- } else if (field.initialDefault() != null) {
- readPlan.add(
- Pair.of(
- pos, ValueReaders.constant(convert.apply(field.type(),
field.initialDefault()))));
- } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
- } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
- readPlan.add(Pair.of(pos, ValueReaders.positions()));
- } else if (field.isOptional()) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
- } else {
- throw new IllegalArgumentException(
- String.format("Missing required field: %s", field.name()));
- }
+ // Creates reader for missing field based on default/constant rules
+ private static ValueReader<?> createMissingFieldReader(
+ int fieldId,
+ Types.StructType expected,
+ Map<Integer, ?> idToConstant,
+ BiFunction<Type, Object, Object> convert) { // convert is applied to (field type, initial default)
+ Object constant = idToConstant.get(fieldId);
+ Types.NestedField field = expected.field(fieldId);
+ // Precedence: supplied constant, initial default, IS_DELETED -> false, ROW_POSITION -> positions, optional -> null, else fail
+ if (constant != null) {
+ return ValueReaders.constant(constant);
+ } else if (field.initialDefault() != null) {
+ return ValueReaders.constant(convert.apply(field.type(),
field.initialDefault()));
+ } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+ return ValueReaders.constant(false); // no constant was supplied, so rows are reported as not deleted
+ } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+ return ValueReaders.positions();
+ } else if (field.isOptional()) {
+ return ValueReaders.constant(null);
}
- return readPlan;
+ throw new IllegalArgumentException(String.format("Missing required field:
%s", field.name()));
+ }
+ // Returns the reader used for the ROW_ID column; without a base row id a constant-null reader is returned.
+ public static ValueReader<Long> rowIds(Long baseRowId, ValueReader<?>
idReader) {
+ if (baseRowId != null) {
+ return new RowIdReader(baseRowId, (ValueReader<Long>) idReader); // unchecked cast: idReader is assumed to produce Long
+ } else {
+ return ValueReaders.constant(null); // NOTE(review): idReader is not wrapped here, unlike replaceWithConstant - confirm the field's bytes are still consumed
+ }
+ }
+ // Returns the reader used for the LAST_UPDATED_SEQUENCE_NUMBER column; constant null unless both inputs are non-null.
+ public static ValueReader<Long> lastUpdated(
+ Long baseRowId, Long fileSeqNumber, ValueReader<?> seqReader) {
+ if (fileSeqNumber != null && baseRowId != null) { // baseRowId is used only for this null check
+ return new LastUpdatedSeqReader(fileSeqNumber, (ValueReader<Long>)
seqReader);
+ } else {
+ return ValueReaders.constant(null); // NOTE(review): seqReader is not wrapped here - confirm the field's bytes are still consumed
+ }
}
private static Map<Integer, Integer> idToPos(Types.StructType struct) {
@@ -1235,4 +1297,64 @@ public class ValueReaders {
this.currentPosition = posSupplier.get() - 1;
}
}
+ // Reads a row id from the file and falls back to firstRowId + row position when the file value is null.
+ static class RowIdReader implements ValueReader<Long>, SupportsRowPosition {
+ private final long firstRowId;
+ private final ValueReader<Long> idReader;
+ private final ValueReader<Long> posReader;
+ // firstRowId is unboxed on assignment, so callers must pass non-null; a null idReader becomes a constant-null reader
+ RowIdReader(Long firstRowId, ValueReader<Long> idReader) {
+ this.firstRowId = firstRowId;
+ this.idReader = idReader != null ? idReader :
ValueReaders.constant(null);
+ this.posReader = positions();
+ }
+
+ @Override
+ public Long read(Decoder ignored, Object reuse) throws IOException {
+ Long idFromFile = idReader.read(ignored, reuse);
+ long pos = posReader.read(ignored, reuse); // read on every row, even when the file supplies an id (presumably keeps the position in step - confirm)
+
+ if (idFromFile != null) {
+ return idFromFile; // an id stored in the file wins over the computed one
+ }
+ // no id stored for this row: derive it from the base row id and the row position
+ return firstRowId + pos;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ idReader.skip(decoder);
+ posReader.skip(decoder);
+ }
+
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ ((SupportsRowPosition) posReader).setRowPositionSupplier(posSupplier); // cast assumes positions() returns a SupportsRowPosition
+ }
+ }
+ // Reads a per-row sequence number from the file and falls back to fileSeqNumber when the file value is null.
+ static class LastUpdatedSeqReader implements ValueReader<Long> {
+ private final long fileSeqNumber;
+ private final ValueReader<Long> seqReader;
+
+ LastUpdatedSeqReader(long fileSeqNumber, ValueReader<Long> seqReader) {
+ this.fileSeqNumber = fileSeqNumber;
+ this.seqReader = seqReader; // stored as given; no null substitution here, unlike RowIdReader
+ }
+
+ @Override
+ public Long read(Decoder ignored, Object reuse) throws IOException {
+ Long rowLastUpdatedSeqNumber = seqReader.read(ignored, reuse);
+ if (rowLastUpdatedSeqNumber != null) {
+ return rowLastUpdatedSeqNumber; // a value stored in the file wins
+ }
+ // nothing stored for this row: use the sequence number supplied for the whole file
+ return fileSeqNumber;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ seqReader.skip(decoder);
+ }
+ }
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index b485b79441..a85f6cfa94 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -56,6 +56,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.functions.BucketFunction;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
@@ -64,7 +65,6 @@ import
org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
public abstract class TestRowLevelOperationsWithLineage extends
SparkRowLevelOperationsTestBase {
@@ -119,6 +119,29 @@ public abstract class TestRowLevelOperationsWithLineage
extends SparkRowLevelOpe
DISTRIBUTED,
3
},
+ {
+ "spark_catalog",
+ SparkSessionCatalog.class.getName(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ "default-namespace",
+ "default",
+ "clients",
+ "1",
+ "parquet-enabled",
+ "false",
+ "cache-enabled",
+ "false" // Spark will delete tables using v1, leaving the cache
out of sync
+ ),
+ FileFormat.AVRO,
+ false,
+ WRITE_DISTRIBUTION_MODE_RANGE,
+ false,
+ null,
+ DISTRIBUTED,
+ 3
+ },
};
}
@@ -127,13 +150,6 @@ public abstract class TestRowLevelOperationsWithLineage
extends SparkRowLevelOpe
spark.conf().set("spark.sql.shuffle.partitions", "4");
}
- @BeforeEach
- public void beforeEach() {
- assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
- // ToDo: Remove these as row lineage inheritance gets implemented in the
other readers
- assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
- }
-
@AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
@@ -590,6 +606,7 @@ public abstract class TestRowLevelOperationsWithLineage
extends SparkRowLevelOpe
}
private Snapshot latestSnapshot(Table table) {
+ table.refresh(); // refresh the handle before the lookup, presumably so commits made after it was loaded are visible
return branch != null ? table.snapshot(branch) : table.currentSnapshot();
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
index 002539a976..5235fcc107 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java
@@ -18,18 +18,19 @@
*/
package org.apache.iceberg.spark.data;
-import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
import java.util.List;
-import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
-import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -40,35 +41,51 @@ public class TestSparkAvroReader extends AvroDataTest {
}
@Override
- protected void writeAndValidate(Schema writeSchema, Schema expectedSchema)
throws IOException {
- List<Record> expected = RandomData.generateList(writeSchema, 100, 0L);
-
+ protected void writeAndValidate( // writes the given records to a temp Avro file, reads them back as InternalRow, and compares
+ Schema writeSchema, Schema expectedSchema,
List<org.apache.iceberg.data.Record> records)
+ throws IOException {
File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).as("Delete should succeed").isTrue();
- try (FileAppender<Record> writer =
-
Avro.write(Files.localOutput(testFile)).schema(writeSchema).named("test").build())
{
- for (Record rec : expected) {
- writer.add(rec);
+ try (DataWriter<Record> dataWriter = // records are now written through Avro.writeData rather than a FileAppender
+ Avro.writeData(Files.localOutput(testFile))
+ .schema(writeSchema)
+ .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+ .withSpec(PartitionSpec.unpartitioned())
+ .build()) {
+ for (org.apache.iceberg.data.Record rec : records) {
+ dataWriter.write(rec);
}
}
List<InternalRow> rows;
try (AvroIterable<InternalRow> reader =
Avro.read(Files.localInput(testFile))
- .createResolvingReader(SparkPlannedAvroReader::create)
+ .createResolvingReader(schema ->
SparkPlannedAvroReader.create(schema, ID_TO_CONSTANT))
.project(expectedSchema)
.build()) {
rows = Lists.newArrayList(reader);
}
- for (int i = 0; i < expected.size(); i += 1) {
- assertEqualsUnsafe(expectedSchema.asStruct(), expected.get(i),
rows.get(i));
+ for (int i = 0; i < records.size(); i += 1) { // the same ID_TO_CONSTANT given to the reader and the row index i are passed to the assertion
+ GenericsHelpers.assertEqualsUnsafe(
+ expectedSchema.asStruct(), records.get(i), rows.get(i),
ID_TO_CONSTANT, i);
}
}
+ @Override
+ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema)
throws IOException {
+ List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L); // 100 generated records; 0L is presumably the random seed - confirm
+ writeAndValidate(writeSchema, expectedSchema, expected); // delegates to the overload that takes explicit records
+ }
+
@Override
protected boolean supportsDefaultValues() {
return true;
}
+ // Reports row lineage as supported by this reader test (presumably consulted by the AvroDataTest base class - confirm).
+ @Override
+ protected boolean supportsRowLineage() {
+ return true;
+ }
}