This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2c9b155c [flink-cdc] Preserve Avro field documentation as Paimon column comments (#6505)
ca2c9b155c is described below

commit ca2c9b155c4e144113155013d74c589bc75eeee4
Author: siadat <[email protected]>
AuthorDate: Sat Nov 1 04:57:36 2025 +0000

    [flink-cdc] Preserve Avro field documentation as Paimon column comments (#6505)
---
 .../format/debezium/DebeziumAvroRecordParser.java  |   2 +-
 .../DebeziumAvroRecordParserTestSimple.java        | 122 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
index 7c3763a604..7219aab3b6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
@@ -155,7 +155,7 @@ public class DebeziumAvroRecordParser extends AbstractRecordParser {
                             record.get(fieldName),
                             ZoneOffset.UTC);
             resultMap.put(fieldName, transformed);
-            schemaBuilder.column(fieldName, avroToPaimonDataType(schema));
+            schemaBuilder.column(fieldName, avroToPaimonDataType(schema), field.doc());
         }
 
         evalComputedColumns(resultMap, schemaBuilder);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParserTestSimple.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParserTestSimple.java
new file mode 100644
index 0000000000..a32808e945
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParserTestSimple.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Simple test for DebeziumAvroRecordParser field documentation preservation. */
+public class DebeziumAvroRecordParserTestSimple {
+
+    @Test
+    public void testFieldDocumentationPreservation() throws Exception {
+        // Create complete Avro schema with field documentation
+        String schemaJson =
+                "{"
+                        + "\"type\":\"record\","
+                        + "\"name\":\"Envelope\","
+                        + "\"fields\":["
+                        + "  {\"name\":\"op\",\"type\":\"string\"},"
+                        + "  {\"name\":\"after\",\"type\":[\"null\",{"
+                        + "    
\"type\":\"record\",\"name\":\"UserRecord\",\"fields\":["
+                        + "      
{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Primary key identifier\"},"
+                        + "      
{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Full name of the person\"},"
+                        + "      
{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null,\"doc\":\"Email
 address for contact\"},"
+                        + "      {\"name\":\"city\",\"type\":\"string\"}"
+                        + "    ]"
+                        + "  }],\"default\":null},"
+                        + "  
{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"fields\":["
+                        + "    {\"name\":\"db\",\"type\":\"string\"},"
+                        + "    {\"name\":\"table\",\"type\":\"string\"}"
+                        + "  ]}}"
+                        + "]}";
+
+        org.apache.avro.Schema envelopeSchema =
+                new org.apache.avro.Schema.Parser().parse(schemaJson);
+        org.apache.avro.Schema userRecordSchema =
+                envelopeSchema.getField("after").schema().getTypes().get(1);
+
+        // Create test record data
+        GenericRecord afterRecord = new GenericData.Record(userRecordSchema);
+        afterRecord.put("id", 123);
+        afterRecord.put("name", "John Doe");
+        afterRecord.put("email", "[email protected]");
+        afterRecord.put("city", "New York");
+
+        GenericRecord sourceRecord =
+                new GenericData.Record(envelopeSchema.getField("source").schema());
+        sourceRecord.put("db", "testdb");
+        sourceRecord.put("table", "users");
+
+        GenericRecord envelopeRecord = new GenericData.Record(envelopeSchema);
+        envelopeRecord.put("op", "c");
+        envelopeRecord.put("after", afterRecord);
+        envelopeRecord.put("source", sourceRecord);
+
+        // Create CdcSourceRecord and test parser
+        CdcSourceRecord cdcRecord = new CdcSourceRecord("test-topic", null, envelopeRecord);
+
+        DebeziumAvroRecordParser parser =
+                new DebeziumAvroRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+
+        Schema paimonSchema = parser.buildSchema(cdcRecord);
+
+        // Verify schema was built with field documentation
+        assertThat(paimonSchema).isNotNull();
+        List<DataField> fields = paimonSchema.fields();
+        assertThat(fields).hasSize(4);
+
+        // Field with documentation
+        DataField idField = findFieldByName(fields, "id");
+        assertThat(idField).isNotNull();
+        assertThat(idField.description()).isEqualTo("Primary key identifier");
+        assertThat(idField.type()).isEqualTo(DataTypes.INT());
+
+        DataField nameField = findFieldByName(fields, "name");
+        assertThat(nameField).isNotNull();
+        assertThat(nameField.description()).isEqualTo("Full name of the person");
+        assertThat(nameField.type()).isEqualTo(DataTypes.STRING());
+
+        DataField emailField = findFieldByName(fields, "email");
+        assertThat(emailField).isNotNull();
+        assertThat(emailField.description()).isEqualTo("Email address for contact");
+
+        // Field without documentation
+        DataField cityField = findFieldByName(fields, "city");
+        assertThat(cityField).isNotNull();
+        assertThat(cityField.description()).isNull();
+        assertThat(cityField.type()).isEqualTo(DataTypes.STRING());
+    }
+
+    private static DataField findFieldByName(List<DataField> fields, String name) {
+        return fields.stream().filter(field -> field.name().equals(name)).findFirst().orElse(null);
+    }
+}
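
For context, the one-line change above passes the Avro field's doc attribute through to Schema.Builder#column as the Paimon column comment. The standalone sketch below illustrates that mapping under simplified assumptions: the class name AvroDocToPaimonCommentSketch and the toPaimonType helper are illustrative only, standing in for the parser's internal avroToPaimonDataType conversion, and the Avro schema is a made-up example rather than a real Debezium envelope.

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;

    public class AvroDocToPaimonCommentSketch {

        public static void main(String[] args) {
            // An Avro record whose fields carry "doc" strings, as Debezium can emit.
            String avroJson =
                    "{\"type\":\"record\",\"name\":\"UserRecord\",\"fields\":["
                            + "{\"name\":\"id\",\"type\":\"int\",\"doc\":\"Primary key identifier\"},"
                            + "{\"name\":\"city\",\"type\":\"string\"}"
                            + "]}";
            org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroJson);

            Schema.Builder builder = Schema.newBuilder();
            for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
                // field.doc() is null when no documentation is present, which leaves the
                // Paimon column comment empty (the "city" case in the test above).
                builder.column(field.name(), toPaimonType(field.schema()), field.doc());
            }

            Schema paimonSchema = builder.build();
            paimonSchema.fields()
                    .forEach(f -> System.out.println(f.name() + " -> " + f.description()));
        }

        // Hand-rolled stand-in for the parser's Avro-to-Paimon type conversion; it only
        // covers the two primitive types used in this sketch.
        private static DataType toPaimonType(org.apache.avro.Schema schema) {
            switch (schema.getType()) {
                case INT:
                    return DataTypes.INT();
                case STRING:
                    return DataTypes.STRING();
                default:
                    throw new UnsupportedOperationException("Unhandled Avro type: " + schema.getType());
            }
        }
    }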
