This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f5040b7  [feature] Support importing MySQL geographic data types and PostgreSQL array data types (#25)
f5040b7 is described below

commit f5040b77f0013333cfc12ad9aa82235454df8852
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Thu Jun 6 16:06:03 2024 +0800

    [feature] Support importing MySQL geographic data types and PostgreSQL array data types (#25)
---
 pom.xml                                            |  12 +++
 .../kafka/connector/converter/RecordService.java   |   2 +-
 .../connector/converter/RecordTypeRegister.java    |   8 ++
 .../converter/type/AbstractGeometryType.java       |  30 ++++++
 .../doris/kafka/connector/converter/type/Type.java |   4 +
 .../converter/type/debezium/ArrayType.java         | 107 +++++++++++++++++++++
 .../converter/type/debezium/GeographyType.java     |  32 ++++++
 .../converter/type/debezium/GeometryType.java      |  47 +++++++++
 .../converter/type/debezium/PointType.java         |  46 +++++++++
 .../connector/converter/type/util/GeoUtils.java    |  67 +++++++++++++
 .../connector/converter/TestRecordService.java     |  62 +++++++++++-
 11 files changed, 414 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index ddbeb2f..29005cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
         <jackson.version>2.13.2.1</jackson.version>
         <httpcomponents.version>4.5.13</httpcomponents.version>
         <commons-io.version>2.3</commons-io.version>
+        <geometry.version>2.2.0</geometry.version>
     </properties>
 
     <repositories>
@@ -282,6 +283,17 @@
             <artifactId>debezium-core</artifactId>
             <version>${debezium.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.esri.geometry</groupId>
+            <artifactId>esri-geometry-api</artifactId>
+            <version>${geometry.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index cd76c42..7c75139 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -278,7 +278,7 @@ public class RecordService {
                     field.getSchema().isOptional()
                             ? source.getWithoutDefault(fieldName)
                             : source.get(fieldName);
-            Object convertValue = type.getValue(value);
+            Object convertValue = type.getValue(value, field.getSchema());
             if (Objects.nonNull(convertValue) && !type.isNumber()) {
                 filedMapping.put(fieldName, convertValue.toString());
             } else {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
index e78909d..5cf1001 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
@@ -36,11 +36,15 @@ import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectMapToConne
 import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType;
 import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimeType;
 import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.ArrayType;
 import org.apache.doris.kafka.connector.converter.type.debezium.DateType;
+import org.apache.doris.kafka.connector.converter.type.debezium.GeographyType;
+import org.apache.doris.kafka.connector.converter.type.debezium.GeometryType;
 import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimeType;
 import 
org.apache.doris.kafka.connector.converter.type.debezium.MicroTimestampType;
 import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimeType;
 import 
org.apache.doris.kafka.connector.converter.type.debezium.NanoTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.PointType;
 import org.apache.doris.kafka.connector.converter.type.debezium.TimeType;
 import org.apache.doris.kafka.connector.converter.type.debezium.TimestampType;
 import 
org.apache.doris.kafka.connector.converter.type.debezium.VariableScaleDecimalType;
@@ -73,6 +77,10 @@ public class RecordTypeRegister {
         registerType(ZonedTimeType.INSTANCE);
         registerType(ZonedTimestampType.INSTANCE);
         registerType(VariableScaleDecimalType.INSTANCE);
+        registerType(PointType.INSTANCE);
+        registerType(GeographyType.INSTANCE);
+        registerType(GeometryType.INSTANCE);
+        registerType(ArrayType.INSTANCE);
 
         // Supported connect data types
         registerType(ConnectBooleanType.INSTANCE);
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractGeometryType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractGeometryType.java
new file mode 100644
index 0000000..e88fbaf
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractGeometryType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.type;
+
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Common base for the geometry-related converter types.
+ *
+ * <p>Unless a subclass overrides {@link #getTypeName(Schema)}, values handled by these types are
+ * mapped to the Doris {@code STRING} column type.
+ */
+public abstract class AbstractGeometryType extends AbstractType {
+
+    /**
+     * Resolves the Doris column type for a geometry field.
+     *
+     * @param schema the field schema; not consulted by this implementation
+     * @return always {@link DorisType#STRING}
+     */
+    @Override
+    public String getTypeName(Schema schema) {
+        return DorisType.STRING;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
index 698e838..6be040a 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
@@ -43,6 +43,10 @@ public interface Type {
     /** Get the actual converted value based on the column type. */
     Object getValue(Object sourceValue);
 
+    /**
+     * Schema-aware variant of {@link #getValue(Object)}.
+     *
+     * <p>The default implementation ignores {@code schema} and simply delegates to {@link
+     * #getValue(Object)}; types whose conversion depends on the schema can override it.
+     *
+     * @param sourceValue the value to convert
+     * @param schema the schema supplied by the caller alongside {@code sourceValue}
+     * @return whatever {@link #getValue(Object)} returns for {@code sourceValue}
+     */
+    default Object getValue(Object sourceValue, Schema schema) {
+        return getValue(sourceValue);
+    }
+
     String getTypeName(Schema schema);
 
     boolean isNumber();
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ArrayType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ArrayType.java
new file mode 100644
index 0000000..870a346
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ArrayType.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordTypeRegister;
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.doris.kafka.connector.converter.type.Type;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Converter registered under the {@code "ARRAY"} key.
+ *
+ * <p>Element handling is delegated to whichever {@link Type} a private {@link RecordTypeRegister}
+ * holds for the element schema (looked up by schema name, falling back to the schema type name).
+ */
+public class ArrayType extends AbstractType {
+    private static final String ARRAY_TYPE_TEMPLATE = "%s<%s>";
+    public static final ArrayType INSTANCE = new ArrayType();
+    private DorisOptions dorisOptions;
+    private RecordTypeRegister recordTypeRegister;
+
+    /**
+     * Stores the options and builds the nested type registry, but only while neither has been set
+     * yet; later calls are no-ops.
+     */
+    @Override
+    public void configure(DorisOptions dorisOptions) {
+        if (this.dorisOptions == null && this.recordTypeRegister == null) {
+            this.dorisOptions = dorisOptions;
+            registerNestedArrayType();
+        }
+    }
+
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"ARRAY"};
+    }
+
+    /**
+     * Resolves the Doris column type for an array field.
+     *
+     * @param schema the array schema; its {@code valueSchema()} describes the elements
+     * @return {@code ARRAY<elementType>} when the element schema is optional and a converter is
+     *     registered for it, otherwise {@link DorisType#STRING}
+     */
+    @Override
+    public String getTypeName(Schema schema) {
+        Schema valueSchema = schema.valueSchema();
+        if (!valueSchema.isOptional()) {
+            return DorisType.STRING;
+        }
+        Type valueType = resolveElementType(valueSchema);
+        if (valueType == null) {
+            return DorisType.STRING;
+        }
+        // Hand the element converter the element schema, not the enclosing array schema:
+        // for an array of arrays, passing the outer schema would re-enter this method with
+        // the same argument and never terminate.
+        String typeName = valueType.getTypeName(valueSchema);
+        return String.format(ARRAY_TYPE_TEMPLATE, DorisType.ARRAY, typeName);
+    }
+
+    /**
+     * Converts every element of a list value with the converter registered for the element
+     * schema.
+     *
+     * @param sourceValue the value to convert; may be {@code null}
+     * @param schema the array schema; its {@code valueSchema()} describes the elements
+     * @return {@code null} for a {@code null} input; the input unchanged when it is not a {@link
+     *     List} or when no element converter is registered; otherwise a new list holding the
+     *     converted elements
+     */
+    @Override
+    public Object getValue(Object sourceValue, Schema schema) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof List)) {
+            return sourceValue;
+        }
+
+        Schema valueSchema = schema.valueSchema();
+        Type valueType = resolveElementType(valueSchema);
+        if (valueType == null) {
+            return sourceValue;
+        }
+
+        // Cast to the interface that was actually checked; casting to ArrayList would throw a
+        // ClassCastException for any other List implementation.
+        List<?> elements = (List<?>) sourceValue;
+        List<Object> resultList = new ArrayList<>(elements.size());
+        for (Object element : elements) {
+            resultList.add(valueType.getValue(element, valueSchema));
+        }
+        return resultList;
+    }
+
+    /**
+     * Looks up the converter for an element schema, creating the nested registry on first use.
+     *
+     * @return the registered converter, or {@code null} when none matches
+     */
+    private Type resolveElementType(Schema valueSchema) {
+        String type =
+                Objects.nonNull(valueSchema.name())
+                        ? valueSchema.name()
+                        : valueSchema.type().name();
+        if (recordTypeRegister == null) {
+            registerNestedArrayType();
+        }
+        return recordTypeRegister.getTypeRegistry().get(type);
+    }
+
+    private void registerNestedArrayType() {
+        this.recordTypeRegister = new RecordTypeRegister(dorisOptions);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/GeographyType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/GeographyType.java
new file mode 100644
index 0000000..85b8881
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/GeographyType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.data.geometry.Geography;
+import org.apache.doris.kafka.connector.converter.type.AbstractGeometryType;
+
+/**
+ * Converter type registered under Debezium's {@link Geography#LOGICAL_NAME}.
+ *
+ * <p>It adds nothing beyond the registration key; everything else is inherited from {@link
+ * AbstractGeometryType}.
+ */
+public class GeographyType extends AbstractGeometryType {
+    public static final GeographyType INSTANCE = new GeographyType();
+
+    /** Returns the single logical schema name this type is registered for. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Geography.LOGICAL_NAME};
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/GeometryType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/GeometryType.java
new file mode 100644
index 0000000..c6e153b
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/GeometryType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.data.geometry.Geometry;
+import org.apache.doris.kafka.connector.converter.type.AbstractGeometryType;
+import org.apache.doris.kafka.connector.converter.type.util.GeoUtils;
+import org.apache.kafka.connect.data.Struct;
+
+/** Converter type registered under Debezium's {@link Geometry#LOGICAL_NAME}. */
+public class GeometryType extends AbstractGeometryType {
+    public static final GeometryType INSTANCE = new GeometryType();
+
+    /** Returns the single logical schema name this type is registered for. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Geometry.LOGICAL_NAME};
+    }
+
+    /**
+     * Converts a geometry value.
+     *
+     * @param sourceValue the value to convert; may be {@code null}
+     * @return the result of {@link GeoUtils#handleGeoStructData(Object)} when the value is a
+     *     {@link Struct}; otherwise the value itself ({@code null} stays {@code null})
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        // instanceof is false for null, so null falls through and is returned unchanged.
+        return sourceValue instanceof Struct
+                ? GeoUtils.handleGeoStructData(sourceValue)
+                : sourceValue;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/PointType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/PointType.java
new file mode 100644
index 0000000..da01507
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/PointType.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.data.geometry.Point;
+import org.apache.doris.kafka.connector.converter.type.AbstractGeometryType;
+import org.apache.doris.kafka.connector.converter.type.util.GeoUtils;
+import org.apache.kafka.connect.data.Struct;
+
+/** Converter type registered under Debezium's {@link Point#LOGICAL_NAME}. */
+public class PointType extends AbstractGeometryType {
+    public static final PointType INSTANCE = new PointType();
+
+    /** Returns the single logical schema name this type is registered for. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Point.LOGICAL_NAME};
+    }
+
+    /**
+     * Converts a point value.
+     *
+     * @param sourceValue the value to convert; may be {@code null}
+     * @return the value itself when it is not a {@link Struct} (this includes {@code null});
+     *     otherwise the result of {@link GeoUtils#handleGeoStructData(Object)}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        boolean isStruct = sourceValue instanceof Struct;
+        if (!isStruct) {
+            // Covers null as well as any non-Struct representation.
+            return sourceValue;
+        }
+        return GeoUtils.handleGeoStructData(sourceValue);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/util/GeoUtils.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/util/GeoUtils.java
new file mode 100644
index 0000000..9c0fc65
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/util/GeoUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.type.util;
+
+import com.esri.core.geometry.ogc.OGCGeometry;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static helpers for converting geometry {@link Struct} values. */
+public class GeoUtils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GeoUtils.class);
+
+    // One shared mapper instead of a fresh instance per record: it is only used for
+    // readTree here and is never reconfigured after construction.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private GeoUtils() {}
+
+    /**
+     * Converts a geometry struct into a map describing the geometry.
+     *
+     * <p>The struct's {@code wkb} bytes are parsed and rendered as GeoJSON; the resulting map
+     * holds {@code type}, either {@code geometries} (for a {@code GeometryCollection}) or {@code
+     * coordinates}, and {@code srid} (0 when the struct's {@code srid} field is null).
+     *
+     * @param sourceValue the value to convert; must be a {@link Struct}, otherwise the cast
+     *     throws {@link ClassCastException}
+     * @return the geometry map, or {@code null} when anything fails while reading or parsing the
+     *     struct (the failure is logged at WARN level)
+     */
+    public static Object handleGeoStructData(Object sourceValue) {
+        // the Geometry datatype in MySQL will be converted to
+        // a String with Json format
+        Struct geometryStruct = (Struct) sourceValue;
+
+        try {
+            byte[] wkb = geometryStruct.getBytes("wkb");
+            String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+            JsonNode originGeoNode = OBJECT_MAPPER.readTree(geoJson);
+            String geometryType = originGeoNode.get("type").asText();
+
+            // Keep a default-sized HashMap: the tests in this commit assert on the stringified
+            // form of this map, so its iteration order must not change.
+            Map<String, Object> geometryInfo = new HashMap<>();
+            geometryInfo.put("type", geometryType);
+            if ("GeometryCollection".equals(geometryType)) {
+                geometryInfo.put("geometries", originGeoNode.get("geometries"));
+            } else {
+                geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
+            }
+            geometryInfo.put(
+                    "srid", Optional.ofNullable(geometryStruct.getInt32("srid")).orElse(0));
+            return geometryInfo;
+        } catch (Exception e) {
+            // Deliberately best-effort: a value that cannot be parsed becomes null.
+            LOGGER.warn("Failed to parse Geometry datatype, converting the value to null", e);
+            return null;
+        }
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 0142259..74af7c9 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -75,7 +75,7 @@ public class TestRecordService {
         props.put("debezium.schema.evolution", "basic");
         props.put(
                 "doris.topic2table.map",
-                
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal");
+                
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal,mysql_test.doris_test.geo_table:geo_table,pg_test.doris_cdc.all_array_types:all_array_types");
         recordService = new RecordService(new DorisOptions((Map) props));
         HashMap<String, String> config = new HashMap<>();
         jsonConverter.configure(config, false);
@@ -237,6 +237,65 @@ public class TestRecordService {
                 
"{\"name\":\"doris\",\"key\":\"1\"}\n{\"name\":\"doris\",\"key\":\"2\"}", s);
     }
 
+    @Test
+    public void processMySQlGeoStructRecord() throws IOException {
+        String schemaStr =
+                
"{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"geo_point\",\"aggregation_type\":\"REPLACE\",\"comment\":\"\",\"type\":\"STRING\"},{\"name\":\"geo_linestring\",\"aggregation_type\":\"REPLACE\",\"comment\":\"\",\"type\":\"STRING\"},{\"name\":\"geo_polygon\",\"aggregation_type\":\"REPLACE\",\"comment\":\"\",\"type\":\"STRING\"},{\"name\":\"geo_multipoint\",\"aggregation_type\":\"REPLACE
 [...]
+        Schema schema = null;
+        try {
+            schema = objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisException(e);
+        }
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(schema);
+
+        String topic = "mysql_test.doris_test.geo_table";
+
+        // no delete value
+        String noDeleteValue =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,
 [...]
+        String expectedNoDeleteValue =
+                "{\"id\":1,\"geo_point\":\"{coordinates=[1,1], type=Point, 
srid=0}\",\"geo_linestring\":\"{coordinates=[[0,0],[1,1],[2,2]], 
type=LineString, 
srid=0}\",\"geo_polygon\":\"{coordinates=[[[0,0],[1,0],[1,1],[0,0]]], 
type=Polygon, srid=0}\",\"geo_multipoint\":\"{coordinates=[[0,0],[1,1]], 
type=MultiPoint, 
srid=0}\",\"geo_multilinestring\":\"{coordinates=[[[0,0],[1,1]],[[2,2],[3,3]]], 
type=MultiLineString, 
srid=0}\",\"geo_multipolygon\":\"{coordinates=[[[[0,0],[1,0],[1,1],[0,0]] [...]
+        buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
+
+        // delete value
+        String deleteValue =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,
 [...]
+        String expectedDeleteValue =
+                "{\"id\":1,\"geo_point\":\"{coordinates=[1,1], type=Point, 
srid=0}\",\"geo_linestring\":\"{coordinates=[[0,0],[1,1],[2,2]], 
type=LineString, 
srid=0}\",\"geo_polygon\":\"{coordinates=[[[0,0],[1,0],[1,1],[0,0]]], 
type=Polygon, srid=0}\",\"geo_multipoint\":\"{coordinates=[[0,0],[1,1]], 
type=MultiPoint, 
srid=0}\",\"geo_multilinestring\":\"{coordinates=[[[0,0],[1,1]],[[2,2],[3,3]]], 
type=MultiLineString, 
srid=0}\",\"geo_multipolygon\":\"{coordinates=[[[[0,0],[1,0],[1,1],[0,0]] [...]
+        buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
+    }
+
+    @Test
+    public void processPostgreArrayStructRecord() throws IOException {
+        String schemaStr =
+                
"{\"keysType\":\"DUP_KEYS\",\"properties\":[{\"name\":\"int_array\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"smallint_array\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"ARRAY\"},{\"name\":\"bigint_array\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"ARRAY\"},{\"name\":\"float_array\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"ARRAY\"},{\"name\":\"double_array\",\"aggregation_type\":\"NONE\",\"comme
 [...]
+        Schema schema = null;
+        try {
+            schema = objectMapper.readValue(schemaStr, Schema.class);
+        } catch (JsonProcessingException e) {
+            throw new DorisException(e);
+        }
+        mockRestService
+                .when(() -> RestService.getSchema(any(), any(), any(), any()))
+                .thenReturn(schema);
+
+        String topic = "pg_test.doris_cdc.all_array_types";
+        String noDeleteArrayValue =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},{\"type\":\"array\",\"items\":{\"type\":\"int16\",\"optional\":true},\"optional\":true,\"field\":\"smallint_array\"},{\"type\":\"array\",\"items\":{\"type\":\"int64\",\"optional\":true},\"optional\":true,\"field\":\"bigint_array\"},{\"type\":\"array\",\"items\":{\"type\":\"float\",\"optional\":true},\"optional\":true,\"field\":\"float_arra
 [...]
+        String expectedNoDeleteArrayValue =
+                "{\"id\":1,\"smallint_array\":\"[100, 200, 
300]\",\"bigint_array\":\"[10000000000, 20000000000, 
30000000000]\",\"float_array\":\"[1.1, 2.2, 3.3]\",\"double_array\":\"[10.1, 
20.2, 30.3]\",\"numeric_array\":\"[123.45, 678.90]\",\"text_array\":\"[text1, 
text2, text3]\",\"varchar_array\":\"[varchar1, varchar2, 
varchar3]\",\"char_array\":\"[c, h, a]\",\"bool_array\":\"[true, false, 
true]\",\"date_array\":\"[2024-01-01, 
2024-01-02]\",\"timestamp_array\":\"[2022-01-01T10:00, 202 [...]
+        buildProcessStructRecord(topic, noDeleteArrayValue, 
expectedNoDeleteArrayValue);
+
+        String deletedArrayValue =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},{\"type\":\"array\",\"items\":{\"type\":\"int16\",\"optional\":true},\"optional\":true,\"field\":\"smallint_array\"},{\"type\":\"array\",\"items\":{\"type\":\"int64\",\"optional\":true},\"optional\":true,\"field\":\"bigint_array\"},{\"type\":\"array\",\"items\":{\"type\":\"float\",\"optional\":true},\"optional\":true,\"field\":\"float_arra
 [...]
+        String expectedDeleteArrayValue =
+                "{\"id\":1,\"smallint_array\":\"[100, 200, 
300]\",\"bigint_array\":\"[10000000000, 20000000000, 
30000000000]\",\"float_array\":\"[1.1, 2.2, 3.3]\",\"double_array\":\"[10.1, 
20.2, 30.3]\",\"numeric_array\":\"[123.45, 678.90]\",\"text_array\":\"[text1, 
text2, text3]\",\"varchar_array\":\"[varchar1, varchar2, 
varchar3]\",\"char_array\":\"[c, h, a]\",\"bool_array\":\"[true, false, 
true]\",\"date_array\":\"[2024-01-01, 
2024-01-02]\",\"timestamp_array\":\"[2022-01-01T10:00, 202 [...]
+        buildProcessStructRecord(topic, deletedArrayValue, 
expectedDeleteArrayValue);
+    }
+
     @Test
     public void processMapRecord() throws JsonProcessingException {
         Map<String, String> mapValue = new HashMap<>();
@@ -264,6 +323,5 @@ public class TestRecordService {
     @After
     public void close() {
         mockRestService.close();
-        ;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to