This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 4bd3a2347  [FLINK-36565][transform] Route allows merging Decimals with various precisions
4bd3a2347 is described below

commit 4bd3a2347bf4a8235a0cc4cd3c3b74447c969e35
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Nov 14 16:40:54 2024 +0800

    [FLINK-36565][transform] Route allows merging Decimals with various precisions

    This closes #3651
---
 .../apache/flink/cdc/common/utils/SchemaUtils.java |  38 +++--
 .../flink/cdc/common/utils/SchemaUtilsTest.java    |  17 +++
 .../flink/FlinkPipelineComposerITCase.java         | 161 +++++++++++++++++++++
 .../runtime/operators/schema/SchemaOperator.java   |  33 ++++-
 4 files changed, 234 insertions(+), 15 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 111c32992..1b673b323 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -215,25 +215,21 @@ public class SchemaUtils {
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+            Preconditions.checkArgument(
+                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
+                    String.format(
+                            "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                            lType,
+                            rType,
+                            resultIntDigits + resultScale,
+                            DecimalType.MAX_PRECISION));
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
         } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType lhsDecimal = (DecimalType) lType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    lhsDecimal.getPrecision(),
-                                    lhsDecimal.getScale() + getNumericPrecision(rType)),
-                            lhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
         } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType rhsDecimal = (DecimalType) rType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    rhsDecimal.getPrecision(),
-                                    rhsDecimal.getScale() + getNumericPrecision(lType)),
-                            rhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
         } else {
             throw new IllegalStateException(
                     String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
@@ -246,6 +242,20 @@ public class SchemaUtils {
         }
     }
 
+    private static DataType mergeExactNumericsIntoDecimal(
+            DecimalType decimalType, DataType otherType) {
+        int resultPrecision =
+                Math.max(
+                        decimalType.getPrecision(),
+                        decimalType.getScale() + getNumericPrecision(otherType));
+        Preconditions.checkArgument(
+                resultPrecision <= DecimalType.MAX_PRECISION,
+                String.format(
+                        "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                        decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
+        return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+    }
+
     @VisibleForTesting
     public static int getNumericPrecision(DataType dataType) {
         if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
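As an aside for reviewers (illustrative only, not part of the commit): the widening rule above keeps the larger scale and the larger count of integer digits, and now rejects combinations that cannot fit into DecimalType.MAX_PRECISION (38). A minimal sketch of the behaviour, assuming SchemaUtils.inferWiderType remains publicly callable as in the tests below; the class name DecimalWideningExample is purely hypothetical:

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.utils.SchemaUtils;

    public class DecimalWideningExample {
        public static void main(String[] args) {
            // max(5 - 4, 10 - 2) = 8 integer digits, max(4, 2) = 4 scale -> DECIMAL(12, 4)
            DataType merged =
                    SchemaUtils.inferWiderType(DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2));
            System.out.println(merged); // expected to print DECIMAL(12, 4) NOT NULL

            // max(0, 38) = 38 integer digits plus max(5, 0) = 5 scale needs 43 digits,
            // which exceeds DecimalType.MAX_PRECISION (38), so this now throws
            // IllegalArgumentException instead of silently producing an invalid type.
            SchemaUtils.inferWiderType(DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0));
        }
    }
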
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index b33815017..6ff686aec 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -273,6 +273,23 @@ public class SchemaUtilsTest {
                                 DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
                 .isEqualTo(DataTypes.DECIMAL(12, 4));
 
+        // Test overflow decimal conversions
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
         // Test merging with nullability
         Assertions.assertThat(
                 SchemaUtils.inferWiderType(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 74afa8c4d..679ff9cc6 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,6 +18,7 @@ package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -61,6 +62,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -1216,6 +1218,88 @@ class FlinkPipelineComposerITCase {
         assertThat(outputEvents).containsExactlyInAnyOrder(expected);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        List<Event> events = generateDecimalColumnEvents("default_table_");
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_\\.*",
+                                        "default_namespace.default_schema.default_everything_merged",
+                                        null,
+                                        "Merge all decimal columns with different precision")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        String[] expected =
+                Stream.of(
+                                "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=TINYINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+                        .map(
+                                s ->
+                                        s.replace(
+                                                "{}",
+                                                "default_namespace.default_schema.default_everything_merged"))
+                        .toArray(String[]::new);
+
+        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+    }
+
     private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         List<Event> events = new ArrayList<>();
 
@@ -1286,6 +1370,83 @@ class FlinkPipelineComposerITCase {
         return events;
     }
 
+    private List<Event> generateDecimalColumnEvents(String tableNamePrefix) {
+        List<Event> events = new ArrayList<>();
+
+        // Initialize schemas
+        List<String> names =
+                Arrays.asList(
+                        "tiny",
+                        "small",
+                        "vanilla",
+                        "big",
+                        "dec_15_0",
+                        "decimal_10_10",
+                        "decimal_16_2",
+                        "decimal_29_19");
+
+        List<DataType> types =
+                Arrays.asList(
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.DECIMAL(15, 0),
+                        DataTypes.DECIMAL(10, 5),
+                        DataTypes.DECIMAL(16, 2),
+                        DataTypes.DECIMAL(29, 19));
+
+        List<Object> values =
+                Arrays.asList(
+                        (byte) 1,
+                        (short) 22,
+                        3333,
+                        (long) 44444444,
+                        DecimalData.fromBigDecimal(new BigDecimal("555555555555555"), 15, 0),
+                        DecimalData.fromBigDecimal(new BigDecimal("66666.66666"), 10, 5),
+                        DecimalData.fromBigDecimal(new BigDecimal("77777777.17"), 16, 2),
+                        DecimalData.fromBigDecimal(
+                                new BigDecimal("888888888.8888888888888888888"), 29, 19));
+
+        List<Schema> schemas =
+                types.stream()
+                        .map(
+                                temporalColumnType ->
+                                        Schema.newBuilder()
+                                                .physicalColumn("id", DataTypes.INT())
+                                                .physicalColumn("name", DataTypes.STRING())
+                                                .physicalColumn("age", DataTypes.INT())
+                                                .physicalColumn("fav_num", temporalColumnType)
+                                                .primaryKey("id")
+                                                .build())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(new CreateTableEvent(generatedTableId, generatedSchema));
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
+        }
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
+        }
+
+        return events;
+    }
+
     BinaryRecordData generate(Schema schema, Object... fields) {
         return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                 .generate(
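For reference (illustrative only, not part of the commit): the AlterColumnTypeEvents expected in the test above fall out of successive applications of the widening rule to the routed tables' fav_num column. A sketch of the decimal-related steps, assuming SchemaUtils.inferWiderType as exercised in SchemaUtilsTest; FavNumWideningSketch is a hypothetical name:

    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.utils.SchemaUtils;

    class FavNumWideningSketch {
        static void sketch() {
            // BIGINT merged into DECIMAL(15, 0): precision = max(15, 0 + 19) = 19 -> DECIMAL(19, 0)
            SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.DECIMAL(15, 0));
            // DECIMAL(19, 0) and DECIMAL(10, 5): 19 integer digits + 5 scale -> DECIMAL(24, 5)
            SchemaUtils.inferWiderType(DataTypes.DECIMAL(19, 0), DataTypes.DECIMAL(10, 5));
            // DECIMAL(24, 5) and DECIMAL(16, 2): already covered, stays DECIMAL(24, 5), no event
            SchemaUtils.inferWiderType(DataTypes.DECIMAL(24, 5), DataTypes.DECIMAL(16, 2));
            // DECIMAL(24, 5) and DECIMAL(29, 19): 19 integer digits + 19 scale -> DECIMAL(38, 19)
            SchemaUtils.inferWiderType(DataTypes.DECIMAL(24, 5), DataTypes.DECIMAL(29, 19));
        }
    }
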
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index e04416db7..6ada032d5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.operators.schema;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
@@ -40,6 +41,7 @@ import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -74,6 +76,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -611,14 +614,42 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         } else if (originalField instanceof Integer) {
             // INT
             return ((Integer) originalField).longValue();
+        } else if (originalField instanceof Long) {
+            // BIGINT
+            return originalField;
         } else {
             return fail(
                     new IllegalArgumentException(
                             String.format(
                                     "Cannot fit type \"%s\" into a BIGINT column. "
-                                            + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
+                                            + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
+                                    originalField.getClass())));
+            }
+        } else if (destinationType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) destinationType;
+            BigDecimal decimalValue;
+            if (originalField instanceof Byte) {
+                decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0);
+            } else if (originalField instanceof Short) {
+                decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0);
+            } else if (originalField instanceof Integer) {
+                decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0);
+            } else if (originalField instanceof Long) {
+                decimalValue = BigDecimal.valueOf((Long) originalField, 0);
+            } else if (originalField instanceof DecimalData) {
+                decimalValue = ((DecimalData) originalField).toBigDecimal();
+            } else {
+                return fail(
+                        new IllegalArgumentException(
+                                String.format(
+                                        "Cannot fit type \"%s\" into a DECIMAL column. "
+                                                + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
                                     originalField.getClass())));
             }
+            return decimalValue != null
+                    ? DecimalData.fromBigDecimal(
+                            decimalValue, decimalType.getPrecision(), decimalType.getScale())
+                    : null;
         } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             if (originalField instanceof Float) {
                 // FLOAT
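One more illustrative aside (not part of the commit): once the routed column has widened to a DECIMAL type, the new SchemaOperator branch above funnels incoming TINYINT / SMALLINT / INT / BIGINT and DecimalData values through BigDecimal and rescales them to the destination precision and scale, which is why the expected output rows show values such as 22.0000000000000000000. A minimal sketch of that effect, using the same DecimalData API as the diff; DecimalCoercionSketch is a hypothetical name:

    import org.apache.flink.cdc.common.data.DecimalData;

    import java.math.BigDecimal;

    class DecimalCoercionSketch {
        static void sketch() {
            // A SMALLINT value 22 arriving after fav_num has evolved to DECIMAL(38, 19)
            // is first wrapped as an unscaled BigDecimal ...
            BigDecimal decimalValue = BigDecimal.valueOf(22L, 0);
            // ... and then rescaled to the destination type, matching the expected
            // output row "102, Zen, 19, 22.0000000000000000000" in the test above.
            DecimalData coerced = DecimalData.fromBigDecimal(decimalValue, 38, 19);
            System.out.println(coerced); // expected to print 22.0000000000000000000
        }
    }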