This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f356087156 Flink: Test both "new" Flink Avro planned reader and
"deprecated" Avro reader (#11430)
f356087156 is described below
commit f3560871564c95a0e7b2bff3ca6ecb2e08726d01
Author: JB Onofré <[email protected]>
AuthorDate: Tue Nov 26 16:55:09 2024 +0100
Flink: Test both "new" Flink Avro planned reader and "deprecated" Avro
reader (#11430)
---
...java => AbstractTestFlinkAvroReaderWriter.java} | 11 ++--
.../data/TestFlinkAvroDeprecatedReaderWriter.java | 38 ++++++++++++++
.../data/TestFlinkAvroPlannedReaderWriter.java | 34 +++++++++++++
.../iceberg/flink/data/TestRowProjection.java | 58 +++++++++++++++-------
4 files changed, 116 insertions(+), 25 deletions(-)
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
similarity index 96%
rename from
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
rename to
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index 2b9e8694b6..cbf49ae6fa 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -48,7 +48,7 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.jupiter.api.Test;
-public class TestFlinkAvroReaderWriter extends DataTest {
+public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
private static final int NUM_RECORDS = 100;
@@ -70,6 +70,8 @@ public class TestFlinkAvroReaderWriter extends DataTest {
writeAndValidate(schema, expectedRecords, NUM_RECORDS);
}
+ protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile,
Schema schema);
+
private void writeAndValidate(Schema schema, List<Record> expectedRecords,
int numRecord)
throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
@@ -88,11 +90,7 @@ public class TestFlinkAvroReaderWriter extends DataTest {
writer.addAll(expectedRecords);
}
- try (CloseableIterable<RowData> reader =
- Avro.read(Files.localInput(recordsFile))
- .project(schema)
- .createResolvingReader(FlinkPlannedAvroReader::create)
- .build()) {
+ try (CloseableIterable<RowData> reader =
createAvroReadBuilder(recordsFile, schema).build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
for (int i = 0; i < numRecord; i++) {
@@ -156,7 +154,6 @@ public class TestFlinkAvroReaderWriter extends DataTest {
@Test
public void testNumericTypes() throws IOException {
-
List<Record> expected =
ImmutableList.of(
recordNumType(
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
new file mode 100644
index 0000000000..03910f4fda
--- /dev/null
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+/**
+ * @deprecated should be removed in 1.8.0, along with FlinkAvroReader.
+ */
+@Deprecated
+public class TestFlinkAvroDeprecatedReaderWriter extends
AbstractTestFlinkAvroReaderWriter {
+
+ @Override
+ protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema
schema) {
+ return Avro.read(Files.localInput(recordsFile))
+ .project(schema)
+ .createReaderFunc(FlinkAvroReader::new);
+ }
+}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
new file mode 100644
index 0000000000..102a26a947
--- /dev/null
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+public class TestFlinkAvroPlannedReaderWriter extends
AbstractTestFlinkAvroReaderWriter {
+
+ @Override
+ protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema
schema) {
+ return Avro.read(Files.localInput(recordsFile))
+ .project(schema)
+ .createResolvingReader(FlinkPlannedAvroReader::create);
+ }
+}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index 3b6cf0c58f..f76e4c4942 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -24,6 +24,8 @@ import static org.assertj.core.api.Assertions.withPrecision;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
@@ -32,6 +34,9 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -41,13 +46,23 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRowProjection {
@TempDir private Path temp;
+ @Parameter(index = 0)
+ protected Boolean useAvroPlannedReader;
+
+ @Parameters(name = "useAvroPlannedReader={0}")
+ protected static List<Object[]> parameters() {
+ return Arrays.asList(new Object[] {Boolean.FALSE}, new Object[]
{Boolean.TRUE});
+ }
+
private RowData writeAndRead(String desc, Schema writeSchema, Schema
readSchema, RowData row)
throws IOException {
File file = File.createTempFile("junit", desc + ".avro", temp.toFile());
@@ -61,16 +76,23 @@ public class TestRowProjection {
appender.add(row);
}
- Iterable<RowData> records =
+ Avro.ReadBuilder builder =
Avro.read(Files.localInput(file))
.project(readSchema)
- .createResolvingReader(FlinkPlannedAvroReader::create)
- .build();
+ .createReaderFunc(FlinkAvroReader::new);
+ if (useAvroPlannedReader) {
+ builder =
+ Avro.read(Files.localInput(file))
+ .project(readSchema)
+ .createResolvingReader(FlinkPlannedAvroReader::create);
+ }
+
+ Iterable<RowData> records = builder.build();
return Iterables.getOnlyElement(records);
}
- @Test
+ @TestTemplate
public void testFullProjection() throws Exception {
Schema schema =
new Schema(
@@ -85,7 +107,7 @@ public class TestRowProjection {
assertThat(projected.getString(1)).asString().isEqualTo("test");
}
- @Test
+ @TestTemplate
public void testSpecialCharacterProjection() throws Exception {
Schema schema =
new Schema(
@@ -105,7 +127,7 @@ public class TestRowProjection {
assertThat(projected.getString(0)).asString().isEqualTo("test");
}
- @Test
+ @TestTemplate
public void testReorderedFullProjection() throws Exception {
Schema schema =
new Schema(
@@ -125,7 +147,7 @@ public class TestRowProjection {
assertThat(projected.getLong(1)).isEqualTo(34);
}
- @Test
+ @TestTemplate
public void testReorderedProjection() throws Exception {
Schema schema =
new Schema(
@@ -147,7 +169,7 @@ public class TestRowProjection {
assertThat(projected.isNullAt(2)).isTrue();
}
- @Test
+ @TestTemplate
public void testRenamedAddedField() throws Exception {
Schema schema =
new Schema(
@@ -177,7 +199,7 @@ public class TestRowProjection {
assertThat(projected.isNullAt(3)).as("Should contain empty value on new
column 4").isTrue();
}
- @Test
+ @TestTemplate
public void testEmptyProjection() throws Exception {
Schema schema =
new Schema(
@@ -192,7 +214,7 @@ public class TestRowProjection {
assertThat(projected.getArity()).isEqualTo(0);
}
- @Test
+ @TestTemplate
public void testBasicProjection() throws Exception {
Schema writeSchema =
new Schema(
@@ -216,7 +238,7 @@ public class TestRowProjection {
assertThat(projected.getString(0)).asString().isEqualTo("test");
}
- @Test
+ @TestTemplate
public void testRename() throws Exception {
Schema writeSchema =
new Schema(
@@ -239,7 +261,7 @@ public class TestRowProjection {
.isEqualTo("test");
}
- @Test
+ @TestTemplate
public void testNestedStructProjection() throws Exception {
Schema writeSchema =
new Schema(
@@ -305,7 +327,7 @@ public class TestRowProjection {
.isEqualTo(-1.539054f, withPrecision(0.000001f));
}
- @Test
+ @TestTemplate
public void testMapProjection() throws IOException {
Schema writeSchema =
new Schema(
@@ -359,7 +381,7 @@ public class TestRowProjection {
return stringMap;
}
- @Test
+ @TestTemplate
public void testMapOfStructsProjection() throws IOException {
Schema writeSchema =
new Schema(
@@ -459,7 +481,7 @@ public class TestRowProjection {
.isEqualTo(52.995143f, withPrecision(0.000001f));
}
- @Test
+ @TestTemplate
public void testListProjection() throws IOException {
Schema writeSchema =
new Schema(
@@ -488,7 +510,7 @@ public class TestRowProjection {
assertThat(projected.getArray(0)).isEqualTo(values);
}
- @Test
+ @TestTemplate
@SuppressWarnings("unchecked")
public void testListOfStructsProjection() throws IOException {
Schema writeSchema =
@@ -565,7 +587,7 @@ public class TestRowProjection {
assertThat(projectedP2.isNullAt(0)).as("Should project null z").isTrue();
}
- @Test
+ @TestTemplate
public void testAddedFieldsWithRequiredChildren() throws Exception {
Schema schema = new Schema(Types.NestedField.required(1, "a",
Types.LongType.get()));