This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bd3a2347 [FLINK-36565][transform] Route allows merging Decimals with various precisions
4bd3a2347 is described below

commit 4bd3a2347bf4a8235a0cc4cd3c3b74447c969e35
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Nov 14 16:40:54 2024 +0800

    [FLINK-36565][transform] Route allows merging Decimals with various precisions
    
    This closes #3651
---
 .../apache/flink/cdc/common/utils/SchemaUtils.java |  38 +++--
 .../flink/cdc/common/utils/SchemaUtilsTest.java    |  17 +++
 .../flink/FlinkPipelineComposerITCase.java         | 161 +++++++++++++++++++++
 .../runtime/operators/schema/SchemaOperator.java   |  33 ++++-
 4 files changed, 234 insertions(+), 15 deletions(-)
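
At a glance, the change makes the DECIMAL widening rule explicit: keep the larger integer-digit count and the larger scale of the two types, and reject results that exceed DECIMAL's maximum precision instead of producing an invalid type. A minimal standalone sketch of that arithmetic, assuming MAX_PRECISION == 38 as in Flink's DecimalType (names here are illustrative, not the patch's API):

    // Illustrative sketch only; the real logic lives in SchemaUtils below.
    static int[] widenDecimals(int p1, int s1, int p2, int s2) {
        int intDigits = Math.max(p1 - s1, p2 - s2); // widest integer part
        int scale = Math.max(s1, s2);               // widest fractional part
        int precision = intDigits + scale;
        if (precision > 38) { // assumed: DecimalType.MAX_PRECISION == 38
            throw new IllegalArgumentException(
                    precision + " precision digits required, 38 available");
        }
        return new int[] {precision, scale};
    }

For example, widenDecimals(5, 4, 10, 2) yields DECIMAL(12, 4), while DECIMAL(5, 5) against DECIMAL(38, 0) would need 43 digits and now fails fast with a descriptive message.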

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 111c32992..1b673b323 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -215,25 +215,21 @@ public class SchemaUtils {
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+            Preconditions.checkArgument(
+                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
+                    String.format(
+                            "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                            lType,
+                            rType,
+                            resultIntDigits + resultScale,
+                            DecimalType.MAX_PRECISION));
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
         } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType lhsDecimal = (DecimalType) lType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    lhsDecimal.getPrecision(),
-                                    lhsDecimal.getScale() + getNumericPrecision(rType)),
-                            lhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
         } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType rhsDecimal = (DecimalType) rType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    rhsDecimal.getPrecision(),
-                                    rhsDecimal.getScale() + getNumericPrecision(lType)),
-                            rhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
         } else {
             throw new IllegalStateException(
                     String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
@@ -246,6 +242,20 @@ public class SchemaUtils {
         }
     }
 
+    private static DataType mergeExactNumericsIntoDecimal(
+            DecimalType decimalType, DataType otherType) {
+        int resultPrecision =
+                Math.max(
+                        decimalType.getPrecision(),
+                        decimalType.getScale() + getNumericPrecision(otherType));
+        Preconditions.checkArgument(
+                resultPrecision <= DecimalType.MAX_PRECISION,
+                String.format(
+                        "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                        decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
+        return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+    }
+
     @VisibleForTesting
     public static int getNumericPrecision(DataType dataType) {
         if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
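
The helper above leans on getNumericPrecision for the integer side. A hedged worked example using values exercised by the IT case later in this commit (the 19 below assumes getNumericPrecision(BIGINT) == 19, the digit count of a Java long):

    // Sketch only: merging BIGINT into DECIMAL(10, 5).
    int precision = 10, scale = 5;
    int bigintDigits = 19; // assumed: getNumericPrecision(BIGINT) == 19
    int resultPrecision = Math.max(precision, scale + bigintDigits); // 24
    // => DECIMAL(24, 5), matching the DECIMAL(24, 5) step asserted in the
    // new IT case; widening that against DECIMAL(29, 19) then yields
    // DECIMAL(38, 19), exactly at the precision cap.
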
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index b33815017..6ff686aec 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -273,6 +273,23 @@ public class SchemaUtilsTest {
                                 DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
                 .isEqualTo(DataTypes.DECIMAL(12, 4));
 
+        // Test overflow decimal conversions
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
         // Test merging with nullability
         Assertions.assertThat(
                         SchemaUtils.inferWiderType(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 74afa8c4d..679ff9cc6 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -61,6 +62,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -1216,6 +1218,88 @@ class FlinkPipelineComposerITCase {
         assertThat(outputEvents).containsExactlyInAnyOrder(expected);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        List<Event> events = generateDecimalColumnEvents("default_table_");
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_\\.*",
+                                        "default_namespace.default_schema.default_everything_merged",
+                                        null,
+                                        "Merge all decimal columns with different precision")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        String[] expected =
+                Stream.of(
+                                "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=TINYINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+                        .map(
+                                s ->
+                                        s.replace(
+                                                "{}",
+                                                "default_namespace.default_schema.default_everything_merged"))
+                        .toArray(String[]::new);
+
+        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+    }
+
     private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         List<Event> events = new ArrayList<>();
 
@@ -1286,6 +1370,83 @@ class FlinkPipelineComposerITCase {
         return events;
     }
 
+    private List<Event> generateDecimalColumnEvents(String tableNamePrefix) {
+        List<Event> events = new ArrayList<>();
+
+        // Initialize schemas
+        List<String> names =
+                Arrays.asList(
+                        "tiny",
+                        "small",
+                        "vanilla",
+                        "big",
+                        "dec_15_0",
+                        "decimal_10_10",
+                        "decimal_16_2",
+                        "decimal_29_19");
+
+        List<DataType> types =
+                Arrays.asList(
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.DECIMAL(15, 0),
+                        DataTypes.DECIMAL(10, 5),
+                        DataTypes.DECIMAL(16, 2),
+                        DataTypes.DECIMAL(29, 19));
+
+        List<Object> values =
+                Arrays.asList(
+                        (byte) 1,
+                        (short) 22,
+                        3333,
+                        (long) 44444444,
+                        DecimalData.fromBigDecimal(new BigDecimal("555555555555555"), 15, 0),
+                        DecimalData.fromBigDecimal(new BigDecimal("66666.66666"), 10, 5),
+                        DecimalData.fromBigDecimal(new BigDecimal("77777777.17"), 16, 2),
+                        DecimalData.fromBigDecimal(
+                                new BigDecimal("888888888.8888888888888888888"), 29, 19));
+
+        List<Schema> schemas =
+                types.stream()
+                        .map(
+                                temporalColumnType ->
+                                        Schema.newBuilder()
+                                                .physicalColumn("id", DataTypes.INT())
+                                                .physicalColumn("name", DataTypes.STRING())
+                                                .physicalColumn("age", DataTypes.INT())
+                                                .physicalColumn("fav_num", temporalColumnType)
+                                                .primaryKey("id")
+                                                .build())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(new CreateTableEvent(generatedTableId, generatedSchema));
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
+        }
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
+        }
+
+        return events;
+    }
+
     BinaryRecordData generate(Schema schema, Object... fields) {
         return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                 .generate(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index e04416db7..6ada032d5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.operators.schema;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
@@ -40,6 +41,7 @@ import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -74,6 +76,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -611,14 +614,42 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
                 } else if (originalField instanceof Integer) {
                     // INT
                     return ((Integer) originalField).longValue();
+                } else if (originalField instanceof Long) {
+                    // BIGINT
+                    return originalField;
                 } else {
                     return fail(
                             new IllegalArgumentException(
                                     String.format(
                                             "Cannot fit type \"%s\" into a BIGINT column. "
-                                                    + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
+                                                    + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
+                                            originalField.getClass())));
+                }
+            } else if (destinationType instanceof DecimalType) {
+                DecimalType decimalType = (DecimalType) destinationType;
+                BigDecimal decimalValue;
+                if (originalField instanceof Byte) {
+                    decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0);
+                } else if (originalField instanceof Short) {
+                    decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0);
+                } else if (originalField instanceof Integer) {
+                    decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0);
+                } else if (originalField instanceof Long) {
+                    decimalValue = BigDecimal.valueOf((Long) originalField, 0);
+                } else if (originalField instanceof DecimalData) {
+                    decimalValue = ((DecimalData) originalField).toBigDecimal();
+                } else {
+                    return fail(
+                            new IllegalArgumentException(
+                                    String.format(
+                                            "Cannot fit type \"%s\" into a DECIMAL column. "
+                                                    + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
                                             originalField.getClass())));
                 }
+                return decimalValue != null
+                        ? DecimalData.fromBigDecimal(
+                                decimalValue, decimalType.getPrecision(), decimalType.getScale())
+                        : null;
             } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
                 if (originalField instanceof Float) {
                     // FLOAT
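
The SchemaOperator change above normalizes every exact-numeric payload through BigDecimal before re-encoding it at the destination precision and scale. A hedged, JDK-only sketch of the same idea (DecimalData.fromBigDecimal in the actual diff handles fitting and overflow itself; the rounding mode here is an assumption of the sketch):

    // Sketch only: funnel every exact-numeric payload through BigDecimal,
    // then rescale to the destination column's scale.
    static java.math.BigDecimal coerceExactNumeric(Object field, int scale) {
        java.math.BigDecimal value;
        if (field instanceof Byte || field instanceof Short
                || field instanceof Integer || field instanceof Long) {
            value = java.math.BigDecimal.valueOf(((Number) field).longValue());
        } else if (field instanceof java.math.BigDecimal) {
            value = (java.math.BigDecimal) field;
        } else {
            throw new IllegalArgumentException(
                    "Cannot fit type \"" + field.getClass() + "\" into a DECIMAL column");
        }
        // HALF_UP is assumed for illustration; the real code delegates
        // fitting to DecimalData.fromBigDecimal.
        return value.setScale(scale, java.math.RoundingMode.HALF_UP);
    }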
