This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.2 by this push:
     new 5c1c50e2e [FLINK-36474][route] Support merging timestamp columns when routing
5c1c50e2e is described below

commit 5c1c50e2ed8e3e04e538d200b8de116f81786421
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Nov 11 19:17:51 2024 +0800

    [FLINK-36474][route] Support merging timestamp columns when routing
    
    This closes #3693
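    
    A quick sketch of the widening behavior this change introduces, based on the
    SchemaUtilsTest assertions included in the diff below (Java, illustrative only):
    
        // Same timestamp kind: keep the kind and take the larger precision.
        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6));
        // => TIMESTAMP(9)
        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7));
        // => TIMESTAMP_TZ(7)
        // Mixed timestamp kinds: fall back to TIMESTAMP(9); zoned values are
        // converted to the pipeline's local time zone by the schema operator.
        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ());
        // => TIMESTAMP(9)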
---
 .../apache/flink/cdc/common/utils/SchemaUtils.java |  27 ++-
 .../flink/cdc/common/utils/SchemaUtilsTest.java    |  29 +++
 .../cdc/composer/flink/FlinkPipelineComposer.java  |  22 ++-
 .../flink/translator/SchemaOperatorTranslator.java |  38 ++--
 .../flink/FlinkPipelineComposerITCase.java         | 205 +++++++++++++++++++++
 .../doris/sink/DorisMetadataApplierITCase.java     |   3 +-
 .../sink/StarRocksMetadataApplierITCase.java       |   3 +-
 .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java  |   2 +-
 .../tests/SchemaEvolvingTransformE2eITCase.java    |   2 +-
 .../runtime/operators/schema/SchemaOperator.java   |  71 ++++++-
 .../operators/schema/SchemaOperatorFactory.java    |   5 +-
 .../schema/coordinator/SchemaDerivation.java       |  38 ++--
 .../schema/coordinator/SchemaDerivationTest.java   |   2 +-
 13 files changed, 379 insertions(+), 68 deletions(-)

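Merging TIMESTAMP_TZ / TIMESTAMP_LTZ columns into a plain TIMESTAMP column needs a
session time zone, so PipelineOptions.PIPELINE_LOCAL_TIME_ZONE is now passed from
FlinkPipelineComposer through SchemaOperatorTranslator and SchemaOperatorFactory into
SchemaOperator. A rough sketch of the conversion done by the castToTimestamp helper
added there (assuming `timezone` holds the configured zone id):

    // Zoned and local-zoned values are rendered as wall-clock time in the
    // configured pipeline time zone before being emitted as TIMESTAMP.
    Instant instant = ((LocalZonedTimestampData) originalField).toInstant();
    TimestampData coerced =
            TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(instant, ZoneId.of(timezone)));
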
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 928a3b9db..ba9dd4212 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -32,6 +32,9 @@ import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
 
 import javax.annotation.Nullable;
 
@@ -175,6 +178,24 @@ public class SchemaUtils {
         if (lType.equals(rType)) {
             // identical type
             mergedType = rType;
+        } else if (lType instanceof TimestampType && rType instanceof TimestampType) {
+            return DataTypes.TIMESTAMP(
+                    Math.max(
+                            ((TimestampType) lType).getPrecision(),
+                            ((TimestampType) rType).getPrecision()));
+        } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
+            return DataTypes.TIMESTAMP_TZ(
+                    Math.max(
+                            ((ZonedTimestampType) lType).getPrecision(),
+                            ((ZonedTimestampType) rType).getPrecision()));
+        } else if (lType instanceof LocalZonedTimestampType
+                && rType instanceof LocalZonedTimestampType) {
+            return DataTypes.TIMESTAMP_LTZ(
+                    Math.max(
+                            ((LocalZonedTimestampType) lType).getPrecision(),
+                            ((LocalZonedTimestampType) rType).getPrecision()));
+        } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
+            return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
         } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
                 && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
             mergedType = DataTypes.BIGINT();
@@ -184,7 +205,7 @@ public class SchemaUtils {
         } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
                 && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             mergedType = DataTypes.DOUBLE();
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
+        } else if (lType instanceof DecimalType && rType instanceof DecimalType) {
             // Merge two decimal types
             DecimalType lhsDecimal = (DecimalType) lType;
             DecimalType rhsDecimal = (DecimalType) rType;
@@ -194,7 +215,7 @@ public class SchemaUtils {
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType lhsDecimal = (DecimalType) lType;
             mergedType =
@@ -203,7 +224,7 @@ public class SchemaUtils {
                                     lhsDecimal.getPrecision(),
                                     lhsDecimal.getScale() + getNumericPrecision(rType)),
                             lhsDecimal.getScale());
-        } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType rhsDecimal = (DecimalType) rType;
             mergedType =
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index 8a508a890..a3aff294b 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -291,6 +291,35 @@ public class SchemaUtilsTest {
                                 DataTypes.INT().nullable(), DataTypes.INT().nullable()))
                 .isEqualTo(DataTypes.INT().nullable());
 
+        // Test merging temporal types
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6)))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(7));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1)))
+                .isEqualTo(DataTypes.TIMESTAMP_LTZ(2));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
         // incompatible type merges test
         Assertions.assertThatThrownBy(
                         () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index b3f17b2e5..114035fe9 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -88,17 +89,19 @@ public class FlinkPipelineComposer implements PipelineComposer {
 
     @Override
     public PipelineExecution compose(PipelineDef pipelineDef) {
-        int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
+        Configuration pipelineDefConfig = pipelineDef.getConfig();
+
+        int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
         env.getConfig().setParallelism(parallelism);
 
         SchemaChangeBehavior schemaChangeBehavior =
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
 
         // Build Source Operator
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
                 sourceTranslator.translate(
-                        pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
+                        pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
 
         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,10 +113,9 @@ public class FlinkPipelineComposer implements PipelineComposer {
         SchemaOperatorTranslator schemaOperatorTranslator =
                 new SchemaOperatorTranslator(
                         schemaChangeBehavior,
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
-                        pipelineDef
-                                .getConfig()
-                                .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
 
@@ -122,13 +124,13 @@ public class FlinkPipelineComposer implements PipelineComposer {
                 transformTranslator.translatePostTransform(
                         stream,
                         pipelineDef.getTransforms(),
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                         pipelineDef.getUdfs());
 
         // Build DataSink in advance as schema operator requires MetadataApplier
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();
         DataSink dataSink =
-                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
+                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
 
         stream =
                 schemaOperatorTranslator.translate(
@@ -157,7 +159,7 @@ public class FlinkPipelineComposer implements PipelineComposer {
         addFrameworkJars();
 
         return new FlinkPipelineExecution(
-                env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
+                env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
     }
 
     private void addFrameworkJars() {
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index c965c88a6..c5cadcd1e 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -19,7 +19,6 @@ package org.apache.flink.cdc.composer.flink.translator;
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,16 +38,18 @@ import java.util.List;
 public class SchemaOperatorTranslator {
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final String schemaOperatorUid;
-
     private final Duration rpcTimeOut;
+    private final String timezone;
 
     public SchemaOperatorTranslator(
             SchemaChangeBehavior schemaChangeBehavior,
             String schemaOperatorUid,
-            Duration rpcTimeOut) {
+            Duration rpcTimeOut,
+            String timezone) {
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.schemaOperatorUid = schemaOperatorUid;
         this.rpcTimeOut = rpcTimeOut;
+        this.timezone = timezone;
     }
 
     public DataStream<Event> translate(
@@ -56,7 +57,8 @@ public class SchemaOperatorTranslator {
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior);
+        return addSchemaOperator(
+                input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
     }
 
     public String getSchemaOperatorUid() {
@@ -68,7 +70,8 @@ public class SchemaOperatorTranslator {
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
-            SchemaChangeBehavior schemaChangeBehavior) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
         List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
             routingRules.add(
@@ -82,27 +85,12 @@ public class SchemaOperatorTranslator {
                         "SchemaOperator",
                         new EventTypeInfo(),
                         new SchemaOperatorFactory(
-                                metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
+                                metadataApplier,
+                                routingRules,
+                                rpcTimeOut,
+                                schemaChangeBehavior,
+                                timezone));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
-
-    private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
-        return input.filter(event -> !(event instanceof SchemaChangeEvent))
-                .setParallelism(parallelism);
-    }
-
-    private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
-        return input.map(
-                        event -> {
-                            if (event instanceof SchemaChangeEvent) {
-                                throw new RuntimeException(
-                                        String.format(
-                                                "Aborting execution as the 
pipeline encountered a schema change event: %s",
-                                                event));
-                            }
-                            return event;
-                        })
-                .setParallelism(parallelism);
-    }
 }
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 4e16d2380..0c25c17ca 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,6 +18,10 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -57,10 +61,16 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
 import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
@@ -1092,4 +1102,199 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
                         "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
     }
+
+    @ParameterizedTest
+    @EnumSource
+    void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        List<Event> events = generateTemporalColumnEvents("default_table_");
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York");
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Arrays.asList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_ts_\\.*",
+                                        "default_namespace.default_schema.default_table_timestamp_merged",
+                                        null,
+                                        "Merge timestamp columns with different precision"),
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_tz_\\.*",
+                                        "default_namespace.default_schema.default_table_zoned_timestamp_merged",
+                                        null,
+                                        "Merge timestamp_tz columns with different precision"),
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_ltz_\\.*",
+                                        "default_namespace.default_schema.default_table_local_zoned_timestamp_merged",
+                                        null,
+                                        "Merge timestamp_ltz columns with different precision"),
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_\\.*",
+                                        "default_namespace.default_schema.default_everything_merged",
+                                        null,
+                                        "Merge all timestamp family columns with different precision")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        String[] expected =
+                Stream.of(
+                                // Merging timestamp with different precision
+                                
"CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` 
INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, 
options=()}",
+                                
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 
17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                
"AlterColumnTypeEvent{tableId={}_table_timestamp_merged, 
nameMapping={birthday=TIMESTAMP(9)}}",
+                                
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 
17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
+                                
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 
19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 
19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
+
+                                // Merging zoned timestamp with different 
precision
+                                
"CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` 
INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, 
primaryKeys=id, options=()}",
+                                
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, 
Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
+                                
"AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, 
nameMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}}",
+                                
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, 
Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
+                                
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], 
after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
+                                
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], 
after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
+
+                                // Merging local-zoned timestamp with 
different precision
+                                
"CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, 
schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, 
primaryKeys=id, options=()}",
+                                
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], 
after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                
"AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, 
nameMapping={birthday=TIMESTAMP_LTZ(9)}}",
+                                
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], 
after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
+                                
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], 
after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], 
after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
+
+                                // Merging all
+                                
"CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` 
STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                
"AlterColumnTypeEvent{tableId={}_everything_merged, 
nameMapping={birthday=TIMESTAMP(9)}}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, 
meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[3, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[4, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, 
meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[5, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[6, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, 
meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, 
meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[103, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[104, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, 
meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[105, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_everything_merged, 
before=[], after=[106, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, 
meta=()}")
+                        .map(s -> s.replace("{}", 
"default_namespace.default_schema.default"))
+                        .toArray(String[]::new);
+
+        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+    }
+
+    private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
+        List<Event> events = new ArrayList<>();
+
+        // Initialize schemas
+        List<String> names = Arrays.asList("ts_0", "ts_9", "tz_0", "tz_9", 
"ltz_0", "ltz_9");
+
+        List<DataType> types =
+                Arrays.asList(
+                        DataTypes.TIMESTAMP(0),
+                        DataTypes.TIMESTAMP(9),
+                        DataTypes.TIMESTAMP_TZ(0),
+                        DataTypes.TIMESTAMP_TZ(9),
+                        DataTypes.TIMESTAMP_LTZ(0),
+                        DataTypes.TIMESTAMP_LTZ(9));
+
+        Instant lowPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57Z");
+        Instant highPrecisionTimestamp = 
Instant.parse("2020-01-01T14:28:57.123456789Z");
+
+        List<Object> values =
+                Arrays.asList(
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.ofInstant(lowPrecisionTimestamp, 
ZoneId.of("UTC"))),
+                        TimestampData.fromLocalDateTime(
+                                
LocalDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
+                        ZonedTimestampData.fromZonedDateTime(
+                                ZonedDateTime.ofInstant(lowPrecisionTimestamp, 
ZoneId.of("UTC"))),
+                        ZonedTimestampData.fromZonedDateTime(
+                                
ZonedDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
+                        
LocalZonedTimestampData.fromInstant(lowPrecisionTimestamp),
+                        
LocalZonedTimestampData.fromInstant(highPrecisionTimestamp));
+
+        List<Schema> schemas =
+                types.stream()
+                        .map(
+                                temporalColumnType ->
+                                        Schema.newBuilder()
+                                                .physicalColumn("id", 
DataTypes.INT())
+                                                .physicalColumn("name", 
DataTypes.STRING())
+                                                .physicalColumn("age", 
DataTypes.INT())
+                                                .physicalColumn("birthday", 
temporalColumnType)
+                                                .primaryKey("id")
+                                                .build())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", 
tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(new CreateTableEvent(generatedTableId, 
generatedSchema));
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 1 + i, "Alice", 17, 
values.get(i))));
+        }
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", 
tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 101 + i, "Zen", 19, 
values.get(i))));
+        }
+
+        return events;
+    }
+
+    BinaryRecordData generate(Schema schema, Object... fields) {
+        return (new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
+                .generate(
+                        Arrays.stream(fields)
+                                .map(
+                                        e ->
+                                                (e instanceof String)
+                                                        ? 
BinaryStringData.fromString((String) e)
+                                                        : e)
+                                .toArray());
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 438cc3781..84eeebbca 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -417,7 +417,8 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
                 new SchemaOperatorTranslator(
                         SchemaChangeBehavior.EVOLVE,
                         "$$_schema_operator_$$",
-                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT,
+                        "UTC");
 
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index c5e833e54..8b433ae4f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -366,7 +366,8 @@ public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
                 new SchemaOperatorTranslator(
                         SchemaChangeBehavior.EVOLVE,
                         "$$_schema_operator_$$",
-                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT,
+                        "UTC");
 
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 462708085..59814f198 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -112,7 +112,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
                 false,
                 Collections.emptyList(),
                 Arrays.asList(
-                        "java.lang.IllegalStateException: Incompatible types: 
\"INT\" and \"DOUBLE\"",
+                        "java.lang.IllegalStateException: Incompatible types 
found for column `age`: \"INT\" and \"DOUBLE\"",
                         "org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy"));
     }
 
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index edcac1bbe..eaeb96312 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -113,7 +113,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
                 false,
                 Collections.emptyList(),
                 Arrays.asList(
-                        "java.lang.IllegalStateException: Incompatible types: 
\"INT\" and \"DOUBLE\"",
+                        "java.lang.IllegalStateException: Incompatible types 
found for column `age`: \"INT\" and \"DOUBLE\"",
                         "org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy"));
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index a1bdd7885..50d4e3192 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -20,8 +20,11 @@ package org.apache.flink.cdc.runtime.operators.schema;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
@@ -71,6 +74,8 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -96,6 +101,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
 
     private final List<RouteRule> routingRules;
 
+    private final String timezone;
+
     /**
      * Storing route source table selector, sink table name (before symbol replacement), and replace
      * symbol in a tuple.
@@ -126,6 +133,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
         this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
+        this.timezone = "UTC";
     }
 
     @VisibleForTesting
@@ -134,8 +142,10 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
         this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
+        this.timezone = "UTC";
     }
 
+    @VisibleForTesting
     public SchemaOperator(
             List<RouteRule> routingRules,
             Duration rpcTimeOut,
@@ -144,6 +154,19 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
         this.schemaChangeBehavior = schemaChangeBehavior;
+        this.timezone = "UTC";
+    }
+
+    public SchemaOperator(
+            List<RouteRule> routingRules,
+            Duration rpcTimeOut,
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
+        this.routingRules = routingRules;
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
+        this.schemaChangeBehavior = schemaChangeBehavior;
+        this.timezone = timezone;
     }
 
     @Override
@@ -365,7 +388,11 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
                 } else {
                     fieldGetters.add(
                             new TypeCoercionFieldGetter(
-                                    column.getType(), fieldGetter, tolerantMode));
+                                    originalSchema.getColumn(columnName).get().getType(),
+                                    column.getType(),
+                                    fieldGetter,
+                                    tolerantMode,
+                                    timezone));
                 }
             }
         }
@@ -534,17 +561,23 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
     }
 
     private static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
+        private final DataType originalType;
         private final DataType destinationType;
         private final RecordData.FieldGetter originalFieldGetter;
         private final boolean tolerantMode;
+        private final String timezone;
 
         public TypeCoercionFieldGetter(
+                DataType originalType,
                 DataType destinationType,
                 RecordData.FieldGetter originalFieldGetter,
-                boolean tolerantMode) {
+                boolean tolerantMode,
+                String timezone) {
+            this.originalType = originalType;
             this.destinationType = destinationType;
             this.originalFieldGetter = originalFieldGetter;
             this.tolerantMode = tolerantMode;
+            this.timezone = timezone;
         }
 
         private Object fail(IllegalArgumentException e) throws IllegalArgumentException {
@@ -602,6 +635,21 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
                                                     + "Currently only CHAR / VARCHAR can be accepted by a STRING column",
                                             originalField.getClass())));
                 }
+            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+                // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no
+                // difference in its internal representation, so there's no need to do any precision
+                // conversion.
+                return originalField;
+            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
+                return originalField;
+            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
+                return originalField;
+            } else if (destinationType.is(DataTypeFamily.TIMESTAMP)
+                    && originalType.is(DataTypeFamily.TIMESTAMP)) {
+                return castToTimestamp(originalField, timezone);
             } else {
                 return fail(
                         new IllegalArgumentException(
@@ -617,4 +665,23 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         // Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
         // is guaranteed not to be mixed together.
     }
+
+    private static TimestampData castToTimestamp(Object object, String timezone) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof LocalZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
+        } else if (object instanceof ZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Unable to implicitly coerce object `%s` as a 
TIMESTAMP.", object));
+        }
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
index 7cd35a20d..367f65597 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
@@ -47,8 +47,9 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
             Duration rpcTimeOut,
-            SchemaChangeBehavior schemaChangeBehavior) {
-        super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior));
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
+        super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior, timezone));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
         this.schemaChangeBehavior = schemaChangeBehavior;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
index e4b547b21..1bb197b45 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
@@ -31,9 +31,8 @@ import org.apache.flink.cdc.common.schema.PhysicalColumn;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeFamily;
-import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -247,7 +246,9 @@ public class SchemaDerivation {
                                 // Check type compatibility
                                 DataType widerType =
                                         getWiderType(
-                                                existedColumnInDerivedTable.getType(), dataType);
+                                                columnName,
+                                                existedColumnInDerivedTable.getType(),
+                                                dataType);
                                 if (!widerType.equals(existedColumnInDerivedTable.getType())) {
                                     typeDifference.put(
                                             existedColumnInDerivedTable.getName(), widerType);
@@ -282,6 +283,7 @@ public class SchemaDerivation {
                     DataType widerType =
                             getWiderType(
+                                    existedColumnInDerivedTable.getName(),
                                     existedColumnInDerivedTable.getType(),
                                     addedColumn.getAddColumn().getType());
                     if (!widerType.equals(existedColumnInDerivedTable.getType())) {
@@ -318,7 +320,10 @@ public class SchemaDerivation {
                 Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get();
                 if (!existedColumnInDerivedTable.getType().equals(column.getType())) {
                     DataType widerType =
-                            getWiderType(existedColumnInDerivedTable.getType(), column.getType());
+                            getWiderType(
+                                    existedColumnInDerivedTable.getName(),
+                                    existedColumnInDerivedTable.getType(),
+                                    column.getType());
                     if (!widerType.equals(existedColumnInDerivedTable.getType())) {
                         newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType);
                     }
@@ -336,23 +341,14 @@ public class SchemaDerivation {
         return schemaChangeEvents;
     }
 
-    private DataType getWiderType(DataType thisType, DataType thatType) {
-        if (thisType.equals(thatType)) {
-            return thisType;
-        }
-        if (thisType.is(DataTypeFamily.INTEGER_NUMERIC)
-                && thatType.is(DataTypeFamily.INTEGER_NUMERIC)) {
-            return DataTypes.BIGINT();
-        }
-        if (thisType.is(DataTypeFamily.CHARACTER_STRING)
-                && thatType.is(DataTypeFamily.CHARACTER_STRING)) {
-            return DataTypes.STRING();
-        }
-        if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
-                && thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
-            return DataTypes.DOUBLE();
+    private DataType getWiderType(String columnName, DataType thisType, DataType thatType) {
+        try {
+            return SchemaUtils.inferWiderType(thisType, thatType);
+        } catch (IllegalStateException e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Incompatible types found for column `%s`: \"%s\" 
and \"%s\"",
+                            columnName, thisType, thatType));
         }
-        throw new IllegalStateException(
-                String.format("Incompatible types: \"%s\" and \"%s\"", 
thisType, thatType));
     }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
index 9a2d1cfb4..05d74ac7d 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
@@ -381,7 +381,7 @@ class SchemaDerivationTest {
                                 schemaDerivation.applySchemaChange(
                                         new CreateTableEvent(TABLE_2, INCOMPATIBLE_SCHEMA)))
                 .isInstanceOf(IllegalStateException.class)
-                .hasMessage("Incompatible types: \"INT\" and \"STRING\"");
+                .hasMessage("Incompatible types found for column `age`: 
\"INT\" and \"STRING\"");
     }
 
     @Test
