This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2034b79dfd Flink: Backport add _row_id and _last_updated_sequence_number readers to 2.1 and 1.20 (#14168)
2034b79dfd is described below

commit 2034b79dfd33519e292d52e711f4cf44c09b8a06
Author: GuoYu <[email protected]>
AuthorDate: Tue Sep 23 23:34:15 2025 +0800

    Flink: Backport add _row_id and _last_updated_sequence_number readers to 2.1 and 1.20 (#14168)
---
 .../iceberg/flink/data/FlinkParquetReaders.java    | 60 +++++++++-------------
 .../java/org/apache/iceberg/flink/TestHelpers.java | 56 +++++++++++++++-----
 .../iceberg/flink/data/TestFlinkParquetReader.java | 25 ++++++---
 .../iceberg/flink/data/FlinkParquetReaders.java    | 60 +++++++++-------------
 .../java/org/apache/iceberg/flink/TestHelpers.java | 56 +++++++++++++++-----
 .../iceberg/flink/data/TestFlinkParquetReader.java | 25 ++++++---
 6 files changed, 168 insertions(+), 114 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 5c3581aef3..4e650e9574 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.data.RawValueData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -90,10 +89,13 @@ public class FlinkParquetReaders {
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<RowData> struct(
         Types.StructType expected, GroupType struct, 
List<ParquetValueReader<?>> fieldReaders) {
+
+      if (null == expected) {
+        return new RowDataReader(ImmutableList.of());
+      }
+
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
-      Map<Integer, Type> typesById = Maps.newHashMap();
-      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -102,51 +104,39 @@ public class FlinkParquetReaders {
           if (fieldType.getId() != null) {
             int id = fieldType.getId().intValue();
             readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
-            typesById.put(id, fieldType);
-            if (idToConstant.containsKey(id)) {
-              maxDefinitionLevelsById.put(id, fieldD);
-            }
           }
         }
       }
 
-      List<Types.NestedField> expectedFields =
-          expected != null ? expected.fields() : ImmutableList.of();
+      int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+      List<Types.NestedField> expectedFields = expected.fields();
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
-      // Defaulting to parent max definition level
-      int defaultMaxDefinitionLevel = 
type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (idToConstant.containsKey(id)) {
-          // containsKey is used because the constant may be null
-          int fieldMaxDefinitionLevel =
-              maxDefinitionLevelsById.getOrDefault(id, 
defaultMaxDefinitionLevel);
-          reorderedFields.add(
-              ParquetValueReaders.constant(idToConstant.get(id), 
fieldMaxDefinitionLevel));
-        } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.position());
-        } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.constant(false));
-        } else if (reader != null) {
-          reorderedFields.add(reader);
-        } else if (field.initialDefault() != null) {
-          reorderedFields.add(
-              ParquetValueReaders.constant(
-                  RowDataUtil.convertConstant(field.type(), 
field.initialDefault()),
-                  maxDefinitionLevelsById.getOrDefault(id, 
defaultMaxDefinitionLevel)));
-        } else if (field.isOptional()) {
-          reorderedFields.add(ParquetValueReaders.nulls());
-        } else {
-          throw new IllegalArgumentException(
-              String.format("Missing required field: %s", field.name()));
-        }
+        ParquetValueReader<?> reader =
+            ParquetValueReaders.replaceWithMetadataReader(
+                id, readersById.get(id), idToConstant, 
constantDefinitionLevel);
+        reorderedFields.add(defaultReader(field, reader, 
constantDefinitionLevel));
       }
 
       return new RowDataReader(reorderedFields);
     }
 
