This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f07c6ff77 [Fix][Connector-v2][Doris] Array type data parsing failed (#10095)
4f07c6ff77 is described below

commit 4f07c6ff77013375b756084511f7cafd108f2c7a
Author: Jast <[email protected]>
AuthorDate: Thu Nov 27 19:14:32 2025 +0800

    [Fix][Connector-v2][Doris] Array type data parsing failed (#10095)
---
 .../arrow/reader/ArrowToSeatunnelRowReader.java    | 38 ++++++++++++++++++----
 .../arrow/ArrowToSeatunnelRowReaderTest.java       |  8 +++--
 .../seatunnel/e2e/connector/doris/DorisIT.java     | 34 +++++++++----------
 .../spark/serialization/InternalRowConverter.java  | 25 ++++++--------
 4 files changed, 64 insertions(+), 41 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
index 5f5b5fec17..f7821ed5fd 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -262,8 +263,8 @@ public class ArrowToSeatunnelRowReader implements 
AutoCloseable {
 
     private Object convertMap(
             int rowIndex, Converter converter, FieldVector fieldVector, 
MapType mapType) {
-        SqlType keyType = mapType.getKeyType().getSqlType();
-        SqlType valueType = mapType.getValueType().getSqlType();
+        SeaTunnelDataType keyType = mapType.getKeyType();
+        SeaTunnelDataType valueType = mapType.getValueType();
         Map<String, Function> fieldConverters = new HashMap<>();
         fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType));
         fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType));
@@ -272,10 +273,20 @@ public class ArrowToSeatunnelRowReader implements 
AutoCloseable {
 
     private Object convertArray(
             int rowIndex, Converter converter, FieldVector fieldVector, 
ArrayType arrayType) {
-        SqlType elementType = arrayType.getElementType().getSqlType();
+        SeaTunnelDataType elementType = arrayType.getElementType();
         Map<String, Function> fieldConverters = new HashMap<>();
         fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType));
-        return converter.convert(rowIndex, fieldVector, fieldConverters);
+        Object convertedValue = converter.convert(rowIndex, fieldVector, 
fieldConverters);
+        if (convertedValue instanceof List) {
+            List<?> list = (List<?>) convertedValue;
+            Class<?> componentType = arrayType.getElementType().getTypeClass();
+            Object array = Array.newInstance(componentType, list.size());
+            for (int i = 0; i < list.size(); i++) {
+                Array.set(array, i, list.get(i));
+            }
+            return array;
+        }
+        return convertedValue;
     }
 
     private Object convertRow(
@@ -284,13 +295,26 @@ public class ArrowToSeatunnelRowReader implements 
AutoCloseable {
         List<SeaTunnelDataType<?>> fieldTypes = rowType.getChildren();
         Map<String, Function> fieldConverters = new HashMap<>();
         for (int i = 0; i < fieldTypes.size(); i++) {
-            fieldConverters.put(fieldNames[i], 
genericsConvert(fieldTypes.get(i).getSqlType()));
+            fieldConverters.put(fieldNames[i], 
genericsConvert(fieldTypes.get(i)));
         }
         return converter.convert(rowIndex, fieldVector, fieldConverters);
     }
 
-    private Function<Object, Object> genericsConvert(SqlType sqlType) {
-        return value -> convertSeatunnelRowValue(sqlType, null, value);
+    private Function<Object, Object> genericsConvert(SeaTunnelDataType 
dataType) {
+        return value -> {
+            if (dataType instanceof ArrayType) {
+                if (value instanceof List) {
+                    List<?> list = (List<?>) value;
+                    Class<?> componentType = ((ArrayType) 
dataType).getElementType().getTypeClass();
+                    Object array = Array.newInstance(componentType, 
list.size());
+                    for (int i = 0; i < list.size(); i++) {
+                        Array.set(array, i, list.get(i));
+                    }
+                    return array;
+                }
+            }
+            return convertSeatunnelRowValue(dataType.getSqlType(), null, 
value);
+        };
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
 
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
index 79a26743df..24e923c9ca 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
@@ -393,11 +393,15 @@ public class ArrowToSeatunnelRowReaderTest {
                     Collections.singletonList(localDateTime.toLocalDate()), 
actualDate2Data);
             // check array int
             List<Object> actualArrayIntData =
-                    rows.stream().map(s -> 
s.getField(16)).collect(Collectors.toList());
+                    rows.stream()
+                            .map(s -> Arrays.asList((Integer[]) 
s.getField(16)))
+                            .collect(Collectors.toList());
             Assertions.assertIterableEquals(arrayData1, actualArrayIntData);
             // check array timestamp
             List<Object> actualArrayTimestampData =
-                    rows.stream().map(s -> 
s.getField(17)).collect(Collectors.toList());
+                    rows.stream()
+                            .map(s -> Arrays.asList((LocalDateTime[]) 
s.getField(17)))
+                            .collect(Collectors.toList());
             Assertions.assertIterableEquals(arrayData2, 
actualArrayTimestampData);
             // check SECOND timestamp without timezone
             List<Object> actualTimestampSecData =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index d817c7743b..f62454a861 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -717,24 +717,24 @@ public class DorisIT extends AbstractDorisIT {
                                 GenerateTestData.genDateString(),
                                 GenerateTestData.genJsonString(),
                                 GenerateTestData.genJsonString(),
-                                (new boolean[] {true, true, false}).toString(),
-                                (new int[] {1, 2, 3}).toString(),
-                                (new int[] {1, 2, 3}).toString(),
-                                (new int[] {1, 2, 3}).toString(),
-                                (new long[] {1L, 2L, 3L}).toString(),
-                                (new float[] {1.0F, 1.0F, 1.0F}).toString(),
-                                (new double[] {1.0, 1.0, 1.0}).toString(),
-                                (new String[] {"1", "1"}).toString(),
-                                (new String[] {"1", "1"}).toString(),
-                                (new String[] {"1", "1"}).toString(),
-                                (new String[] {"1", "1"}).toString(),
-                                (new BigDecimal[] {
+                                Arrays.toString(new boolean[] {true, true, 
false}),
+                                Arrays.toString(new byte[] {1, 2, 3}),
+                                Arrays.toString(new short[] {1, 2, 3}),
+                                Arrays.toString(new int[] {1, 2, 3}),
+                                Arrays.toString(new long[] {1L, 2L, 3L}),
+                                Arrays.toString(new float[] {1.0F, 1.0F, 
1.0F}),
+                                Arrays.toString(new double[] {1.0, 1.0, 1.0}),
+                                Arrays.toString(new String[] {"1", "1"}),
+                                Arrays.toString(new String[] {"1", "1"}),
+                                Arrays.toString(new String[] {"1", "1"}),
+                                Arrays.toString(new String[] {"1", "1"}),
+                                Arrays.toString(
+                                        new BigDecimal[] {
                                             new BigDecimal("10.02"), new 
BigDecimal("10.03")
-                                        })
-                                        .toString(),
-                                (new String[] {"2020-06-09", 
"2020-06-10"}).toString(),
-                                (new String[] {"2020-06-09 12:02:02", 
"2020-06-10 12:02:02"})
-                                        .toString()
+                                        }),
+                                Arrays.toString(new String[] {"2020-06-09", 
"2020-06-10"}),
+                                Arrays.toString(
+                                        new String[] {"2020-06-09 12:02:02", 
"2020-06-10 12:02:02"})
                             }));
         }
         log.info("generate test data succeed");
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index f68c2892f9..0258c52661 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -114,22 +114,17 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
             case DECIMAL:
                 return Decimal.apply((BigDecimal) field);
             case ARRAY:
