This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 96a59408b2 Flink: Backport: Test Parquet writer default handling via core's DataTestBase (#15123) (#15125)
96a59408b2 is described below
commit 96a59408b271881a596f74697c05adb2dbc44094
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Jan 23 18:27:03 2026 +0100
Flink: Backport: Test Parquet writer default handling via core's DataTestBase (#15123) (#15125)
---
.../iceberg/flink/data/TestFlinkParquetWriter.java | 64 +++++++++++-----------
.../iceberg/flink/data/TestFlinkParquetWriter.java | 64 +++++++++++-----------
2 files changed, 66 insertions(+), 62 deletions(-)
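For context: the backported test now validates by writing Flink RowData with a write schema and reading the file back as generic Records projected with an expected schema. A minimal sketch of that round trip, using only the classes this diff touches (the record count and seed are illustrative, not the test's values):

    OutputFile outputFile = new InMemoryOutputFile();
    LogicalType logicalType = FlinkSchemaUtil.convert(schema);

    // Write Flink RowData using the Iceberg write schema.
    try (FileAppender<RowData> writer =
        Parquet.write(outputFile)
            .schema(schema)
            .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType))
            .build()) {
      writer.addAll(RandomRowData.convert(schema, RandomGenericData.generate(schema, 10, 0L)));
    }

    // Read back as generic Records, projecting with the expected schema.
    try (CloseableIterable<Record> reader =
        Parquet.read(outputFile.toInputFile())
            .project(schema)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
            .build()) {
      reader.forEach(System.out::println);
    }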
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index d181d33514..1eaf539df4 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -25,9 +25,8 @@ import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DataTestBase;
import org.apache.iceberg.data.RandomGenericData;
@@ -59,63 +58,66 @@ public class TestFlinkParquetWriter extends DataTestBase {
return true;
}
- private void writeAndValidate(Iterable<RowData> iterable, Schema schema) throws IOException {
- OutputFile outputFile = new InMemoryOutputFile();
+ @Override
+ protected boolean supportsDefaultValues() {
+ return true;
+ }
- LogicalType logicalType = FlinkSchemaUtil.convert(schema);
+ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> data)
+ throws IOException {
+ OutputFile outputFile = new InMemoryOutputFile();
+ LogicalType logicalType = FlinkSchemaUtil.convert(writeSchema);
try (FileAppender<RowData> writer =
Parquet.write(outputFile)
- .schema(schema)
+ .schema(writeSchema)
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType))
.build()) {
- writer.addAll(iterable);
+ writer.addAll(RandomRowData.convert(writeSchema, data));
}
try (CloseableIterable<Record> reader =
Parquet.read(outputFile.toInputFile())
- .project(schema)
- .createReaderFunc(msgType -> GenericParquetReaders.buildReader(schema, msgType))
+ .project(expectedSchema)
+ .createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema))
.build()) {
- Iterator<RowData> expected = iterable.iterator();
Iterator<Record> actual = reader.iterator();
- LogicalType rowType = FlinkSchemaUtil.convert(schema);
- for (int i = 0; i < NUM_RECORDS; i += 1) {
+ RowType rowType = FlinkSchemaUtil.convert(expectedSchema);
+ for (Record expected : data) {
assertThat(actual).hasNext();
- TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), expected.next());
+ RowData actualRowData = RowDataConverter.convert(expectedSchema, actual.next());
+ TestHelpers.assertRowData(expectedSchema.asStruct(), rowType, expected, actualRowData);
}
+
assertThat(actual).isExhausted();
}
}
@Override
protected void writeAndValidate(Schema schema) throws IOException {
- writeAndValidate(RandomRowData.generate(schema, NUM_RECORDS, 19981), schema);
+ writeAndValidate(schema, RandomGenericData.generate(schema, NUM_RECORDS, 19981));
writeAndValidate(
- RandomRowData.convert(
- schema,
- RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124)),
- schema);
+ schema,
+ Lists.newArrayList(
+ RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124)));
writeAndValidate(
- RandomRowData.convert(
- schema,
+ schema,
+ Lists.newArrayList(
RandomGenericData.generateFallbackRecords(
- schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
- schema);
+ schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)));
}
@Override
- protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
- RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
- List<RowData> binaryRowList = Lists.newArrayList();
- for (Record record : expectedData) {
- RowData rowData = RowDataConverter.convert(schema, record);
- BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
- binaryRowList.add(binaryRow);
- }
+ protected void writeAndValidate(Schema schema, List<Record> data) throws IOException {
+ writeAndValidate(schema, schema, data);
+ }
- writeAndValidate(binaryRowList, schema);
+ @Override
+ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+ List<Record> data = RandomGenericData.generate(writeSchema, NUM_RECORDS, 1991L);
+ writeAndValidate(writeSchema, expectedSchema, data);
}
}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index d181d33514..1eaf539df4 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -25,9 +25,8 @@ import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DataTestBase;
import org.apache.iceberg.data.RandomGenericData;
@@ -59,63 +58,66 @@ public class TestFlinkParquetWriter extends DataTestBase {
return true;
}
- private void writeAndValidate(Iterable<RowData> iterable, Schema schema) throws IOException {
- OutputFile outputFile = new InMemoryOutputFile();
+ @Override
+ protected boolean supportsDefaultValues() {
+ return true;
+ }
- LogicalType logicalType = FlinkSchemaUtil.convert(schema);
+ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> data)
+ throws IOException {
+ OutputFile outputFile = new InMemoryOutputFile();
+ LogicalType logicalType = FlinkSchemaUtil.convert(writeSchema);
try (FileAppender<RowData> writer =
Parquet.write(outputFile)
- .schema(schema)
+ .schema(writeSchema)
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType))
.build()) {
- writer.addAll(iterable);
+ writer.addAll(RandomRowData.convert(writeSchema, data));
}
try (CloseableIterable<Record> reader =
Parquet.read(outputFile.toInputFile())
- .project(schema)
- .createReaderFunc(msgType -> GenericParquetReaders.buildReader(schema, msgType))
+ .project(expectedSchema)
+ .createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema))
.build()) {
- Iterator<RowData> expected = iterable.iterator();
Iterator<Record> actual = reader.iterator();
- LogicalType rowType = FlinkSchemaUtil.convert(schema);
- for (int i = 0; i < NUM_RECORDS; i += 1) {
+ RowType rowType = FlinkSchemaUtil.convert(expectedSchema);
+ for (Record expected : data) {
assertThat(actual).hasNext();
- TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), expected.next());
+ RowData actualRowData = RowDataConverter.convert(expectedSchema, actual.next());
+ TestHelpers.assertRowData(expectedSchema.asStruct(), rowType, expected, actualRowData);
}
+
assertThat(actual).isExhausted();
}
}
@Override
protected void writeAndValidate(Schema schema) throws IOException {
- writeAndValidate(RandomRowData.generate(schema, NUM_RECORDS, 19981), schema);
+ writeAndValidate(schema, RandomGenericData.generate(schema, NUM_RECORDS, 19981));
writeAndValidate(
- RandomRowData.convert(
- schema,
- RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124)),
- schema);
+ schema,
+ Lists.newArrayList(
+ RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124)));
writeAndValidate(
- RandomRowData.convert(
- schema,
+ schema,
+ Lists.newArrayList(
RandomGenericData.generateFallbackRecords(
- schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
- schema);
+ schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)));
}
@Override
- protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
- RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
- List<RowData> binaryRowList = Lists.newArrayList();
- for (Record record : expectedData) {
- RowData rowData = RowDataConverter.convert(schema, record);
- BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
- binaryRowList.add(binaryRow);
- }
+ protected void writeAndValidate(Schema schema, List<Record> data) throws IOException {
+ writeAndValidate(schema, schema, data);
+ }
- writeAndValidate(binaryRowList, schema);
+ @Override
+ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+ List<Record> data = RandomGenericData.generate(writeSchema, NUM_RECORDS, 1991L);
+ writeAndValidate(writeSchema, expectedSchema, data);
}
}
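The point of the new two-schema overload is that core's DataTestBase can now exercise default-value handling here: data is written with one schema and validated against an expected schema that adds columns. A hedged illustration of the shape of such a schema pair (field names and ids are hypothetical; the base class's default-value cases supply the actual defaults):

    // Hypothetical schemas: "data" is absent from the written file, so the
    // reader must fill it in on read (with null here, or with a column
    // default in DataTestBase's default-value cases).
    Schema writeSchema =
        new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    Schema expectedSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
    // writeAndValidate(writeSchema, expectedSchema) round-trips NUM_RECORDS
    // random records and asserts each read-back row against expectedSchema.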