This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 96a59408b2 Flink: Backport: Test Parquet writer default handling via core's DataTestBase (#15123) (#15125)
96a59408b2 is described below

commit 96a59408b271881a596f74697c05adb2dbc44094
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Jan 23 18:27:03 2026 +0100

    Flink: Backport: Test Parquet writer default handling via core's DataTestBase (#15123) (#15125)
---
 .../iceberg/flink/data/TestFlinkParquetWriter.java | 64 +++++++++++-----------
 .../iceberg/flink/data/TestFlinkParquetWriter.java | 64 +++++++++++-----------
 2 files changed, 66 insertions(+), 62 deletions(-)

diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index d181d33514..1eaf539df4 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -25,9 +25,8 @@ import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.DataTestBase;
 import org.apache.iceberg.data.RandomGenericData;
@@ -59,63 +58,66 @@ public class TestFlinkParquetWriter extends DataTestBase {
     return true;
   }
 
-  private void writeAndValidate(Iterable<RowData> iterable, Schema schema) 
throws IOException {
-    OutputFile outputFile = new InMemoryOutputFile();
+  @Override
+  protected boolean supportsDefaultValues() {
+    return true;
+  }
 
-    LogicalType logicalType = FlinkSchemaUtil.convert(schema);
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, 
List<Record> data)
+      throws IOException {
+    OutputFile outputFile = new InMemoryOutputFile();
 
+    LogicalType logicalType = FlinkSchemaUtil.convert(writeSchema);
     try (FileAppender<RowData> writer =
         Parquet.write(outputFile)
-            .schema(schema)
+            .schema(writeSchema)
             .createWriterFunc(msgType -> 
FlinkParquetWriters.buildWriter(logicalType, msgType))
             .build()) {
-      writer.addAll(iterable);
+      writer.addAll(RandomRowData.convert(writeSchema, data));
     }
 
     try (CloseableIterable<Record> reader =
         Parquet.read(outputFile.toInputFile())
-            .project(schema)
-            .createReaderFunc(msgType -> 
GenericParquetReaders.buildReader(schema, msgType))
+            .project(expectedSchema)
+            .createReaderFunc(
+                fileSchema -> 
GenericParquetReaders.buildReader(expectedSchema, fileSchema))
             .build()) {
-      Iterator<RowData> expected = iterable.iterator();
       Iterator<Record> actual = reader.iterator();
-      LogicalType rowType = FlinkSchemaUtil.convert(schema);
-      for (int i = 0; i < NUM_RECORDS; i += 1) {
+      RowType rowType = FlinkSchemaUtil.convert(expectedSchema);
+      for (Record expected : data) {
         assertThat(actual).hasNext();
-        TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), 
expected.next());
+        RowData actualRowData = RowDataConverter.convert(expectedSchema, 
actual.next());
+        TestHelpers.assertRowData(expectedSchema.asStruct(), rowType, 
expected, actualRowData);
       }
+
       assertThat(actual).isExhausted();
     }
   }
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    writeAndValidate(RandomRowData.generate(schema, NUM_RECORDS, 19981), 
schema);
+    writeAndValidate(schema, RandomGenericData.generate(schema, NUM_RECORDS, 
19981));
 
     writeAndValidate(
-        RandomRowData.convert(
-            schema,
-            RandomGenericData.generateDictionaryEncodableRecords(schema, 
NUM_RECORDS, 21124)),
-        schema);
+        schema,
+        Lists.newArrayList(
+            RandomGenericData.generateDictionaryEncodableRecords(schema, 
NUM_RECORDS, 21124)));
 
     writeAndValidate(
-        RandomRowData.convert(
-            schema,
+        schema,
+        Lists.newArrayList(
             RandomGenericData.generateFallbackRecords(
-                schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
-        schema);
+                schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)));
   }
 
   @Override
-  protected void writeAndValidate(Schema schema, List<Record> expectedData) 
throws IOException {
-    RowDataSerializer rowDataSerializer = new 
RowDataSerializer(FlinkSchemaUtil.convert(schema));
-    List<RowData> binaryRowList = Lists.newArrayList();
-    for (Record record : expectedData) {
-      RowData rowData = RowDataConverter.convert(schema, record);
-      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
-      binaryRowList.add(binaryRow);
-    }
+  protected void writeAndValidate(Schema schema, List<Record> data) throws 
IOException {
+    writeAndValidate(schema, schema, data);
+  }
 
-    writeAndValidate(binaryRowList, schema);
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) 
throws IOException {
+    List<Record> data = RandomGenericData.generate(writeSchema, NUM_RECORDS, 
1991L);
+    writeAndValidate(writeSchema, expectedSchema, data);
   }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index d181d33514..1eaf539df4 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -25,9 +25,8 @@ import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.DataTestBase;
 import org.apache.iceberg.data.RandomGenericData;
