This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7d31e5668c [Improve][Connector-v2] Improve the exception msg in case-sensitive case for paimon sink (#7549)
7d31e5668c is described below
commit 7d31e5668c60a87a577a9ab101e3b7a0976514e2
Author: dailai <[email protected]>
AuthorDate: Tue Sep 3 20:08:36 2024 +0800
[Improve][Connector-v2] Improve the exception msg in case-sensitive case for paimon sink (#7549)
---
.../paimon/exception/PaimonConnectorErrorCode.java | 3 +-
.../seatunnel/paimon/utils/RowConverter.java | 7 +-
.../seatunnel/paimon/utils/SchemaUtil.java | 15 ++-
.../seatunnel/paimon/utils/RowConverterTest.java | 111 ++++++++++++++-------
4 files changed, 93 insertions(+), 43 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index ef37e52c01..ed4c80a40d 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -26,7 +26,8 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
TABLE_PRE_COMMIT_FAILED("PAIMON-03", "Paimon pre commit failed"),
GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
- LOAD_CATALOG("PAIMON-06", "Load catalog failed");
+ LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
+ GET_FILED_FAILED("PAIMON-07", "Get field failed");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 45c2c492c1..580f55b581 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -378,7 +378,7 @@ public class RowConverter {
binaryWriter.setNullAt(i);
continue;
}
- checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
+ checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
@@ -486,7 +486,7 @@ public class RowConverter {
return binaryRow;
}
- private static void checkCanWriteWithType(
+ private static void checkCanWriteWithSchema(
int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
String sourceFieldName = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType<?> sourceFieldType =
seaTunnelRowType.getFieldType(i);
@@ -495,7 +495,8 @@ public class RowConverter {
RowTypeConverter.reconvert(sourceFieldName,
seaTunnelRowType.getFieldType(i));
DataField exceptDataField = new DataField(i, sourceFieldName,
exceptDataType);
DataType sinkDataType = sinkDataField.type();
- if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
+ if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())
+ || !StringUtils.equals(sourceFieldName, sinkDataField.name())) {
throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFieldName + StringUtils.SPACE +
sourceFieldType.getSqlType(),
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index 0da047244f..fa8ed33820 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
@@ -30,6 +32,7 @@ import org.apache.paimon.types.DataType;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
/** The util seatunnel schema to paimon schema */
public class SchemaUtil {
@@ -69,9 +72,13 @@ public class SchemaUtil {
}
public static DataField getDataField(List<DataField> fields, String fieldName) {
- return fields.parallelStream()
- .filter(field -> field.name().equals(fieldName))
- .findFirst()
- .get();
+ Optional<DataField> firstField =
+ fields.stream().filter(field -> field.name().equals(fieldName)).findFirst();
+ if (!firstField.isPresent()) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.GET_FILED_FAILED,
+ "Can not get the filed [" + fieldName + "] from source table");
+ }
+ return firstField.get();
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index 8f7eea228f..58cb3e053b 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -42,6 +42,7 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -69,6 +70,25 @@ public class RowConverterTest {
private SeaTunnelRowType seaTunnelRowType;
+ private volatile boolean isCaseSensitive = false;
+ private volatile int index = 0;
+ private static final String[] filedNames = {
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_string",
+ "c_bytes",
+ "c_boolean",
+ "c_date",
+ "c_timestamp",
+ "c_map",
+ "c_array"
+ };
+
public static final List<String> KEY_NAME_LIST =
Arrays.asList("c_tinyint");
public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
@@ -118,41 +138,8 @@ public class RowConverterTest {
}
@BeforeEach
- public void before() {
- seaTunnelRowType =
- new SeaTunnelRowType(
- new String[] {
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_string",
- "c_bytes",
- "c_boolean",
- "c_date",
- "c_timestamp",
- "c_map",
- "c_array"
- },
- new SeaTunnelDataType<?>[] {
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(30, 8),
- BasicType.STRING_TYPE,
- PrimitiveByteArrayType.INSTANCE,
- BasicType.BOOLEAN_TYPE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
- ArrayType.STRING_ARRAY_TYPE
- });
+ public void generateTestData() {
+ initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index);
byte tinyint = 1;
short smallint = 2;
int intNum = 3;
@@ -229,6 +216,32 @@ public class RowConverterTest {
internalRow = binaryRow;
}
+ private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase, int index) {
+ String[] oneUpperCaseFiledNames = Arrays.copyOf(filedNames, filedNames.length);
+ if (isUpperCase) {
+ oneUpperCaseFiledNames[index] = oneUpperCaseFiledNames[index].toUpperCase();
+ }
+ seaTunnelRowType =
+ new SeaTunnelRowType(
+ oneUpperCaseFiledNames,
+ new SeaTunnelDataType<?>[] {
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(30, 8),
+ BasicType.STRING_TYPE,
+ PrimitiveByteArrayType.INSTANCE,
+ BasicType.BOOLEAN_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.STRING_ARRAY_TYPE
+ });
+ }
+
@Test
public void seaTunnelToPaimon() {
SeaTunnelRuntimeException actualException =
@@ -248,6 +261,34 @@ public class RowConverterTest {
InternalRow reconvert =
RowConverter.reconvert(seaTunnelRow, seaTunnelRowType,
getTableSchema(30, 8));
Assertions.assertEquals(reconvert, internalRow);
+
+ isCaseSensitive = true;
+
+ for (int i = 0; i < filedNames.length; i++) {
+ index = i;
+ generateTestData();
+ String sourceFiledname = seaTunnelRowType.getFieldName(i);
+ DataType exceptDataType =
+ RowTypeConverter.reconvert(sourceFiledname, seaTunnelRowType.getFieldType(i));
+ DataField exceptDataField = new DataField(i, sourceFiledname, exceptDataType);
+ TableSchema sinkTableSchema = getTableSchema(30, 8);
+ SeaTunnelRuntimeException actualException1 =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ RowConverter.reconvert(
+ seaTunnelRow, seaTunnelRowType, sinkTableSchema));
+ Assertions.assertEquals(
+ CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+ "Paimon",
+ sourceFiledname
+ + StringUtils.SPACE
+ + seaTunnelRowType.getFieldType(i).getSqlType(),
+ exceptDataField.asSQLString(),
+ sinkTableSchema.fields().get(i).asSQLString())
+ .getMessage(),
+ actualException1.getMessage());
+ }
}
@Test