This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 2c58dca500d [FLINK-25962][avro] Use namespaces for generated records 2c58dca500d is described below commit 2c58dca500d0ec4f5d80852aa96ddb9c06ae4d61 Author: Ryan Skraba <rskr...@apache.org> AuthorDate: Tue Aug 2 14:53:31 2022 +0200 [FLINK-25962][avro] Use namespaces for generated records --- .../util/kafka/SQLClientSchemaRegistryITCase.java | 2 +- .../avro/typeutils/AvroSchemaConverter.java | 4 +-- .../flink/formats/avro/AvroBulkFormatTest.java | 30 ++++++++++++++-------- .../avro/typeutils/AvroSchemaConverterTest.java | 3 +++ 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java index cb4285bf961..77960650add 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java @@ -116,7 +116,7 @@ public class SQLClientSchemaRegistryITCase { String testResultsTopic = "test-results-" + UUID.randomUUID().toString(); kafkaClient.createTopic(1, 1, testCategoryTopic); Schema categoryRecord = - SchemaBuilder.record("record") + SchemaBuilder.record("org.apache.flink.avro.generated.record") .fields() .requiredLong("category_id") .optionalString("name") diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java index d84ed069b9b..042a343af12 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java @@ -296,14 +296,14 @@ public class AvroSchemaConverter { /** * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. * - * <p>Use "record" as the type name. + * <p>Use "org.apache.flink.avro.generated.record" as the type name. * * @param schema the schema type, usually it should be the top level record type, e.g. not a * nested type * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType schema) { - return convertToSchema(schema, "record"); + return convertToSchema(schema, "org.apache.flink.avro.generated.record"); } /** diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java index 28798bb07a7..28361ce2426 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java @@ -59,7 +59,7 @@ class AvroBulkFormatTest { private static final List<RowData> TEST_DATA = Arrays.asList( - // -------- batch 0, block start 186 -------- + // -------- batch 0, block start 232 -------- GenericRowData.of( StringData.fromString("AvroBulk"), StringData.fromString("FormatTest")), GenericRowData.of( @@ -71,20 +71,20 @@ class AvroBulkFormatTest { + "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅" + "叙幽情。"), StringData.fromString("")), - // -------- batch 1, block start 547 -------- + // -------- batch 1, block start 593 -------- GenericRowData.of( StringData.fromString("File"), StringData.fromString("Format")), GenericRowData.of( null, StringData.fromString( "This is a string with English, 中文 and even 🍎🍌🍑🥝🍍🥭🍐")), - // -------- batch 2, block start 659 -------- + // -------- batch 2, block start 705 -------- GenericRowData.of( StringData.fromString("block with"), StringData.fromString("only one record")) - // -------- file length 706 -------- + // -------- file length 752 -------- ); - private static final List<Integer> BLOCK_STARTS = Arrays.asList(186, 547, 659); + private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L); private File tmpFile; @@ -101,13 +101,23 @@ class AvroBulkFormatTest { DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter); dataFileWriter.create(schema, out); - dataFileWriter.setSyncInterval(64); - - for (RowData rowData : TEST_DATA) { - dataFileWriter.append((GenericRecord) converter.convert(schema, rowData)); - } + // Generate the sync points manually in order to test blocks. + long syncBlock1 = dataFileWriter.sync(); + dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(0))); + dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(1))); + dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(2))); + long syncBlock2 = dataFileWriter.sync(); + dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(3))); + dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(4))); + long syncBlock3 = dataFileWriter.sync(); + dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(5))); + long syncEnd = dataFileWriter.sync(); dataFileWriter.close(); + + // These values should be constant if nothing else changes with the file. + assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1, syncBlock2, syncBlock3)); + assertThat(tmpFile).hasSize(syncEnd); } @AfterEach diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java index 7556d60a63c..df3731401da 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java @@ -190,6 +190,7 @@ class AvroSchemaConverterTest { "{\n" + " \"type\" : \"record\",\n" + " \"name\" : \"record\",\n" + + " \"namespace\" : \"org.apache.flink.avro.generated\",\n" + " \"fields\" : [ {\n" + " \"name\" : \"row1\",\n" + " \"type\" : [ \"null\", {\n" @@ -326,6 +327,7 @@ class AvroSchemaConverterTest { "{\n" + " \"type\" : \"record\",\n" + " \"name\" : \"record\",\n" + + " \"namespace\" : \"org.apache.flink.avro.generated\",\n" + " \"fields\" : [ {\n" + " \"name\" : \"f_null\",\n" + " \"type\" : \"null\",\n" @@ -435,6 +437,7 @@ class AvroSchemaConverterTest { "{\n" + " \"type\" : \"record\",\n" + " \"name\" : \"record\",\n" + + " \"namespace\" : \"org.apache.flink.avro.generated\",\n" + " \"fields\" : [ {\n" + " \"name\" : \"f_boolean\",\n" + " \"type\" : \"boolean\"\n"