This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new df210ea73d [Hotfix][Connector-V2] Fixed lost data precision for
decimal data types (#7527)
df210ea73d is described below
commit df210ea73d5b14b36b9637fc4d12bee09fa8decd
Author: dailai <[email protected]>
AuthorDate: Fri Aug 30 12:44:00 2024 +0800
[Hotfix][Connector-V2] Fixed lost data precision for decimal data types
(#7527)
---
.../seatunnel/paimon/utils/RowConverter.java | 56 ++++++----
.../seatunnel/paimon/utils/RowConverterTest.java | 118 ++++++++++++---------
2 files changed, 106 insertions(+), 68 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 9c576018a3..45c2c492c1 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -346,17 +346,18 @@ public class RowConverter {
*
* @param seaTunnelRow SeaTunnel row object
* @param seaTunnelRowType SeaTunnel row type
- * @param tableSchema Paimon table schema
+ * @param sinkTableSchema Paimon table schema
* @return Paimon row object
*/
public static InternalRow reconvert(
- SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType,
TableSchema tableSchema) {
- List<DataField> sinkTotalFields = tableSchema.fields();
+ SeaTunnelRow seaTunnelRow,
+ SeaTunnelRowType seaTunnelRowType,
+ TableSchema sinkTableSchema) {
+ List<DataField> sinkTotalFields = sinkTableSchema.fields();
int sourceTotalFields = seaTunnelRowType.getTotalFields();
if (sourceTotalFields != sinkTotalFields.size()) {
- throw new CommonError()
- .writeRowErrorWithFiledsCountNotMatch(
- "Paimon", sourceTotalFields,
sinkTotalFields.size());
+ throw CommonError.writeRowErrorWithFiledsCountNotMatch(
+ "Paimon", sourceTotalFields, sinkTotalFields.size());
}
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
@@ -399,14 +400,17 @@ public class RowConverter {
binaryWriter.writeDouble(i, (Double)
seaTunnelRow.getField(i));
break;
case DECIMAL:
- DecimalType fieldType = (DecimalType)
seaTunnelRowType.getFieldType(i);
+ DataField decimalDataField =
+ SchemaUtil.getDataField(sinkTotalFields,
fieldName);
+ org.apache.paimon.types.DecimalType decimalType =
+ (org.apache.paimon.types.DecimalType)
decimalDataField.type();
binaryWriter.writeDecimal(
i,
Decimal.fromBigDecimal(
(BigDecimal) seaTunnelRow.getField(i),
- fieldType.getPrecision(),
- fieldType.getScale()),
- fieldType.getPrecision());
+ decimalType.getPrecision(),
+ decimalType.getScale()),
+ decimalType.getPrecision());
break;
case STRING:
binaryWriter.writeString(
@@ -464,9 +468,12 @@ public class RowConverter {
SeaTunnelDataType<?> rowType =
seaTunnelRowType.getFieldType(i);
Object row = seaTunnelRow.getField(i);
InternalRow paimonRow =
- reconvert((SeaTunnelRow) row, (SeaTunnelRowType)
rowType, tableSchema);
+ reconvert(
+ (SeaTunnelRow) row,
+ (SeaTunnelRowType) rowType,
+ sinkTableSchema);
RowType paimonRowType =
- RowTypeConverter.reconvert((SeaTunnelRowType)
rowType, tableSchema);
+ RowTypeConverter.reconvert((SeaTunnelRowType)
rowType, sinkTableSchema);
binaryWriter.writeRow(i, paimonRow, new
InternalRowSerializer(paimonRowType));
break;
default:
@@ -489,12 +496,25 @@ public class RowConverter {
DataField exceptDataField = new DataField(i, sourceFieldName,
exceptDataType);
DataType sinkDataType = sinkDataField.type();
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
- throw new CommonError()
- .writeRowErrorWithSchemaIncompatibleSchema(
- "Paimon",
- sourceFieldName + StringUtils.SPACE +
sourceFieldType.getSqlType(),
- exceptDataField.asSQLString(),
- sinkDataField.asSQLString());
+ throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+ "Paimon",
+ sourceFieldName + StringUtils.SPACE +
sourceFieldType.getSqlType(),
+ exceptDataField.asSQLString(),
+ sinkDataField.asSQLString());
+ }
+ if (sourceFieldType instanceof DecimalType
+ && sinkDataType instanceof
org.apache.paimon.types.DecimalType) {
+ DecimalType sourceDecimalType = (DecimalType) sourceFieldType;
+ org.apache.paimon.types.DecimalType sinkDecimalType =
+ (org.apache.paimon.types.DecimalType) sinkDataType;
+ if (sinkDecimalType.getPrecision() <
sourceDecimalType.getPrecision()
+ || sinkDecimalType.getScale() <
sourceDecimalType.getScale()) {
+ throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+ "Paimon",
+ sourceFieldName + StringUtils.SPACE +
sourceFieldType.getSqlType(),
+ exceptDataField.asSQLString(),
+ sinkDataField.asSQLString());
+ }
}
}
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index ebde744d03..8f7eea228f 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -26,7 +26,10 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
import org.apache.paimon.data.BinaryMap;
@@ -66,45 +69,54 @@ public class RowConverterTest {
private SeaTunnelRowType seaTunnelRowType;
- private TableSchema tableSchema;
-
- public static final RowType DEFAULT_ROW_TYPE =
- RowType.of(
- new DataType[] {
- DataTypes.TINYINT(),
- DataTypes.SMALLINT(),
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.FLOAT(),
- DataTypes.DOUBLE(),
- DataTypes.DECIMAL(10, 10),
- DataTypes.STRING(),
- DataTypes.BYTES(),
- DataTypes.BOOLEAN(),
- DataTypes.DATE(),
- DataTypes.TIMESTAMP(),
- DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
- DataTypes.ARRAY(DataTypes.STRING())
- },
- new String[] {
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_string",
- "c_bytes",
- "c_boolean",
- "c_date",
- "c_timestamp",
- "c_map",
- "c_array"
- });
-
public static final List<String> KEY_NAME_LIST =
Arrays.asList("c_tinyint");
+ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.FLOAT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DECIMAL(decimalPrecision, decimalScale),
+ DataTypes.STRING(),
+ DataTypes.BYTES(),
+ DataTypes.BOOLEAN(),
+ DataTypes.DATE(),
+ DataTypes.TIMESTAMP(),
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()),
+ DataTypes.ARRAY(DataTypes.STRING())
+ },
+ new String[] {
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_string",
+ "c_bytes",
+ "c_boolean",
+ "c_date",
+ "c_timestamp",
+ "c_map",
+ "c_array"
+ });
+
+ return new TableSchema(
+ 0,
+ TableSchema.newFields(rowType),
+ rowType.getFieldCount(),
+ Collections.EMPTY_LIST,
+ KEY_NAME_LIST,
+ Collections.EMPTY_MAP,
+ "");
+ }
+
@BeforeEach
public void before() {
seaTunnelRowType =
@@ -215,27 +227,33 @@ public class RowConverterTest {
binaryRowWriter.writeArray(
13, binaryArray2, new
InternalArraySerializer(DataTypes.STRING()));
internalRow = binaryRow;
-
- tableSchema =
- new TableSchema(
- 0,
- TableSchema.newFields(DEFAULT_ROW_TYPE),
- DEFAULT_ROW_TYPE.getFieldCount(),
- Collections.EMPTY_LIST,
- KEY_NAME_LIST,
- Collections.EMPTY_MAP,
- "");
}
@Test
public void seaTunnelToPaimon() {
- InternalRow convert = RowConverter.reconvert(seaTunnelRow,
seaTunnelRowType, tableSchema);
- Assertions.assertEquals(convert, internalRow);
+ SeaTunnelRuntimeException actualException =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ RowConverter.reconvert(
+ seaTunnelRow, seaTunnelRowType,
getTableSchema(10, 10)));
+ SeaTunnelRuntimeException exceptedException =
+ CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+ "Paimon",
+ "c_decimal" + StringUtils.SPACE + "DECIMAL",
+ "`c_decimal` DECIMAL(30, 8)",
+ "`c_decimal` DECIMAL(10, 10)");
+ Assertions.assertEquals(exceptedException.getMessage(),
actualException.getMessage());
+
+ InternalRow reconvert =
+ RowConverter.reconvert(seaTunnelRow, seaTunnelRowType,
getTableSchema(30, 8));
+ Assertions.assertEquals(reconvert, internalRow);
}
@Test
public void paimonToSeaTunnel() {
- SeaTunnelRow convert = RowConverter.convert(internalRow,
seaTunnelRowType, tableSchema);
+ SeaTunnelRow convert =
+ RowConverter.convert(internalRow, seaTunnelRowType,
getTableSchema(10, 10));
Assertions.assertEquals(convert, seaTunnelRow);
}
}