This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c58dca500d [FLINK-25962][avro] Use namespaces for generated records
2c58dca500d is described below

commit 2c58dca500d0ec4f5d80852aa96ddb9c06ae4d61
Author: Ryan Skraba <rskr...@apache.org>
AuthorDate: Tue Aug 2 14:53:31 2022 +0200

    [FLINK-25962][avro] Use namespaces for generated records
---
 .../util/kafka/SQLClientSchemaRegistryITCase.java  |  2 +-
 .../avro/typeutils/AvroSchemaConverter.java        |  4 +--
 .../flink/formats/avro/AvroBulkFormatTest.java     | 30 ++++++++++++++--------
 .../avro/typeutils/AvroSchemaConverterTest.java    |  3 +++
 4 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index cb4285bf961..77960650add 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -116,7 +116,7 @@ public class SQLClientSchemaRegistryITCase {
         String testResultsTopic = "test-results-" + UUID.randomUUID().toString();
         kafkaClient.createTopic(1, 1, testCategoryTopic);
         Schema categoryRecord =
-                SchemaBuilder.record("record")
+                SchemaBuilder.record("org.apache.flink.avro.generated.record")
                         .fields()
                         .requiredLong("category_id")
                         .optionalString("name")
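
The test now passes a fully qualified name to SchemaBuilder.record(...). Avro splits a dotted
record name into a simple name plus a namespace, which is the behaviour this change relies on.
A minimal standalone sketch of that split (hypothetical class, not part of this patch):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class NamespaceSketch {
        public static void main(String[] args) {
            // A dotted record name is split into name ("record") and
            // namespace ("org.apache.flink.avro.generated").
            Schema schema =
                    SchemaBuilder.record("org.apache.flink.avro.generated.record")
                            .fields()
                            .requiredLong("category_id")
                            .optionalString("name")
                            .endRecord();

            System.out.println(schema.getName());      // record
            System.out.println(schema.getNamespace()); // org.apache.flink.avro.generated
            System.out.println(schema.getFullName());  // org.apache.flink.avro.generated.record
        }
    }
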
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index d84ed069b9b..042a343af12 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -296,14 +296,14 @@ public class AvroSchemaConverter {
     /**
      * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
      *
-     * <p>Use "record" as the type name.
+     * <p>Use "org.apache.flink.avro.generated.record" as the type name.
      *
      * @param schema the schema type, usually it should be the top level record type, e.g. not a
      *     nested type
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType schema) {
-        return convertToSchema(schema, "record");
-        return convertToSchema(schema, "org.apache.flink.avro.generated.record");
     }
 
     /**
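
With the change above, the schema returned by the single-argument convertToSchema overload carries
the namespace "org.apache.flink.avro.generated" instead of a bare, unqualified "record". A minimal
sketch of the effect (hypothetical class and field types, not part of this patch):

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class ConverterSketch {
        public static void main(String[] args) {
            // A hypothetical non-nullable two-field row type; any top-level row would do.
            RowType rowType =
                    new RowType(
                            false,
                            Arrays.asList(
                                    new RowType.RowField("category_id", new BigIntType()),
                                    new RowType.RowField(
                                            "name", new VarCharType(VarCharType.MAX_LENGTH))));

            // The single-argument overload now defaults to the fully qualified
            // record name "org.apache.flink.avro.generated.record".
            Schema schema = AvroSchemaConverter.convertToSchema(rowType);

            System.out.println(schema.getName());      // record
            System.out.println(schema.getNamespace()); // org.apache.flink.avro.generated
        }
    }
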
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
index 28798bb07a7..28361ce2426 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
@@ -59,7 +59,7 @@ class AvroBulkFormatTest {
 
     private static final List<RowData> TEST_DATA =
             Arrays.asList(
-                    // -------- batch 0, block start 186 --------
+                    // -------- batch 0, block start 232 --------
                     GenericRowData.of(
                             StringData.fromString("AvroBulk"), 
StringData.fromString("FormatTest")),
                     GenericRowData.of(
@@ -71,20 +71,20 @@ class AvroBulkFormatTest {
                                             + "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
                                             + "叙幽情。"),
                             StringData.fromString("")),
-                    // -------- batch 1, block start 547 --------
+                    // -------- batch 1, block start 593 --------
                     GenericRowData.of(
                             StringData.fromString("File"), 
StringData.fromString("Format")),
                     GenericRowData.of(
                             null,
                             StringData.fromString(
                                     "This is a string with English, 中文 and 
even 🍎🍌🍑🥝🍍🥭🍐")),
-                    // -------- batch 2, block start 659 --------
+                    // -------- batch 2, block start 705 --------
                     GenericRowData.of(
                             StringData.fromString("block with"),
                             StringData.fromString("only one record"))
-                    // -------- file length 706 --------
+                    // -------- file length 752 --------
                     );
-    private static final List<Integer> BLOCK_STARTS = Arrays.asList(186, 547, 659);
+    private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L);
 
     private File tmpFile;
 
@@ -101,13 +101,23 @@ class AvroBulkFormatTest {
         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
         DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
         dataFileWriter.create(schema, out);
-        dataFileWriter.setSyncInterval(64);
-
-        for (RowData rowData : TEST_DATA) {
-            dataFileWriter.append((GenericRecord) converter.convert(schema, rowData));
-        }
 
+        //  Generate the sync points manually in order to test blocks.
+        long syncBlock1 = dataFileWriter.sync();
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(0)));
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(1)));
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(2)));
+        long syncBlock2 = dataFileWriter.sync();
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(3)));
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(4)));
+        long syncBlock3 = dataFileWriter.sync();
+        dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(5)));
+        long syncEnd = dataFileWriter.sync();
         dataFileWriter.close();
+
+        // These values should be constant if nothing else changes with the file.
+        assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1, syncBlock2, syncBlock3));
+        assertThat(tmpFile).hasSize(syncEnd);
     }
 
     @AfterEach
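
The rewritten test relies on DataFileWriter#sync(), which ends the current block, writes a sync
marker, and returns a position usable with DataFileReader#seek(long), i.e. where the next block
starts. The return type is long, which is why BLOCK_STARTS changes from List<Integer> to
List<Long>. A minimal standalone sketch of the pattern (hypothetical schema, record values, and
file name, not part of this patch):

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class SyncSketch {
        public static void main(String[] args) throws IOException {
            // A hypothetical two-field record schema, just for illustration.
            Schema schema =
                    SchemaBuilder.record("org.apache.flink.avro.generated.record")
                            .fields()
                            .requiredString("f0")
                            .requiredString("f1")
                            .endRecord();

            GenericRecord record = new GenericData.Record(schema);
            record.put("f0", "AvroBulk");
            record.put("f1", "FormatTest");

            GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
            try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
                writer.create(schema, new File("sync-sketch.avro"));

                // Each sync() call closes the current block, so block boundaries become
                // deterministic and can be asserted against constants.
                long block1 = writer.sync();
                writer.append(record);
                long block2 = writer.sync();
                writer.append(record);
                long end = writer.sync();

                System.out.printf(
                        "blocks start at %d and %d, data ends at %d%n", block1, block2, end);
            }
        }
    }
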
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 7556d60a63c..df3731401da 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -190,6 +190,7 @@ class AvroSchemaConverterTest {
                         "{\n"
                                 + "  \"type\" : \"record\",\n"
                                 + "  \"name\" : \"record\",\n"
+                                + "  \"namespace\" : \"org.apache.flink.avro.generated\",\n"
                                 + "  \"fields\" : [ {\n"
                                 + "    \"name\" : \"row1\",\n"
                                 + "    \"type\" : [ \"null\", {\n"
@@ -326,6 +327,7 @@ class AvroSchemaConverterTest {
                 "{\n"
                         + "  \"type\" : \"record\",\n"
                         + "  \"name\" : \"record\",\n"
+                        + "  \"namespace\" : \"org.apache.flink.avro.generated\",\n"
                         + "  \"fields\" : [ {\n"
                         + "    \"name\" : \"f_null\",\n"
                         + "    \"type\" : \"null\",\n"
@@ -435,6 +437,7 @@ class AvroSchemaConverterTest {
                 "{\n"
                         + "  \"type\" : \"record\",\n"
                         + "  \"name\" : \"record\",\n"
+                        + "  \"namespace\" : \"org.apache.flink.avro.generated\",\n"
                         + "  \"fields\" : [ {\n"
                         + "    \"name\" : \"f_boolean\",\n"
                         + "    \"type\" : \"boolean\"\n"