@@ -59,63 +58,66 @@ public class TestFlinkParquetWriter extends DataTestBase {
     return true;
   }
 
-  private void writeAndValidate(Iterable<RowData> iterable, Schema schema) 
throws IOException {
-    OutputFile outputFile = new InMemoryOutputFile();
+  @Override
+  protected boolean supportsDefaultValues() {
+    return true;
+  }
 
-    LogicalType logicalType = FlinkSchemaUtil.convert(schema);
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, 
List<Record> data)
+      throws IOException {
+    OutputFile outputFile = new InMemoryOutputFile();
 
+    LogicalType logicalType = FlinkSchemaUtil.convert(writeSchema);
     try (FileAppender<RowData> writer =
         Parquet.write(outputFile)
-            .schema(schema)
+            .schema(writeSchema)
             .createWriterFunc(msgType -> 
FlinkParquetWriters.buildWriter(logicalType, msgType))
             .build()) {
-      writer.addAll(iterable);
+      writer.addAll(RandomRowData.convert(writeSchema, data));
     }
 
     try (CloseableIterable<Record> reader =
         Parquet.read(outputFile.toInputFile())
-            .project(schema)
-            .createReaderFunc(msgType -> 
GenericParquetReaders.buildReader(schema, msgType))
+            .project(expectedSchema)
+            .createReaderFunc(
+                fileSchema -> 
GenericParquetReaders.buildReader(expectedSchema, fileSchema))
             .build()) {
-      Iterator<RowData> expected = iterable.iterator();
       Iterator<Record> actual = reader.iterator();
-      LogicalType rowType = FlinkSchemaUtil.convert(schema);
-      for (int i = 0; i < NUM_RECORDS; i += 1) {
+      RowType rowType = FlinkSchemaUtil.convert(expectedSchema);
+      for (Record expected : data) {
         assertThat(actual).hasNext();
-        TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), 
expected.next());
+        RowData actualRowData = RowDataConverter.convert(expectedSchema, 
actual.next());
+        TestHelpers.assertRowData(expectedSchema.asStruct(), rowType, 
expected, actualRowData);
       }
+
       assertThat(actual).isExhausted();
     }
   }
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    writeAndValidate(RandomRowData.generate(schema, NUM_RECORDS, 19981), 
schema);
+    writeAndValidate(schema, RandomGenericData.generate(schema, NUM_RECORDS, 
19981));
 
     writeAndValidate(
-        RandomRowData.convert(
-            schema,
-            RandomGenericData.generateDictionaryEncodableRecords(schema, 
NUM_RECORDS, 21124)),
-        schema);
+        schema,
+        Lists.newArrayList(
+            RandomGenericData.generateDictionaryEncodableRecords(schema, 
NUM_RECORDS, 21124)));
 
     writeAndValidate(
-        RandomRowData.convert(
-            schema,
+        schema,
+        Lists.newArrayList(
             RandomGenericData.generateFallbackRecords(
-                schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
-        schema);
+                schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)));
   }
 
   @Override
-  protected void writeAndValidate(Schema schema, List<Record> expectedData) 
throws IOException {
-    RowDataSerializer rowDataSerializer = new 
RowDataSerializer(FlinkSchemaUtil.convert(schema));
-    List<RowData> binaryRowList = Lists.newArrayList();
-    for (Record record : expectedData) {
-      RowData rowData = RowDataConverter.convert(schema, record);
-      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
-      binaryRowList.add(binaryRow);
-    }
+  protected void writeAndValidate(Schema schema, List<Record> data) throws 
IOException {
+    writeAndValidate(schema, schema, data);
+  }
 
-    writeAndValidate(binaryRowList, schema);
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) 
throws IOException {
+    List<Record> data = RandomGenericData.generate(writeSchema, NUM_RECORDS, 
1991L);
+    writeAndValidate(writeSchema, expectedSchema, data);
   }
 }

Reply via email to