+    private ParquetValueReader<?> defaultReader(
+        Types.NestedField field, ParquetValueReader<?> reader, int constantDL) 
{
+      if (reader != null) {
+        return reader;
+      } else if (field.initialDefault() != null) {
+        return ParquetValueReaders.constant(
+            RowDataUtil.convertConstant(field.type(), field.initialDefault()), 
constantDL);
+      } else if (field.isOptional()) {
+        return ParquetValueReaders.nulls();
+      }
+
+      throw new IllegalArgumentException(String.format("Missing required 
field: %s", field.name()));
+    }
+
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> 
elementReader) {
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 26aa9d2b4c..a6e835c772 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public class TestHelpers {
       LogicalType rowType,
       StructLike expectedRecord,
       RowData actualRowData) {
+    assertRowData(structType, rowType, expectedRecord, actualRowData, null, 
-1);
+  }
+
+  public static void assertRowData(
+      Types.StructType structType,
+      LogicalType rowType,
+      StructLike expectedRecord,
+      RowData actualRowData,
+      Map<Integer, Object> idToConstant,
+      int rowPosition) {
     if (expectedRecord == null && actualRowData == null) {
       return;
     }
@@ -197,21 +208,14 @@ public class TestHelpers {
         LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
         Object actualValue =
             FlinkRowData.createFieldGetter(logicalType, 
pos).getFieldOrNull(actualRowData);
-        if (expectedField != null) {
-          assertEquals(
-              field.type(), logicalType, 
expected.getField(expectedField.name()), actualValue);
-        } else {
-          // convert the initial value to generic because that is the data 
model used to generate
-          // the expected records
-          assertEquals(
-              field.type(),
-              logicalType,
-              GenericDataUtil.internalToGeneric(field.type(), 
field.initialDefault()),
-              actualValue);
-        }
-        pos += 1;
-      }
+        Object expectedValue =
+            expectedField != null
+                ? getExpectedValue(idToConstant, rowPosition, expectedField, 
expected)
+                : GenericDataUtil.internalToGeneric(field.type(), 
field.initialDefault());
 
+        assertEquals(field.type(), logicalType, expectedValue, actualValue);
+        pos++;
+      }
     } else {
       for (int i = 0; i < types.size(); i += 1) {
         LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +227,30 @@ public class TestHelpers {
     }
   }
 
+  private static Object getExpectedValue(
+      Map<Integer, Object> idToConstant,
+      int pos,
+      Types.NestedField expectedField,
+      Record expected) {
+    Object expectedValue;
+    int id = expectedField.fieldId();
+    if (id == MetadataColumns.ROW_ID.fieldId()) {
+      expectedValue = expected.getField(expectedField.name());
+      if (expectedValue == null && idToConstant != null) {
+        expectedValue = (Long) idToConstant.get(id) + pos;
+      }
+    } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
+      expectedValue = expected.getField(expectedField.name());
+      if (expectedValue == null && idToConstant != null) {
+        expectedValue = idToConstant.get(id);
+      }
+    } else {
+      expectedValue = expected.getField(expectedField.name());
+    }
+
+    return expectedValue;
+  }
+
   private static void assertEquals(
       Type type, LogicalType logicalType, Object expected, Object actual) {
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index e6781356f7..4e8c9f03f8 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -42,8 +42,10 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -74,6 +76,11 @@ public class TestFlinkParquetReader extends DataTestBase {
     return true;
   }
 
+  @Override
+  protected boolean supportsRowLineage() {
+    return true;
+  }
+
   @Test
   public void testBuildReader() {
     MessageType fileSchema =
@@ -216,11 +223,10 @@ public class TestFlinkParquetReader extends DataTestBase {
 
   private void writeAndValidate(
       Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) 
throws IOException {
-    File testFile = File.createTempFile("junit", null, temp.toFile());
-    assertThat(testFile.delete()).isTrue();
 
+    OutputFile output = new InMemoryOutputFile();
     try (FileAppender<Record> writer =
-        Parquet.write(Files.localOutput(testFile))
+        Parquet.write(output)
             .schema(writeSchema)
             .createWriterFunc(GenericParquetWriter::create)
             .build()) {
@@ -228,17 +234,20 @@ public class TestFlinkParquetReader extends DataTestBase {
     }
 
     try (CloseableIterable<RowData> reader =
-        Parquet.read(Files.localInput(testFile))
+        Parquet.read(output.toInputFile())
             .project(expectedSchema)
-            .createReaderFunc(type -> 
FlinkParquetReaders.buildReader(expectedSchema, type))
+            .createReaderFunc(
+                type -> FlinkParquetReaders.buildReader(expectedSchema, type, 
ID_TO_CONSTANT))
             .build()) {
-      Iterator<Record> expected = iterable.iterator();
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
-      for (int i = 0; i < NUM_RECORDS; i += 1) {
+      int pos = 0;
+      for (Record record : iterable) {
         assertThat(rows).hasNext();
-        TestHelpers.assertRowData(writeSchema.asStruct(), rowType, 
expected.next(), rows.next());
+        TestHelpers.assertRowData(
+            writeSchema.asStruct(), rowType, record, rows.next(), 
ID_TO_CONSTANT, pos++);
       }
+
       assertThat(rows).isExhausted();
     }
   }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 688481e220..efa1c85ba5 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.types.variant.Variant;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -91,10 +90,13 @@ public class FlinkParquetReaders {
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<RowData> struct(
         Types.StructType expected, GroupType struct, 
List<ParquetValueReader<?>> fieldReaders) {
+
+      if (null == expected) {
+        return new RowDataReader(ImmutableList.of());
+      }
+
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
-      Map<Integer, Type> typesById = Maps.newHashMap();
-      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -103,51 +105,39 @@ public class FlinkParquetReaders {
           if (fieldType.getId() != null) {
             int id = fieldType.getId().intValue();
             readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
-            typesById.put(id, fieldType);
-            if (idToConstant.containsKey(id)) {
-              maxDefinitionLevelsById.put(id, fieldD);
-            }
           }
         }
       }
 
-      List<Types.NestedField> expectedFields =
-          expected != null ? expected.fields() : ImmutableList.of();
+      int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+      List<Types.NestedField> expectedFields = expected.fields();
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
-      // Defaulting to parent max definition level
-      int defaultMaxDefinitionLevel = 
type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (idToConstant.containsKey(id)) {
-          // containsKey is used because the constant may be null
-          int fieldMaxDefinitionLevel =
-              maxDefinitionLevelsById.getOrDefault(id, 
defaultMaxDefinitionLevel);
-          reorderedFields.add(
-              ParquetValueReaders.constant(idToConstant.get(id), 
fieldMaxDefinitionLevel));
-        } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.position());
-        } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.constant(false));
-        } else if (reader != null) {
-          reorderedFields.add(reader);
-        } else if (field.initialDefault() != null) {
-          reorderedFields.add(
-              ParquetValueReaders.constant(
-                  RowDataUtil.convertConstant(field.type(), 
field.initialDefault()),
-                  maxDefinitionLevelsById.getOrDefault(id, 
defaultMaxDefinitionLevel)));
-        } else if (field.isOptional()) {
-          reorderedFields.add(ParquetValueReaders.nulls());
-        } else {
-          throw new IllegalArgumentException(
-              String.format("Missing required field: %s", field.name()));
-        }
+        ParquetValueReader<?> reader =
+            ParquetValueReaders.replaceWithMetadataReader(
+                id, readersById.get(id), idToConstant, 
constantDefinitionLevel);
+        reorderedFields.add(defaultReader(field, reader, 
constantDefinitionLevel));
       }
 
       return new RowDataReader(reorderedFields);
     }
 
+    private ParquetValueReader<?> defaultReader(
+        Types.NestedField field, ParquetValueReader<?> reader, int constantDL) 
{
+      if (reader != null) {
+        return reader;
+      } else if (field.initialDefault() != null) {
+        return ParquetValueReaders.constant(
+            RowDataUtil.convertConstant(field.type(), field.initialDefault()), 
constantDL);
+      } else if (field.isOptional()) {
+        return ParquetValueReaders.nulls();
+      }
+
+      throw new IllegalArgumentException(String.format("Missing required 
field: %s", field.name()));
+    }
+
     @Override
     public ParquetValueReader<?> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> 
elementReader) {
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index d8d3c5dc24..5abbbfa9f4 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public class TestHelpers {
       LogicalType rowType,
       StructLike expectedRecord,
       RowData actualRowData) {
+    assertRowData(structType, rowType, expectedRecord, actualRowData, null, 
-1);
+  }
+
+  public static void assertRowData(
+      Types.StructType structType,
+      LogicalType rowType,
+      StructLike expectedRecord,
+      RowData actualRowData,
+      Map<Integer, Object> idToConstant,
+      int rowPosition) {
     if (expectedRecord == null && actualRowData == null) {
       return;
     }
@@ -197,21 +208,14 @@ public class TestHelpers {
         LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
         Object actualValue =
             FlinkRowData.createFieldGetter(logicalType, 
pos).getFieldOrNull(actualRowData);
-        if (expectedField != null) {
-          assertEquals(
-              field.type(), logicalType, 
expected.getField(expectedField.name()), actualValue);
-        } else {
-          // convert the initial value to generic because that is the data 
model used to generate
-          // the expected records
-          assertEquals(
-              field.type(),
-              logicalType,
-              GenericDataUtil.internalToGeneric(field.type(), 
field.initialDefault()),
-              actualValue);
-        }
-        pos += 1;
-      }
+        Object expectedValue =
+            expectedField != null
+                ? getExpectedValue(idToConstant, rowPosition, expectedField, 
expected)
+                : GenericDataUtil.internalToGeneric(field.type(), 
field.initialDefault());
 
+        assertEquals(field.type(), logicalType, expectedValue, actualValue);
+        pos++;
+      }
     } else {
       for (int i = 0; i < types.size(); i += 1) {
         LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +227,30 @@ public class TestHelpers {
     }
   }
 
+  private static Object getExpectedValue(
+      Map<Integer, Object> idToConstant,
+      int pos,
+      Types.NestedField expectedField,
+      Record expected) {
+    Object expectedValue;
+    int id = expectedField.fieldId();
+    if (id == MetadataColumns.ROW_ID.fieldId()) {
+      expectedValue = expected.getField(expectedField.name());
+      if (expectedValue == null && idToConstant != null) {
+        expectedValue = (Long) idToConstant.get(id) + pos;
+      }
+    } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
+      expectedValue = expected.getField(expectedField.name());
+      if (expectedValue == null && idToConstant != null) {
+        expectedValue = idToConstant.get(id);
+      }
+    } else {
+      expectedValue = expected.getField(expectedField.name());
+    }
+
+    return expectedValue;
+  }
+
   private static void assertEquals(
       Type type, LogicalType logicalType, Object expected, Object actual) {
 
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index e6781356f7..4e8c9f03f8 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -42,8 +42,10 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -74,6 +76,11 @@ public class TestFlinkParquetReader extends DataTestBase {
     return true;
   }
 
+  @Override
+  protected boolean supportsRowLineage() {
+    return true;
+  }
+
   @Test
   public void testBuildReader() {
     MessageType fileSchema =
@@ -216,11 +223,10 @@ public class TestFlinkParquetReader extends DataTestBase {
 
   private void writeAndValidate(
       Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) 
throws IOException {
-    File testFile = File.createTempFile("junit", null, temp.toFile());
-    assertThat(testFile.delete()).isTrue();
 
+    OutputFile output = new InMemoryOutputFile();
     try (FileAppender<Record> writer =
-        Parquet.write(Files.localOutput(testFile))
+        Parquet.write(output)
             .schema(writeSchema)
             .createWriterFunc(GenericParquetWriter::create)
             .build()) {
@@ -228,17 +234,20 @@ public class TestFlinkParquetReader extends DataTestBase {
     }
 
     try (CloseableIterable<RowData> reader =
-        Parquet.read(Files.localInput(testFile))
+        Parquet.read(output.toInputFile())
             .project(expectedSchema)
-            .createReaderFunc(type -> 
FlinkParquetReaders.buildReader(expectedSchema, type))
+            .createReaderFunc(
+                type -> FlinkParquetReaders.buildReader(expectedSchema, type, 
ID_TO_CONSTANT))
             .build()) {
-      Iterator<Record> expected = iterable.iterator();
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
-      for (int i = 0; i < NUM_RECORDS; i += 1) {
+      int pos = 0;
+      for (Record record : iterable) {
         assertThat(rows).hasNext();
-        TestHelpers.assertRowData(writeSchema.asStruct(), rowType, 
expected.next(), rows.next());
+        TestHelpers.assertRowData(
+            writeSchema.asStruct(), rowType, record, rows.next(), 
ID_TO_CONSTANT, pos++);
       }
+
       assertThat(rows).isExhausted();
     }
   }

Reply via email to