This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4f07c6ff77 [Fix][Connector-v2][Doris] Array type data parsing failed (#10095)
4f07c6ff77 is described below
commit 4f07c6ff77013375b756084511f7cafd108f2c7a
Author: Jast <[email protected]>
AuthorDate: Thu Nov 27 19:14:32 2025 +0800
[Fix][Connector-v2][Doris] Array type data parsing failed (#10095)
---
.../arrow/reader/ArrowToSeatunnelRowReader.java | 38 ++++++++++++++++++----
.../arrow/ArrowToSeatunnelRowReaderTest.java | 8 +++--
.../seatunnel/e2e/connector/doris/DorisIT.java | 34 +++++++++----------
.../spark/serialization/InternalRowConverter.java | 25 ++++++--------
4 files changed, 64 insertions(+), 41 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
index 5f5b5fec17..f7821ed5fd 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
@@ -262,8 +263,8 @@ public class ArrowToSeatunnelRowReader implements
AutoCloseable {
private Object convertMap(
int rowIndex, Converter converter, FieldVector fieldVector,
MapType mapType) {
- SqlType keyType = mapType.getKeyType().getSqlType();
- SqlType valueType = mapType.getValueType().getSqlType();
+ SeaTunnelDataType keyType = mapType.getKeyType();
+ SeaTunnelDataType valueType = mapType.getValueType();
Map<String, Function> fieldConverters = new HashMap<>();
fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType));
fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType));
@@ -272,10 +273,20 @@ public class ArrowToSeatunnelRowReader implements
AutoCloseable {
private Object convertArray(
int rowIndex, Converter converter, FieldVector fieldVector,
ArrayType arrayType) {
- SqlType elementType = arrayType.getElementType().getSqlType();
+ SeaTunnelDataType elementType = arrayType.getElementType();
Map<String, Function> fieldConverters = new HashMap<>();
fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType));
- return converter.convert(rowIndex, fieldVector, fieldConverters);
+ Object convertedValue = converter.convert(rowIndex, fieldVector,
fieldConverters);
+ if (convertedValue instanceof List) {
+ List<?> list = (List<?>) convertedValue;
+ Class<?> componentType = arrayType.getElementType().getTypeClass();
+ Object array = Array.newInstance(componentType, list.size());
+ for (int i = 0; i < list.size(); i++) {
+ Array.set(array, i, list.get(i));
+ }
+ return array;
+ }
+ return convertedValue;
}
private Object convertRow(
@@ -284,13 +295,26 @@ public class ArrowToSeatunnelRowReader implements
AutoCloseable {
List<SeaTunnelDataType<?>> fieldTypes = rowType.getChildren();
Map<String, Function> fieldConverters = new HashMap<>();
for (int i = 0; i < fieldTypes.size(); i++) {
- fieldConverters.put(fieldNames[i],
genericsConvert(fieldTypes.get(i).getSqlType()));
+ fieldConverters.put(fieldNames[i],
genericsConvert(fieldTypes.get(i)));
}
return converter.convert(rowIndex, fieldVector, fieldConverters);
}
- private Function<Object, Object> genericsConvert(SqlType sqlType) {
- return value -> convertSeatunnelRowValue(sqlType, null, value);
+    /**
+     * Builds a converter for a single value of the given SeaTunnel type.
+     *
+     * <p>For an {@link ArrayType}, a {@link List} value (presumably what the Arrow converter
+     * hands over for an inner array of a nested array — TODO confirm against Converter) is
+     * turned into a typed Java array whose component class is the element type's
+     * {@code getTypeClass()}. Each non-null element is first converted with the element
+     * type's own converter, so arrays of arrays get the same per-element conversion that
+     * single-level arrays receive via {@code Converter.ARRAY_KEY}, instead of being copied
+     * raw (a raw copy can make {@code Array.set} throw {@link IllegalArgumentException} when
+     * the element class does not match the component type). Every other value is passed to
+     * {@code convertSeatunnelRowValue} with the type's {@code SqlType}.
+     *
+     * @param dataType the SeaTunnel type of the values this converter will receive
+     * @return a function converting one raw value to its SeaTunnel representation
+     */
+    private Function<Object, Object> genericsConvert(SeaTunnelDataType dataType) {
+        if (dataType instanceof ArrayType) {
+            SeaTunnelDataType elementType = ((ArrayType) dataType).getElementType();
+            // Resolved once per converter rather than once per converted value.
+            Class<?> componentType = elementType.getTypeClass();
+            Function<Object, Object> elementConverter = genericsConvert(elementType);
+            return value -> {
+                if (value instanceof List) {
+                    List<?> list = (List<?>) value;
+                    Object array = Array.newInstance(componentType, list.size());
+                    for (int i = 0; i < list.size(); i++) {
+                        Object element = list.get(i);
+                        // Null elements stay null instead of going through the converter.
+                        Array.set(
+                                array,
+                                i,
+                                element == null ? null : elementConverter.apply(element));
+                    }
+                    return array;
+                }
+                return convertSeatunnelRowValue(dataType.getSqlType(), null, value);
+            };
+        }
+        return value -> convertSeatunnelRowValue(dataType.getSqlType(), null, value);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
index 79a26743df..24e923c9ca 100644
---
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
+++
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java
@@ -393,11 +393,15 @@ public class ArrowToSeatunnelRowReaderTest {
Collections.singletonList(localDateTime.toLocalDate()),
actualDate2Data);
// check array int
List<Object> actualArrayIntData =
- rows.stream().map(s ->
s.getField(16)).collect(Collectors.toList());
+ rows.stream()
+ .map(s -> Arrays.asList((Integer[])
s.getField(16)))
+ .collect(Collectors.toList());
Assertions.assertIterableEquals(arrayData1, actualArrayIntData);
// check array timestamp
List<Object> actualArrayTimestampData =
- rows.stream().map(s ->
s.getField(17)).collect(Collectors.toList());
+ rows.stream()
+ .map(s -> Arrays.asList((LocalDateTime[])
s.getField(17)))
+ .collect(Collectors.toList());
Assertions.assertIterableEquals(arrayData2,
actualArrayTimestampData);
// check SECOND timestamp without timezone
List<Object> actualTimestampSecData =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index d817c7743b..f62454a861 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -717,24 +717,24 @@ public class DorisIT extends AbstractDorisIT {
GenerateTestData.genDateString(),
GenerateTestData.genJsonString(),
GenerateTestData.genJsonString(),
- (new boolean[] {true, true, false}).toString(),
- (new int[] {1, 2, 3}).toString(),
- (new int[] {1, 2, 3}).toString(),
- (new int[] {1, 2, 3}).toString(),
- (new long[] {1L, 2L, 3L}).toString(),
- (new float[] {1.0F, 1.0F, 1.0F}).toString(),
- (new double[] {1.0, 1.0, 1.0}).toString(),
- (new String[] {"1", "1"}).toString(),
- (new String[] {"1", "1"}).toString(),
- (new String[] {"1", "1"}).toString(),
- (new String[] {"1", "1"}).toString(),
- (new BigDecimal[] {
+ Arrays.toString(new boolean[] {true, true,
false}),
+ Arrays.toString(new byte[] {1, 2, 3}),
+ Arrays.toString(new short[] {1, 2, 3}),
+ Arrays.toString(new int[] {1, 2, 3}),
+ Arrays.toString(new long[] {1L, 2L, 3L}),
+ Arrays.toString(new float[] {1.0F, 1.0F,
1.0F}),
+ Arrays.toString(new double[] {1.0, 1.0, 1.0}),
+ Arrays.toString(new String[] {"1", "1"}),
+ Arrays.toString(new String[] {"1", "1"}),
+ Arrays.toString(new String[] {"1", "1"}),
+ Arrays.toString(new String[] {"1", "1"}),
+ Arrays.toString(
+ new BigDecimal[] {
new BigDecimal("10.02"), new
BigDecimal("10.03")
- })
- .toString(),
- (new String[] {"2020-06-09",
"2020-06-10"}).toString(),
- (new String[] {"2020-06-09 12:02:02",
"2020-06-10 12:02:02"})
- .toString()
+ }),
+ Arrays.toString(new String[] {"2020-06-09",
"2020-06-10"}),
+ Arrays.toString(
+ new String[] {"2020-06-09 12:02:02",
"2020-06-10 12:02:02"})
}));
}
log.info("generate test data succeed");
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index f68c2892f9..0258c52661 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -114,22 +114,17 @@ public final class InternalRowConverter extends
RowConverter<InternalRow> {
case DECIMAL:
return Decimal.apply((BigDecimal) field);
case ARRAY:
- Class<?> elementTypeClass =
- ((ArrayType<?, ?>)
dataType).getElementType().getTypeClass();
-
- if (((ArrayType<?, ?>) dataType).getElementType() instanceof
MapType) {
+ SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>)
dataType).getElementType();
+ if (elementType instanceof MapType) {
Object arrayMap =
Array.newInstance(ArrayBasedMapData.class,
((Map[]) field).length);
for (int i = 0; i < ((Map[]) field).length; i++) {
Map<?, ?> value = (Map<?, ?>) ((Map[]) field)[i];
- MapType<?, ?> type =
- (MapType<?, ?>) ((ArrayType<?, ?>)
dataType).getElementType();
- Array.set(arrayMap, i, convertMap(value, type));
+ Array.set(arrayMap, i, convertMap(value, (MapType<?,
?>) elementType));
}
return ArrayData.toArrayData(arrayMap);
}
- // if string array, we need to covert every item in array from
String to UTF8String
- if (((ArrayType<?, ?>)
dataType).getElementType().equals(BasicType.STRING_TYPE)) {
+ if (elementType.equals(BasicType.STRING_TYPE)) {
Object[] fields = (Object[]) field;
UTF8String[] objects =
Arrays.stream(fields)
@@ -137,13 +132,13 @@ public final class InternalRowConverter extends
RowConverter<InternalRow> {
.toArray(UTF8String[]::new);
return ArrayData.toArrayData(objects);
}
- // except string, now only support convert boolean int tinyint
smallint bigint float
- // double, because SeaTunnel Array only support these types
- Object array = Array.newInstance(elementTypeClass, ((Object[])
field).length);
- for (int i = 0; i < ((Object[]) field).length; i++) {
- Array.set(array, i, ((Object[]) field)[i]);
+
+ Object[] arrayData = (Object[]) field;
+ Object[] convertedArray = new Object[arrayData.length];
+ for (int i = 0; i < arrayData.length; i++) {
+ convertedArray[i] = convert(arrayData[i], elementType);
}
- return ArrayData.toArrayData(field);
+ return ArrayData.toArrayData(convertedArray);
default:
if (field instanceof Some) {
return ((Some<?>) field).get();