This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7d31e5668c [Improve][Connector-v2] Improve the exception msg in case-sensitive case for paimon sink (#7549)
7d31e5668c is described below

commit 7d31e5668c60a87a577a9ab101e3b7a0976514e2
Author: dailai <[email protected]>
AuthorDate: Tue Sep 3 20:08:36 2024 +0800

    [Improve][Connector-v2] Improve the exception msg in case-sensitive case for paimon sink (#7549)
---
 .../paimon/exception/PaimonConnectorErrorCode.java |   3 +-
 .../seatunnel/paimon/utils/RowConverter.java       |   7 +-
 .../seatunnel/paimon/utils/SchemaUtil.java         |  15 ++-
 .../seatunnel/paimon/utils/RowConverterTest.java   | 111 ++++++++++++++-------
 4 files changed, 93 insertions(+), 43 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index ef37e52c01..ed4c80a40d 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -26,7 +26,8 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
     TABLE_PRE_COMMIT_FAILED("PAIMON-03", "Paimon pre commit failed"),
     GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
     AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
-    LOAD_CATALOG("PAIMON-06", "Load catalog failed");
+    LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
+    GET_FILED_FAILED("PAIMON-07", "Get field failed");
 
     private final String code;
     private final String description;
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 45c2c492c1..580f55b581 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -378,7 +378,7 @@ public class RowConverter {
                 binaryWriter.setNullAt(i);
                 continue;
             }
-            checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
+            checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields);
             String fieldName = seaTunnelRowType.getFieldName(i);
             switch (fieldTypes[i].getSqlType()) {
                 case TINYINT:
@@ -486,7 +486,7 @@ public class RowConverter {
         return binaryRow;
     }
 
-    private static void checkCanWriteWithType(
+    private static void checkCanWriteWithSchema(
             int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
         String sourceFieldName = seaTunnelRowType.getFieldName(i);
         SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
@@ -495,7 +495,8 @@ public class RowConverter {
                 RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i));
         DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
         DataType sinkDataType = sinkDataField.type();
-        if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
+        if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())
+                || !StringUtils.equals(sourceFieldName, sinkDataField.name())) {
             throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
                     "Paimon",
                     sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index 0da047244f..fa8ed33820 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
@@ -30,6 +32,7 @@ import org.apache.paimon.types.DataType;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /** The util seatunnel schema to paimon schema */
 public class SchemaUtil {
@@ -69,9 +72,13 @@ public class SchemaUtil {
     }
 
     public static DataField getDataField(List<DataField> fields, String fieldName) {
-        return fields.parallelStream()
-                .filter(field -> field.name().equals(fieldName))
-                .findFirst()
-                .get();
+        Optional<DataField> firstField =
+                fields.stream().filter(field -> field.name().equals(fieldName)).findFirst();
+        if (!firstField.isPresent()) {
+            throw new PaimonConnectorException(
+                    PaimonConnectorErrorCode.GET_FILED_FAILED,
+                    "Can not get the filed [" + fieldName + "] from source table");
+        }
+        return firstField.get();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index 8f7eea228f..58cb3e053b 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -42,6 +42,7 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.serializer.InternalArraySerializer;
 import org.apache.paimon.data.serializer.InternalMapSerializer;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -69,6 +70,25 @@ public class RowConverterTest {
 
     private SeaTunnelRowType seaTunnelRowType;
 
+    private volatile boolean isCaseSensitive = false;
+    private volatile int index = 0;
+    private static final String[] filedNames = {
+        "c_tinyint",
+        "c_smallint",
+        "c_int",
+        "c_bigint",
+        "c_float",
+        "c_double",
+        "c_decimal",
+        "c_string",
+        "c_bytes",
+        "c_boolean",
+        "c_date",
+        "c_timestamp",
+        "c_map",
+        "c_array"
+    };
+
     public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");
 
     public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
@@ -118,41 +138,8 @@ public class RowConverterTest {
     }
 
     @BeforeEach
-    public void before() {
-        seaTunnelRowType =
-                new SeaTunnelRowType(
-                        new String[] {
-                            "c_tinyint",
-                            "c_smallint",
-                            "c_int",
-                            "c_bigint",
-                            "c_float",
-                            "c_double",
-                            "c_decimal",
-                            "c_string",
-                            "c_bytes",
-                            "c_boolean",
-                            "c_date",
-                            "c_timestamp",
-                            "c_map",
-                            "c_array"
-                        },
-                        new SeaTunnelDataType<?>[] {
-                            BasicType.BYTE_TYPE,
-                            BasicType.SHORT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.LONG_TYPE,
-                            BasicType.FLOAT_TYPE,
-                            BasicType.DOUBLE_TYPE,
-                            new DecimalType(30, 8),
-                            BasicType.STRING_TYPE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            BasicType.BOOLEAN_TYPE,
-                            LocalTimeType.LOCAL_DATE_TYPE,
-                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
-                            ArrayType.STRING_ARRAY_TYPE
-                        });
+    public void generateTestData() {
+        initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index);
         byte tinyint = 1;
         short smallint = 2;
         int intNum = 3;
@@ -229,6 +216,32 @@ public class RowConverterTest {
         internalRow = binaryRow;
     }
 
+    private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase, int index) {
+        String[] oneUpperCaseFiledNames = Arrays.copyOf(filedNames, filedNames.length);
+        if (isUpperCase) {
+            oneUpperCaseFiledNames[index] = oneUpperCaseFiledNames[index].toUpperCase();
+        }
+        seaTunnelRowType =
+                new SeaTunnelRowType(
+                        oneUpperCaseFiledNames,
+                        new SeaTunnelDataType<?>[] {
+                            BasicType.BYTE_TYPE,
+                            BasicType.SHORT_TYPE,
+                            BasicType.INT_TYPE,
+                            BasicType.LONG_TYPE,
+                            BasicType.FLOAT_TYPE,
+                            BasicType.DOUBLE_TYPE,
+                            new DecimalType(30, 8),
+                            BasicType.STRING_TYPE,
+                            PrimitiveByteArrayType.INSTANCE,
+                            BasicType.BOOLEAN_TYPE,
+                            LocalTimeType.LOCAL_DATE_TYPE,
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+                            ArrayType.STRING_ARRAY_TYPE
+                        });
+    }
+
     @Test
     public void seaTunnelToPaimon() {
         SeaTunnelRuntimeException actualException =
@@ -248,6 +261,34 @@ public class RowConverterTest {
         InternalRow reconvert =
                 RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8));
         Assertions.assertEquals(reconvert, internalRow);
+
+        isCaseSensitive = true;
+
+        for (int i = 0; i < filedNames.length; i++) {
+            index = i;
+            generateTestData();
+            String sourceFiledname = seaTunnelRowType.getFieldName(i);
+            DataType exceptDataType =
+                    RowTypeConverter.reconvert(sourceFiledname, seaTunnelRowType.getFieldType(i));
+            DataField exceptDataField = new DataField(i, sourceFiledname, exceptDataType);
+            TableSchema sinkTableSchema = getTableSchema(30, 8);
+            SeaTunnelRuntimeException actualException1 =
+                    Assertions.assertThrows(
+                            SeaTunnelRuntimeException.class,
+                            () ->
+                                    RowConverter.reconvert(
+                                            seaTunnelRow, seaTunnelRowType, sinkTableSchema));
+            Assertions.assertEquals(
+                    CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+                                    "Paimon",
+                                    sourceFiledname
+                                            + StringUtils.SPACE
+                                            + seaTunnelRowType.getFieldType(i).getSqlType(),
+                                    exceptDataField.asSQLString(),
+                                    sinkTableSchema.fields().get(i).asSQLString())
+                            .getMessage(),
+                    actualException1.getMessage());
+        }
     }
 
     @Test

Reply via email to