This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new d3be0d0154a [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705) d3be0d0154a is described below commit d3be0d0154aad2a870448c49ce764a7f49d8bb7e Author: Haizhou Zhao <zhaohaizhou940...@gmail.com> AuthorDate: Wed May 11 17:41:32 2022 -0700 [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705) Co-authored-by: Haizhou Zhao <haizhou_z...@apple.com> --- .../avro/typeutils/GenericRecordAvroTypeInfo.java | 10 ++++- .../avro/typeutils/SerializableAvroSchema.java | 11 +++-- .../typeutils/AvroGenericRecordTypeInfoTest.java | 50 ++++++++++++++++++++++ .../typeutils/AvroSerializerGenericRecordTest.java | 7 +-- ...a => AvroSerializerLargeGenericRecordTest.java} | 11 ++--- .../flink/formats/avro/utils/AvroTestUtils.java | 30 +++++++++++++ 6 files changed, 102 insertions(+), 17 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java index 387014077d2..28332db5a46 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -104,10 +105,15 @@ public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> { } private void writeObject(ObjectOutputStream oos) throws IOException { - oos.writeUTF(schema.toString()); + byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8); + oos.writeInt(schemaStrInBytes.length); + oos.write(schemaStrInBytes); } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { - this.schema = new Schema.Parser().parse(ois.readUTF()); + int len = ois.readInt(); + byte[] content = new byte[len]; + ois.readFully(content); + this.schema = new Schema.Parser().parse(new String(content, StandardCharsets.UTF_8)); } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java index 32714951a61..12458752428 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.nio.charset.StandardCharsets; /** A wrapper for Avro {@link Schema}, that is Java serializable. */ @Internal @@ -52,14 +53,18 @@ final class SerializableAvroSchema implements Serializable { oos.writeBoolean(false); } else { oos.writeBoolean(true); - oos.writeUTF(schema.toString(false)); + byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8); + oos.writeInt(schemaStrInBytes.length); + oos.write(schemaStrInBytes); } } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { if (ois.readBoolean()) { - String schema = ois.readUTF(); - this.schema = new Parser().parse(schema); + int len = ois.readInt(); + byte[] content = new byte[len]; + ois.readFully(content); + this.schema = new Parser().parse(new String(content, StandardCharsets.UTF_8)); } else { this.schema = null; } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java new file mode 100644 index 00000000000..b91053f60c8 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.formats.avro.utils.AvroTestUtils; + +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.Assert.assertTrue; + +/** Test for {@link GenericRecordAvroTypeInfo}. */ +public class AvroGenericRecordTypeInfoTest + extends TypeInformationTestBase<GenericRecordAvroTypeInfo> { + + @Override + protected GenericRecordAvroTypeInfo[] getTestData() { + return new GenericRecordAvroTypeInfo[] { + new GenericRecordAvroTypeInfo(AvroTestUtils.getSmallSchema()), + new GenericRecordAvroTypeInfo(AvroTestUtils.getLargeSchema()) + }; + } + + @Test + void testAvroByDefault() { + final TypeSerializer<GenericRecord> serializer = + new GenericRecordAvroTypeInfo(AvroTestUtils.getLargeSchema()) + .createSerializer(new ExecutionConfig()); + assertTrue(serializer instanceof AvroSerializer); + } +} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java index 750bae9df2d..26471e75d4f 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java @@ -20,6 +20,7 @@ package org.apache.flink.formats.avro.typeutils; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.formats.avro.utils.AvroTestUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -28,11 +29,7 @@ import org.apache.avro.generic.GenericRecordBuilder; /** Test for {@link AvroSerializer} that tests GenericRecord. */ public class AvroSerializerGenericRecordTest extends SerializerTestBase<GenericRecord> { - private static final Schema SCHEMA = - new org.apache.avro.Schema.Parser() - .parse( - "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": " - + "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + private static final Schema SCHEMA = AvroTestUtils.getSmallSchema(); @Override protected TypeSerializer<GenericRecord> createSerializer() { diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerLargeGenericRecordTest.java similarity index 76% copy from flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java copy to flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerLargeGenericRecordTest.java index 750bae9df2d..29092e810ed 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerLargeGenericRecordTest.java @@ -20,19 +20,16 @@ package org.apache.flink.formats.avro.typeutils; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.formats.avro.utils.AvroTestUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; /** Test for {@link AvroSerializer} that tests GenericRecord. */ -public class AvroSerializerGenericRecordTest extends SerializerTestBase<GenericRecord> { +public class AvroSerializerLargeGenericRecordTest extends SerializerTestBase<GenericRecord> { - private static final Schema SCHEMA = - new org.apache.avro.Schema.Parser() - .parse( - "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": " - + "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + private static final Schema SCHEMA = AvroTestUtils.getLargeSchema(); @Override protected TypeSerializer<GenericRecord> createSerializer() { @@ -52,7 +49,7 @@ public class AvroSerializerGenericRecordTest extends SerializerTestBase<GenericR @Override protected GenericRecord[] getTestData() { return new GenericRecord[] { - new GenericRecordBuilder(SCHEMA).set("afield", "foo bar").build() + new GenericRecordBuilder(SCHEMA).set("field1", "foo bar").build() }; } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java index aedcaa66a2f..c4e49e0980a 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java @@ -24,9 +24,11 @@ import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest; import org.apache.flink.types.Row; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -259,6 +261,34 @@ public final class AvroTestUtils { return t; } + /** + * Craft a large Avro Schema which contains more than 0xFFFF characters. + * + * <p>0xFFFF is the magical number that once a java string length is above it, then the + * serialization scheme changes + */ + public static Schema getLargeSchema() { + SchemaBuilder.FieldAssembler<Schema> fields = + SchemaBuilder.record("LargeAvroSchema") + .namespace(AvroSerializerLargeGenericRecordTest.class.getName()) + .fields(); + for (int i = 0; i < 10000; ++i) { + fields = fields.optionalString("field" + i); + } + Schema schema = fields.endRecord(); + + assert schema.toString().length() > 0xFFFF; + return schema; + } + + /** Craft a small Avro Schema which contains less than 0xFFFF characters. */ + public static Schema getSmallSchema() { + return new org.apache.avro.Schema.Parser() + .parse( + "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": " + + "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + } + /** * Writes given record using specified schema. *