This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2034b79dfd Flink: Backport add _row_id and _last_updated_sequence_number readers to 2.1 and 1.20 (#14168)
2034b79dfd is described below
commit 2034b79dfd33519e292d52e711f4cf44c09b8a06
Author: GuoYu <[email protected]>
AuthorDate: Tue Sep 23 23:34:15 2025 +0800
Flink: Backport add _row_id and _last_updated_sequence_number readers to 2.1 and 1.20 (#14168)
---
.../iceberg/flink/data/FlinkParquetReaders.java | 60 +++++++++-------------
.../java/org/apache/iceberg/flink/TestHelpers.java | 56 +++++++++++++++-----
.../iceberg/flink/data/TestFlinkParquetReader.java | 25 ++++++---
.../iceberg/flink/data/FlinkParquetReaders.java | 60 +++++++++-------------
.../java/org/apache/iceberg/flink/TestHelpers.java | 56 +++++++++++++++-----
.../iceberg/flink/data/TestFlinkParquetReader.java | 25 ++++++---
6 files changed, 168 insertions(+), 114 deletions(-)
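In short, both backported FlinkParquetReaders.java copies replace the hand-rolled per-field branching in struct() with a single shared ParquetValueReaders.replaceWithMetadataReader(...) call, which substitutes readers for constants and metadata columns (including _row_id and _last_updated_sequence_number), plus a small defaultReader(...) fallback. A minimal sketch of the resulting selection logic, assuming only the Iceberg parquet APIs visible in the diff below; the names ReaderSelectionSketch, reorder, and fallback are illustrative, not the real methods:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types;

class ReaderSelectionSketch {
  // Mirrors the new struct() body: one shared call handles every metadata
  // column, then a fallback covers defaults, optional nulls, and errors.
  static List<ParquetValueReader<?>> reorder(
      Types.StructType expected,
      Map<Integer, ParquetValueReader<?>> readersById,
      Map<Integer, ?> idToConstant,
      int constantDefinitionLevel) {
    List<ParquetValueReader<?>> reordered = new ArrayList<>();
    for (Types.NestedField field : expected.fields()) {
      int id = field.fieldId();
      ParquetValueReader<?> reader =
          ParquetValueReaders.replaceWithMetadataReader(
              id, readersById.get(id), idToConstant, constantDefinitionLevel);
      reordered.add(fallback(field, reader, constantDefinitionLevel));
    }
    return reordered;
  }

  static ParquetValueReader<?> fallback(
      Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
    if (reader != null) {
      return reader; // either the file's reader or a substituted metadata reader
    } else if (field.initialDefault() != null) {
      // the real code first converts the default to Flink's data model via
      // RowDataUtil.convertConstant before wrapping it as a constant
      return ParquetValueReaders.constant(field.initialDefault(), constantDL);
    } else if (field.isOptional()) {
      return ParquetValueReaders.nulls();
    }
    throw new IllegalArgumentException("Missing required field: " + field.name());
  }
}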
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 5c3581aef3..4e650e9574 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
@@ -90,10 +89,13 @@ public class FlinkParquetReaders {
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public ParquetValueReader<RowData> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
+
+ if (null == expected) {
+ return new RowDataReader(ImmutableList.of());
+ }
+
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
- Map<Integer, Type> typesById = Maps.newHashMap();
- Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -102,51 +104,39 @@ public class FlinkParquetReaders {
if (fieldType.getId() != null) {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
- typesById.put(id, fieldType);
- if (idToConstant.containsKey(id)) {
- maxDefinitionLevelsById.put(id, fieldD);
- }
}
}
}
- List<Types.NestedField> expectedFields =
- expected != null ? expected.fields() : ImmutableList.of();
+ int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+ List<Types.NestedField> expectedFields = expected.fields();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
- // Defaulting to parent max definition level
- int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
- ParquetValueReader<?> reader = readersById.get(id);
- if (idToConstant.containsKey(id)) {
- // containsKey is used because the constant may be null
- int fieldMaxDefinitionLevel =
- maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
- reorderedFields.add(
- ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
- } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
- reorderedFields.add(ParquetValueReaders.position());
- } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
- reorderedFields.add(ParquetValueReaders.constant(false));
- } else if (reader != null) {
- reorderedFields.add(reader);
- } else if (field.initialDefault() != null) {
- reorderedFields.add(
- ParquetValueReaders.constant(
- RowDataUtil.convertConstant(field.type(), field.initialDefault()),
- maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
- } else if (field.isOptional()) {
- reorderedFields.add(ParquetValueReaders.nulls());
- } else {
- throw new IllegalArgumentException(
- String.format("Missing required field: %s", field.name()));
- }
+ ParquetValueReader<?> reader =
+ ParquetValueReaders.replaceWithMetadataReader(
+ id, readersById.get(id), idToConstant, constantDefinitionLevel);
+ reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
}
return new RowDataReader(reorderedFields);
}
+ private ParquetValueReader<?> defaultReader(
+ Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
+ if (reader != null) {
+ return reader;
+ } else if (field.initialDefault() != null) {
+ return ParquetValueReaders.constant(
+ RowDataUtil.convertConstant(field.type(), field.initialDefault()), constantDL);
+ } else if (field.isOptional()) {
+ return ParquetValueReaders.nulls();
+ }
+
+ throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
+ }
+
@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 26aa9d2b4c..a6e835c772 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public class TestHelpers {
LogicalType rowType,
StructLike expectedRecord,
RowData actualRowData) {
+ assertRowData(structType, rowType, expectedRecord, actualRowData, null, -1);
+ }
+
+ public static void assertRowData(
+ Types.StructType structType,
+ LogicalType rowType,
+ StructLike expectedRecord,
+ RowData actualRowData,
+ Map<Integer, Object> idToConstant,
+ int rowPosition) {
if (expectedRecord == null && actualRowData == null) {
return;
}
@@ -197,21 +208,14 @@ public class TestHelpers {
LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
Object actualValue =
FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData);
- if (expectedField != null) {
- assertEquals(
- field.type(), logicalType, expected.getField(expectedField.name()), actualValue);
- } else {
- // convert the initial value to generic because that is the data model used to generate
- // the expected records
- assertEquals(
- field.type(),
- logicalType,
- GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()),
- actualValue);
- }
- pos += 1;
- }
+ Object expectedValue =
+ expectedField != null
+ ? getExpectedValue(idToConstant, rowPosition, expectedField, expected)
+ : GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
+ assertEquals(field.type(), logicalType, expectedValue, actualValue);
+ pos++;
+ }
} else {
for (int i = 0; i < types.size(); i += 1) {
LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +227,30 @@ public class TestHelpers {
}
}
+ private static Object getExpectedValue(
+ Map<Integer, Object> idToConstant,
+ int pos,
+ Types.NestedField expectedField,
+ Record expected) {
+ Object expectedValue;
+ int id = expectedField.fieldId();
+ if (id == MetadataColumns.ROW_ID.fieldId()) {
+ expectedValue = expected.getField(expectedField.name());
+ if (expectedValue == null && idToConstant != null) {
+ expectedValue = (Long) idToConstant.get(id) + pos;
+ }
+ } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
+ expectedValue = expected.getField(expectedField.name());
+ if (expectedValue == null && idToConstant != null) {
+ expectedValue = idToConstant.get(id);
+ }
+ } else {
+ expectedValue = expected.getField(expectedField.name());
+ }
+
+ return expectedValue;
+ }
+
private static void assertEquals(
Type type, LogicalType logicalType, Object expected, Object actual) {
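The new getExpectedValue helper encodes row-lineage inheritance: when a data file carries no materialized _row_id values, a row's id is the file's first-row-id constant plus the row's position, and _last_updated_sequence_number falls back to the file's sequence-number constant. A small self-contained illustration; the constants (first_row_id = 100, sequence number 7) and the class name are made up for the example:

import java.util.Map;
import org.apache.iceberg.MetadataColumns;

class RowLineageExpectationSketch {
  public static void main(String[] args) {
    // Hypothetical file: assigned first_row_id = 100 and last updated at
    // data sequence number 7, with no lineage columns materialized on disk.
    Map<Integer, Object> idToConstant =
        Map.of(
            MetadataColumns.ROW_ID.fieldId(), 100L,
            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 7L);

    int pos = 3; // the row's position within the file
    long rowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId()) + pos;
    long seqNum =
        (Long) idToConstant.get(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());

    // prints _row_id=103, _last_updated_sequence_number=7
    System.out.printf("_row_id=%d, _last_updated_sequence_number=%d%n", rowId, seqNum);
  }
}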
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index e6781356f7..4e8c9f03f8 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -42,8 +42,10 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -74,6 +76,11 @@ public class TestFlinkParquetReader extends DataTestBase {
return true;
}
+ @Override
+ protected boolean supportsRowLineage() {
+ return true;
+ }
+
@Test
public void testBuildReader() {
MessageType fileSchema =
@@ -216,11 +223,10 @@ public class TestFlinkParquetReader extends DataTestBase {
private void writeAndValidate(
Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
- File testFile = File.createTempFile("junit", null, temp.toFile());
- assertThat(testFile.delete()).isTrue();
+ OutputFile output = new InMemoryOutputFile();
try (FileAppender<Record> writer =
- Parquet.write(Files.localOutput(testFile))
+ Parquet.write(output)
.schema(writeSchema)
.createWriterFunc(GenericParquetWriter::create)
.build()) {
@@ -228,17 +234,20 @@ public class TestFlinkParquetReader extends DataTestBase {
}
try (CloseableIterable<RowData> reader =
- Parquet.read(Files.localInput(testFile))
+ Parquet.read(output.toInputFile())
.project(expectedSchema)
- .createReaderFunc(type -> FlinkParquetReaders.buildReader(expectedSchema, type))
+ .createReaderFunc(
+ type -> FlinkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
.build()) {
- Iterator<Record> expected = iterable.iterator();
Iterator<RowData> rows = reader.iterator();
LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
- for (int i = 0; i < NUM_RECORDS; i += 1) {
+ int pos = 0;
+ for (Record record : iterable) {
assertThat(rows).hasNext();
- TestHelpers.assertRowData(writeSchema.asStruct(), rowType, expected.next(), rows.next());
+ TestHelpers.assertRowData(
+ writeSchema.asStruct(), rowType, record, rows.next(), ID_TO_CONSTANT, pos++);
}
+
assertThat(rows).isExhausted();
}
}
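The test also swaps the JUnit temp file for an InMemoryOutputFile, whose toInputFile() hands back the written bytes directly, so no filesystem cleanup is needed. A minimal sketch of that write/read round trip under stated assumptions: GenericParquetReaders stands in for the Flink reader used above, and the one-column SCHEMA is a placeholder, not the schema under test:

import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

class InMemoryRoundTripSketch {
  // Placeholder schema for the sketch.
  static final Schema SCHEMA =
      new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

  static void roundTrip() throws IOException {
    OutputFile output = new InMemoryOutputFile(); // no temp directory needed

    try (FileAppender<Record> writer =
        Parquet.write(output)
            .schema(SCHEMA)
            .createWriterFunc(GenericParquetWriter::create)
            .build()) {
      Record record = GenericRecord.create(SCHEMA);
      record.setField("id", 1L);
      writer.add(record);
    }

    // toInputFile() reads back the bytes that were just written
    try (CloseableIterable<Record> reader =
        Parquet.read(output.toInputFile())
            .project(SCHEMA)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
            .build()) {
      reader.forEach(System.out::println);
    }
  }
}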
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 688481e220..efa1c85ba5 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.variant.Variant;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
@@ -91,10 +90,13 @@ public class FlinkParquetReaders {
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public ParquetValueReader<RowData> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
+
+ if (null == expected) {
+ return new RowDataReader(ImmutableList.of());
+ }
+
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
- Map<Integer, Type> typesById = Maps.newHashMap();
- Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -103,51 +105,39 @@ public class FlinkParquetReaders {
if (fieldType.getId() != null) {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
- typesById.put(id, fieldType);
- if (idToConstant.containsKey(id)) {
- maxDefinitionLevelsById.put(id, fieldD);
- }
}
}
}
- List<Types.NestedField> expectedFields =
- expected != null ? expected.fields() : ImmutableList.of();
+ int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+ List<Types.NestedField> expectedFields = expected.fields();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
- // Defaulting to parent max definition level
- int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
- ParquetValueReader<?> reader = readersById.get(id);
- if (idToConstant.containsKey(id)) {
- // containsKey is used because the constant may be null
- int fieldMaxDefinitionLevel =
- maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
- reorderedFields.add(
- ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
- } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
- reorderedFields.add(ParquetValueReaders.position());
- } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
- reorderedFields.add(ParquetValueReaders.constant(false));
- } else if (reader != null) {
- reorderedFields.add(reader);
- } else if (field.initialDefault() != null) {
- reorderedFields.add(
- ParquetValueReaders.constant(
- RowDataUtil.convertConstant(field.type(), field.initialDefault()),
- maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
- } else if (field.isOptional()) {
- reorderedFields.add(ParquetValueReaders.nulls());
- } else {
- throw new IllegalArgumentException(
- String.format("Missing required field: %s", field.name()));
- }
+ ParquetValueReader<?> reader =
+ ParquetValueReaders.replaceWithMetadataReader(
+ id, readersById.get(id), idToConstant, constantDefinitionLevel);
+ reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
}
return new RowDataReader(reorderedFields);
}
+ private ParquetValueReader<?> defaultReader(
+ Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
+ if (reader != null) {
+ return reader;
+ } else if (field.initialDefault() != null) {
+ return ParquetValueReaders.constant(
+ RowDataUtil.convertConstant(field.type(), field.initialDefault()), constantDL);
+ } else if (field.isOptional()) {
+ return ParquetValueReaders.nulls();
+ }
+
+ throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
+ }
+
@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index d8d3c5dc24..5abbbfa9f4 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public class TestHelpers {
LogicalType rowType,
StructLike expectedRecord,
RowData actualRowData) {
+ assertRowData(structType, rowType, expectedRecord, actualRowData, null, -1);
+ }
+
+ public static void assertRowData(
+ Types.StructType structType,
+ LogicalType rowType,
+ StructLike expectedRecord,
+ RowData actualRowData,
+ Map<Integer, Object> idToConstant,
+ int rowPosition) {
if (expectedRecord == null && actualRowData == null) {
return;
}
@@ -197,21 +208,14 @@ public class TestHelpers {
LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
Object actualValue =
FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData);
- if (expectedField != null) {
- assertEquals(
- field.type(), logicalType, expected.getField(expectedField.name()), actualValue);
- } else {
- // convert the initial value to generic because that is the data model used to generate
- // the expected records
- assertEquals(
- field.type(),
- logicalType,
- GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()),
- actualValue);
- }
- pos += 1;
- }
+ Object expectedValue =
+ expectedField != null
+ ? getExpectedValue(idToConstant, rowPosition, expectedField, expected)
+ : GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
+ assertEquals(field.type(), logicalType, expectedValue, actualValue);
+ pos++;
+ }
} else {
for (int i = 0; i < types.size(); i += 1) {
LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +227,30 @@ public class TestHelpers {
}
}
+ private static Object getExpectedValue(
+ Map<Integer, Object> idToConstant,
+ int pos,
+ Types.NestedField expectedField,
+ Record expected) {
+ Object expectedValue;
+ int id = expectedField.fieldId();
+ if (id == MetadataColumns.ROW_ID.fieldId()) {
+ expectedValue = expected.getField(expectedField.name());
+ if (expectedValue == null && idToConstant != null) {
+ expectedValue = (Long) idToConstant.get(id) + pos;
+ }
+ } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
+ expectedValue = expected.getField(expectedField.name());
+ if (expectedValue == null && idToConstant != null) {
+ expectedValue = idToConstant.get(id);
+ }
+ } else {
+ expectedValue = expected.getField(expectedField.name());
+ }
+
+ return expectedValue;
+ }
+
private static void assertEquals(
Type type, LogicalType logicalType, Object expected, Object actual) {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index e6781356f7..4e8c9f03f8 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -42,8 +42,10 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -74,6 +76,11 @@ public class TestFlinkParquetReader extends DataTestBase {
return true;
}
+ @Override
+ protected boolean supportsRowLineage() {
+ return true;
+ }
+
@Test
public void testBuildReader() {
MessageType fileSchema =
@@ -216,11 +223,10 @@ public class TestFlinkParquetReader extends DataTestBase {
private void writeAndValidate(
Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
- File testFile = File.createTempFile("junit", null, temp.toFile());
- assertThat(testFile.delete()).isTrue();
+ OutputFile output = new InMemoryOutputFile();
try (FileAppender<Record> writer =
- Parquet.write(Files.localOutput(testFile))
+ Parquet.write(output)
.schema(writeSchema)
.createWriterFunc(GenericParquetWriter::create)
.build()) {
@@ -228,17 +234,20 @@ public class TestFlinkParquetReader extends DataTestBase {
}
try (CloseableIterable<RowData> reader =
- Parquet.read(Files.localInput(testFile))
+ Parquet.read(output.toInputFile())
.project(expectedSchema)
- .createReaderFunc(type -> FlinkParquetReaders.buildReader(expectedSchema, type))
+ .createReaderFunc(
+ type -> FlinkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
.build()) {
- Iterator<Record> expected = iterable.iterator();
Iterator<RowData> rows = reader.iterator();
LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
- for (int i = 0; i < NUM_RECORDS; i += 1) {
+ int pos = 0;
+ for (Record record : iterable) {
assertThat(rows).hasNext();
- TestHelpers.assertRowData(writeSchema.asStruct(), rowType, expected.next(), rows.next());
+ TestHelpers.assertRowData(
+ writeSchema.asStruct(), rowType, record, rows.next(), ID_TO_CONSTANT, pos++);
}
+
assertThat(rows).isExhausted();
}
}