This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e381c0869d [hotfix] Add ALLOW_NON_STRING_TO_STRING to cdc schema
change (#5298)
e381c0869d is described below
commit e381c0869de2b60af000f994ac4da2f801156bb3
Author: yuzelin <[email protected]>
AuthorDate: Mon Mar 17 13:43:05 2025 +0800
[hotfix] Add ALLOW_NON_STRING_TO_STRING to cdc schema change (#5298)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 3 +-
.../paimon/flink/action/cdc/TypeMapping.java | 3 +-
.../flink/action/cdc/schema/JdbcSchemaUtils.java | 3 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 13 +++++----
.../cdc/kafka/KafkaSyncTableActionITCase.java | 33 +++++++++-------------
.../cdc/mysql/MySqlSyncTableActionITCase.java | 8 +++++-
.../UpdatedDataFieldsProcessFunctionBaseTest.java | 29 +++++++++++++------
.../schema/schemaevolution/debezium-data-5.txt | 2 +-
8 files changed, 57 insertions(+), 37 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index be63b5a43f..46127c5f78 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -92,7 +92,8 @@ public class CdcActionCommonUtils {
return false;
}
DataType type = paimonSchema.fields().get(idx).type();
- if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
+ if (UpdatedDataFieldsProcessFunction.canConvert(
+ field.type(), type, TypeMapping.defaultMapping())
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT)
{
LOG.info(
"Cannot convert field '{}' from source table type '{}'
to Paimon type '{}'.",
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
index 4630d9763b..499c3823b9 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
@@ -77,7 +77,8 @@ public class TypeMapping implements Serializable {
CHAR_TO_STRING,
LONGTEXT_TO_BYTES,
DECIMAL_NO_CHANGE,
- BIGINT_UNSIGNED_TO_BIGINT;
+ BIGINT_UNSIGNED_TO_BIGINT,
+ ALLOW_NON_STRING_TO_STRING;
private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS
=
Arrays.stream(TypeMappingMode.values())
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
index ad0c60f94e..9999152b8b 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
@@ -122,7 +122,8 @@ public class JdbcSchemaUtils {
DataField dataField = currentFields.get(newField.name());
if (Objects.nonNull(dataField)) {
DataType oldType = dataField.type();
- switch (UpdatedDataFieldsProcessFunction.canConvert(oldType,
newField.type())) {
+ switch (UpdatedDataFieldsProcessFunction.canConvert(
+ oldType, newField.type(),
TypeMapping.defaultMapping())) {
case CONVERT:
currentFields.put(newField.name(), newField);
break;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 83ff100d8d..e544848036 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -133,7 +133,7 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
- switch (canConvert(oldType, newType)) {
+ switch (canConvert(oldType, newType, typeMapping)) {
case CONVERT:
catalog.alterTable(identifier, schemaChange, false);
break;
@@ -157,7 +157,8 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
}
- public static ConvertAction canConvert(DataType oldType, DataType newType)
{
+ public static ConvertAction canConvert(
+ DataType oldType, DataType newType, TypeMapping typeMapping) {
if (oldType.equalsIgnoreNullable(newType)) {
return ConvertAction.CONVERT;
}
@@ -171,7 +172,9 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
// object can be converted to string only when ALLOW_NON_STRING_TO_STRING is enabled
- if (oldIdx < 0 && newIdx >= 0) {
+ if ((oldIdx < 0 && newIdx >= 0)
+ && typeMapping.containsMode(
+
TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING)) {
return ConvertAction.CONVERT;
}
@@ -277,8 +280,8 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
/**
- * Return type of {@link
UpdatedDataFieldsProcessFunction#canConvert(DataType, DataType)}. This
- * enum indicates the action to perform.
+ * Return type of {@link UpdatedDataFieldsProcessFunction#canConvert}.
This enum indicates the
+ * action to perform.
*/
public enum ConvertAction {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index a5835b8833..f5b6bb5923 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
@@ -233,28 +234,22 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
"+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]");
waitForResult(expected, table, rowType, primaryKeys);
- // column type covert (int64 -> string)
+ // column type convert exception (int64 -> string)
writeRecordsToKafka(
topic, "kafka/%s/table/schema/%s/%s-data-5.txt", format,
sourceDir, format);
- rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
- DataTypes.STRING(),
- DataTypes.STRING(),
- DataTypes.DOUBLE(),
- DataTypes.STRING()
- },
- new String[] {"id", "name", "description", "weight",
"age"});
- expected =
- Arrays.asList(
- "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
- "+I[103, 12-pack drill bits, 12-pack of drill bits
with sizes ranging from #40 to #3, 0.8, 18]",
- "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]",
- "+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]",
- "+I[106, hammer, 12oz carpenter's hammer, 0.75, 24]");
- waitForResult(expected, table, rowType, primaryKeys);
+ while (true) {
+ JobStatus status = jobClient.getJobStatus().get();
+ if (status != JobStatus.RUNNING) {
+ assertThatThrownBy(() ->
jobClient.getJobExecutionResult().get())
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot convert field age from type
BIGINT to STRING of Paimon table"));
+ break;
+ }
+ Thread.sleep(1000);
+ }
}
public void testNotSupportFormat(String format) throws Exception {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 1398d87718..aa7d0199bc 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -258,7 +259,12 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "schema_evolution_multiple");
- MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig).build();
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withTypeMappingModes(
+
TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING
+ .configString())
+ .build();
runActionWithDefaultEnv(action);
checkTableSchema(
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
index 08d9ac9758..a042c79ad5 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
@@ -39,11 +40,13 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
null;
convertAction =
- UpdatedDataFieldsProcessFunctionBase.canConvert(oldVarchar,
biggerLengthVarchar);
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldVarchar, biggerLengthVarchar,
TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT,
convertAction);
convertAction =
- UpdatedDataFieldsProcessFunctionBase.canConvert(oldVarchar,
smallerLengthVarchar);
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldVarchar, smallerLengthVarchar,
TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE,
convertAction);
@@ -56,10 +59,14 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
SmallIntType smallintType = new SmallIntType();
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
null;
- convertAction =
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, bigintType);
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, bigintType, TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT,
convertAction);
- convertAction =
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallintType);
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, smallintType, TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE,
convertAction);
@@ -72,10 +79,14 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
DecimalType smallerRangeType = new DecimalType(10, 3);
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
null;
- convertAction =
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, biggerRangeType);
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, biggerRangeType,
TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT,
convertAction);
- convertAction =
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallerRangeType);
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, smallerRangeType,
TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE,
convertAction);
@@ -89,11 +100,13 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
null;
convertAction =
- UpdatedDataFieldsProcessFunctionBase.canConvert(oldType,
biggerLengthTimestamp);
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, biggerLengthTimestamp,
TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT,
convertAction);
convertAction =
- UpdatedDataFieldsProcessFunctionBase.canConvert(oldType,
smallerLengthTimestamp);
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, smallerLengthTimestamp,
TypeMapping.defaultMapping());
Assert.assertEquals(
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE,
convertAction);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
index 4ef80257ee..fde64c550f 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
@@ -16,4 +16,4 @@
* limitations under the License.
*/
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"},{"type":"string","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","
[...]
\ No newline at end of file
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"},{"type":"string","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","
[...]
\ No newline at end of file