This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e381c0869d [hotfix] Add ALLOW_NON_STRING_TO_STRING to cdc schema 
change (#5298)
e381c0869d is described below

commit e381c0869de2b60af000f994ac4da2f801156bb3
Author: yuzelin <[email protected]>
AuthorDate: Mon Mar 17 13:43:05 2025 +0800

    [hotfix] Add ALLOW_NON_STRING_TO_STRING to cdc schema change (#5298)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     |  3 +-
 .../paimon/flink/action/cdc/TypeMapping.java       |  3 +-
 .../flink/action/cdc/schema/JdbcSchemaUtils.java   |  3 +-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  | 13 +++++----
 .../cdc/kafka/KafkaSyncTableActionITCase.java      | 33 +++++++++-------------
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  8 +++++-
 .../UpdatedDataFieldsProcessFunctionBaseTest.java  | 29 +++++++++++++------
 .../schema/schemaevolution/debezium-data-5.txt     |  2 +-
 8 files changed, 57 insertions(+), 37 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index be63b5a43f..46127c5f78 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -92,7 +92,8 @@ public class CdcActionCommonUtils {
                 return false;
             }
             DataType type = paimonSchema.fields().get(idx).type();
-            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
+            if (UpdatedDataFieldsProcessFunction.canConvert(
+                            field.type(), type, TypeMapping.defaultMapping())
                     != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) 
{
                 LOG.info(
                         "Cannot convert field '{}' from source table type '{}' 
to Paimon type '{}'.",
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
index 4630d9763b..499c3823b9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java
@@ -77,7 +77,8 @@ public class TypeMapping implements Serializable {
         CHAR_TO_STRING,
         LONGTEXT_TO_BYTES,
         DECIMAL_NO_CHANGE,
-        BIGINT_UNSIGNED_TO_BIGINT;
+        BIGINT_UNSIGNED_TO_BIGINT,
+        ALLOW_NON_STRING_TO_STRING;
 
         private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS 
=
                 Arrays.stream(TypeMappingMode.values())
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
index ad0c60f94e..9999152b8b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java
@@ -122,7 +122,8 @@ public class JdbcSchemaUtils {
             DataField dataField = currentFields.get(newField.name());
             if (Objects.nonNull(dataField)) {
                 DataType oldType = dataField.type();
-                switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, 
newField.type())) {
+                switch (UpdatedDataFieldsProcessFunction.canConvert(
+                        oldType, newField.type(), 
TypeMapping.defaultMapping())) {
                     case CONVERT:
                         currentFields.put(newField.name(), newField);
                         break;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 83ff100d8d..e544848036 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -133,7 +133,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                             + " does not exist in table. This is unexpected.");
             DataType oldType = schema.fields().get(idx).type();
             DataType newType = updateColumnType.newDataType();
-            switch (canConvert(oldType, newType)) {
+            switch (canConvert(oldType, newType, typeMapping)) {
                 case CONVERT:
                     catalog.alterTable(identifier, schemaChange, false);
                     break;
@@ -157,7 +157,8 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         }
     }
 
-    public static ConvertAction canConvert(DataType oldType, DataType newType) 
{
+    public static ConvertAction canConvert(
+            DataType oldType, DataType newType, TypeMapping typeMapping) {
         if (oldType.equalsIgnoreNullable(newType)) {
             return ConvertAction.CONVERT;
         }
@@ -171,7 +172,9 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         }
 
         // object can always be converted to string
-        if (oldIdx < 0 && newIdx >= 0) {
+        if ((oldIdx < 0 && newIdx >= 0)
+                && typeMapping.containsMode(
+                        
TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING)) {
             return ConvertAction.CONVERT;
         }
 
@@ -277,8 +280,8 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     }
 
     /**
-     * Return type of {@link 
UpdatedDataFieldsProcessFunction#canConvert(DataType, DataType)}. This
-     * enum indicates the action to perform.
+     * Return type of {@link UpdatedDataFieldsProcessFunction#canConvert}. 
This enum indicates the
+     * action to perform.
      */
     public enum ConvertAction {
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index a5835b8833..f5b6bb5923 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 
@@ -233,28 +234,22 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        // column type covert (int64 -> string)
+        // column type convert exception (int64 -> string)
         writeRecordsToKafka(
                 topic, "kafka/%s/table/schema/%s/%s-data-5.txt", format, 
sourceDir, format);
 
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.DOUBLE(),
-                            DataTypes.STRING()
-                        },
-                        new String[] {"id", "name", "description", "weight", 
"age"});
-        expected =
-                Arrays.asList(
-                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
-                        "+I[103, 12-pack drill bits, 12-pack of drill bits 
with sizes ranging from #40 to #3, 0.8, 18]",
-                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]",
-                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]",
-                        "+I[106, hammer, 12oz carpenter's hammer, 0.75, 24]");
-        waitForResult(expected, table, rowType, primaryKeys);
+        while (true) {
+            JobStatus status = jobClient.getJobStatus().get();
+            if (status != JobStatus.RUNNING) {
+                assertThatThrownBy(() -> 
jobClient.getJobExecutionResult().get())
+                        .satisfies(
+                                anyCauseMatches(
+                                        UnsupportedOperationException.class,
+                                        "Cannot convert field age from type 
BIGINT to STRING of Paimon table"));
+                break;
+            }
+            Thread.sleep(1000);
+        }
     }
 
     public void testNotSupportFormat(String format) throws Exception {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 1398d87718..aa7d0199bc 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -258,7 +259,12 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "schema_evolution_multiple");
 
-        MySqlSyncTableAction action = 
syncTableActionBuilder(mySqlConfig).build();
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withTypeMappingModes(
+                                
TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING
+                                        .configString())
+                        .build();
         runActionWithDefaultEnv(action);
 
         checkTableSchema(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
index 08d9ac9758..a042c79ad5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.IntType;
@@ -39,11 +40,13 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
 
         UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = 
null;
         convertAction =
-                UpdatedDataFieldsProcessFunctionBase.canConvert(oldVarchar, 
biggerLengthVarchar);
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldVarchar, biggerLengthVarchar, 
TypeMapping.defaultMapping());
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, 
convertAction);
         convertAction =
-                UpdatedDataFieldsProcessFunctionBase.canConvert(oldVarchar, 
smallerLengthVarchar);
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldVarchar, smallerLengthVarchar, 
TypeMapping.defaultMapping());
 
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, 
convertAction);
@@ -56,10 +59,14 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
         SmallIntType smallintType = new SmallIntType();
 
         UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = 
null;
-        convertAction = 
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, bigintType);
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, bigintType, TypeMapping.defaultMapping());
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, 
convertAction);
-        convertAction = 
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallintType);
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, smallintType, TypeMapping.defaultMapping());
 
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, 
convertAction);
@@ -72,10 +79,14 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
         DecimalType smallerRangeType = new DecimalType(10, 3);
 
         UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = 
null;
-        convertAction = 
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, biggerRangeType);
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, biggerRangeType, 
TypeMapping.defaultMapping());
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, 
convertAction);
-        convertAction = 
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallerRangeType);
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, smallerRangeType, 
TypeMapping.defaultMapping());
 
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, 
convertAction);
@@ -89,11 +100,13 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
 
         UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = 
null;
         convertAction =
-                UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, 
biggerLengthTimestamp);
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, biggerLengthTimestamp, 
TypeMapping.defaultMapping());
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, 
convertAction);
         convertAction =
-                UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, 
smallerLengthTimestamp);
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, smallerLengthTimestamp, 
TypeMapping.defaultMapping());
 
         Assert.assertEquals(
                 UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, 
convertAction);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
index 4ef80257ee..fde64c550f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt
@@ -16,4 +16,4 @@
  * limitations under the License.
  */
 
-{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"},{"type":"string","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","
 [...]
\ No newline at end of file
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"},{"type":"string","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","
 [...]
\ No newline at end of file

Reply via email to