yuxiqian commented on code in PR #4221:
URL: https://github.com/apache/flink-cdc/pull/4221#discussion_r2684613199


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java:
##########
@@ -49,7 +49,7 @@ class CanalJsonSerializationSchemaTest {
             TableId.tableId("default_namespace", "default_schema", "table1");
 
     @Test
-    void testSerialize() throws Exception {
+    public void testSerializeComplexTypes() throws Exception {

Review Comment:
   It would be nice to keep existing cases unchanged and add new IT cases in `KafkaDataSinkITCase`.
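
   For illustration, such an IT case could sit alongside the existing ones instead of renaming `testSerialize()`. A minimal sketch, assuming hypothetical helpers (`runPipelineAndCollectJson`, `createComplexTypeEvents`) that run the pipeline against the embedded Kafka cluster and read the sink topic back; neither is actual `KafkaDataSinkITCase` API:

   ```java
   @Test
   void testComplexTypesSerialization() throws Exception {
       // Feed events carrying ARRAY/MAP/ROW columns through the pipeline and
       // collect the serialized records from the sink topic.
       List<String> jsonResults = runPipelineAndCollectJson(createComplexTypeEvents());

       // Assert on the JSON payload itself rather than on intermediate RowData.
       assertThat(jsonResults)
               .anySatisfy(
                       json ->
                               assertThat(json)
                                       .contains("\"nested_arr\":[[\"a\",\"b\"],[\"c\",\"d\"]]"));
   }
   ```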



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/ComplexTypesEdgeCasesTest.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json;
+
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for complex types edge cases in {@link TableSchemaInfo}. */
+class ComplexTypesEdgeCasesTest {
+
+    @Test
+    void testNestedArrays() {
+        // Test ARRAY<ARRAY<STRING>>
+        DataType innerArrayType = DataTypes.ARRAY(DataTypes.STRING());
+        DataType outerArrayType = DataTypes.ARRAY(innerArrayType);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("nested_arr", outerArrayType)
+                        .primaryKey("id")
+                        .build();
+
+        TableSchemaInfo tableSchemaInfo =
+                new TableSchemaInfo(
+                        TableId.parse("test.test"), schema, null, ZoneId.systemDefault());
+
+        // Create nested array data: [["a", "b"], ["c", "d"]]
+        GenericArrayData innerArray1 =
+                new GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("a"), BinaryStringData.fromString("b")
+                        });
+        GenericArrayData innerArray2 =
+                new GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("c"), BinaryStringData.fromString("d")
+                        });
+        GenericArrayData outerArray = new GenericArrayData(new Object[] {innerArray1, innerArray2});
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.INT(), outerArrayType});
+        RecordData recordData = generator.generate(new Object[] {1, outerArray});
+
+        RowData result = tableSchemaInfo.getRowDataFromRecordData(recordData, false);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getInt(0)).isEqualTo(1);
+
+        ArrayData resultOuterArray = result.getArray(1);
+        assertThat(resultOuterArray.size()).isEqualTo(2);
+
+        ArrayData resultInnerArray1 = resultOuterArray.getArray(0);
+        assertThat(resultInnerArray1.size()).isEqualTo(2);
+        assertThat(resultInnerArray1.getString(0).toString()).isEqualTo("a");
+        assertThat(resultInnerArray1.getString(1).toString()).isEqualTo("b");
+
+        ArrayData resultInnerArray2 = resultOuterArray.getArray(1);
+        assertThat(resultInnerArray2.size()).isEqualTo(2);
+        assertThat(resultInnerArray2.getString(0).toString()).isEqualTo("c");
+        assertThat(resultInnerArray2.getString(1).toString()).isEqualTo("d");

Review Comment:
   It's not straightforward to validate results like this. What about putting complex-type serialization tests in `KafkaDataSinkITCase`, where we can validate the resulting JSON directly?
   
   Also, a parameterized test would be welcome so we don't have to repeat similar code.
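
   For illustration, the parameterized shape could look like the sketch below; the `serializeSingleColumn` helper and the expected JSON fragments are hypothetical placeholders, not existing code:

   ```java
   /** One argument set per complex type: column type, sample value, expected JSON fragment. */
   static Stream<Arguments> complexTypeCases() {
       return Stream.of(
               Arguments.of(
                       DataTypes.ARRAY(DataTypes.STRING()),
                       new GenericArrayData(
                               new Object[] {
                                   BinaryStringData.fromString("a"),
                                   BinaryStringData.fromString("b")
                               }),
                       "[\"a\",\"b\"]"),
               Arguments.of(
                       DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())),
                       new GenericArrayData(
                               new Object[] {
                                   new GenericArrayData(
                                           new Object[] {BinaryStringData.fromString("a")})
                               }),
                       "[[\"a\"]]"));
   }

   @ParameterizedTest
   @MethodSource("complexTypeCases")
   void testComplexTypeSerialization(DataType type, Object value, String expectedJson)
           throws Exception {
       // Hypothetical helper: builds a single-column schema of `type`, runs the value
       // through the serializer, and returns the JSON payload.
       String json = serializeSingleColumn(type, value);
       assertThat(json).contains(expectedJson);
   }
   ```

   (Requires `org.junit.jupiter.params.ParameterizedTest`, `org.junit.jupiter.params.provider.Arguments`, `org.junit.jupiter.params.provider.MethodSource`, and `java.util.stream.Stream` in addition to the imports already present in the test file.)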



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java:
##########
@@ -195,4 +224,231 @@ public Schema getSchema() {
     public SerializationSchema<RowData> getSerializationSchema() {
         return serializationSchema;
     }
+
+    /**
+     * Convert CDC ArrayData to Flink Table ArrayData.
+     *
+     * @param arrayData CDC array data
+     * @param arrayType array type information
+     * @param zoneId time zone for temporal type conversion
+     * @return Flink Table ArrayData
+     */
+    private static org.apache.flink.table.data.ArrayData convertArrayData(
+            ArrayData arrayData, ArrayType arrayType, ZoneId zoneId) {
+        if (arrayData == null) {
+            return null;
+        }
+
+        DataType elementType = arrayType.getElementType();
+        int size = arrayData.size();
+        Object[] result = new Object[size];
+
+        for (int i = 0; i < size; i++) {
+            result[i] = convertElement(arrayData, i, elementType, zoneId);
+        }
+
+        return new org.apache.flink.table.data.GenericArrayData(result);
+    }

Review Comment:
   nit: move the CDC `RecordData` => Flink SQL converter to a utils package?
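
   For illustration, a minimal sketch of that refactor, assuming a new utility class such as `org.apache.flink.cdc.connectors.kafka.utils.RecordDataConverter` (the class name and location are hypothetical):

   ```java
   package org.apache.flink.cdc.connectors.kafka.utils;

   import org.apache.flink.cdc.common.data.ArrayData;
   import org.apache.flink.cdc.common.types.ArrayType;
   import org.apache.flink.cdc.common.types.DataType;
   import org.apache.flink.table.data.GenericArrayData;

   import java.time.ZoneId;

   /** Hypothetical home for the CDC RecordData => Flink SQL data converters. */
   public final class RecordDataConverter {

       private RecordDataConverter() {}

       /** Same logic as the private TableSchemaInfo#convertArrayData, relocated here. */
       public static org.apache.flink.table.data.ArrayData convertArrayData(
               ArrayData arrayData, ArrayType arrayType, ZoneId zoneId) {
           if (arrayData == null) {
               return null;
           }
           DataType elementType = arrayType.getElementType();
           Object[] result = new Object[arrayData.size()];
           for (int i = 0; i < result.length; i++) {
               result[i] = convertElement(arrayData, i, elementType, zoneId);
           }
           return new GenericArrayData(result);
       }

       private static Object convertElement(
               ArrayData arrayData, int pos, DataType elementType, ZoneId zoneId) {
           // Body elided: the existing element/map/row converters in TableSchemaInfo
           // would move here unchanged.
           throw new UnsupportedOperationException("sketch only");
       }
   }
   ```

   `TableSchemaInfo` would then delegate to the utility, keeping the schema bookkeeping and the conversion logic in separate places.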



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