-                Class<?> elementTypeClass =
-                        ((ArrayType<?, ?>) 
dataType).getElementType().getTypeClass();
-
-                if (((ArrayType<?, ?>) dataType).getElementType() instanceof 
MapType) {
+                SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) 
dataType).getElementType();
+                if (elementType instanceof MapType) {
                     Object arrayMap =
                             Array.newInstance(ArrayBasedMapData.class, 
((Map[]) field).length);
                     for (int i = 0; i < ((Map[]) field).length; i++) {
                         Map<?, ?> value = (Map<?, ?>) ((Map[]) field)[i];
-                        MapType<?, ?> type =
-                                (MapType<?, ?>) ((ArrayType<?, ?>) 
dataType).getElementType();
-                        Array.set(arrayMap, i, convertMap(value, type));
+                        Array.set(arrayMap, i, convertMap(value, (MapType<?, 
?>) elementType));
                     }
                     return ArrayData.toArrayData(arrayMap);
                 }
-                // if string array, we need to covert every item in array from 
String to UTF8String
-                if (((ArrayType<?, ?>) 
dataType).getElementType().equals(BasicType.STRING_TYPE)) {
+                if (elementType.equals(BasicType.STRING_TYPE)) {
                     Object[] fields = (Object[]) field;
                     UTF8String[] objects =
                             Arrays.stream(fields)
@@ -137,13 +132,13 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
                                     .toArray(UTF8String[]::new);
                     return ArrayData.toArrayData(objects);
                 }
-                // except string, now only support convert boolean int tinyint 
smallint bigint float
-                // double, because SeaTunnel Array only support these types
-                Object array = Array.newInstance(elementTypeClass, ((Object[]) 
field).length);
-                for (int i = 0; i < ((Object[]) field).length; i++) {
-                    Array.set(array, i, ((Object[]) field)[i]);
+
+                Object[] arrayData = (Object[]) field;
+                Object[] convertedArray = new Object[arrayData.length];
+                for (int i = 0; i < arrayData.length; i++) {
+                    convertedArray[i] = convert(arrayData[i], elementType);
                 }
-                return ArrayData.toArrayData(field);
+                return ArrayData.toArrayData(convertedArray);
             default:
                 if (field instanceof Some) {
                     return ((Some<?>) field).get();

Reply via email to