This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.2 by this push:
new 4d0174c79 [FLINK-36565][transform] Route allows merging Decimals with
various precisions (#3743)
4d0174c79 is described below
commit 4d0174c79c55f9236edfdef760edc28cc1729d90
Author: yuxiqian <[email protected]>
AuthorDate: Wed Nov 20 13:57:04 2024 +0800
[FLINK-36565][transform] Route allows merging Decimals with various
precisions (#3743)
---
.../apache/flink/cdc/common/utils/SchemaUtils.java | 38 +++--
.../flink/cdc/common/utils/SchemaUtilsTest.java | 17 +++
.../flink/FlinkPipelineComposerITCase.java | 161 +++++++++++++++++++++
.../runtime/operators/schema/SchemaOperator.java | 33 ++++-
4 files changed, 234 insertions(+), 15 deletions(-)
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index eb6bdbad4..d847d0e21 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -214,25 +214,21 @@ public class SchemaUtils {
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(),
rhsDecimal.getScale());
+ Preconditions.checkArgument(
+ resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
+ String.format(
+ "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+ lType,
+ rType,
+ resultIntDigits + resultScale,
+ DecimalType.MAX_PRECISION));
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale,
resultScale);
} else if (lType instanceof DecimalType &&
rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
- DecimalType lhsDecimal = (DecimalType) lType;
- mergedType =
- DataTypes.DECIMAL(
- Math.max(
- lhsDecimal.getPrecision(),
- lhsDecimal.getScale() +
getNumericPrecision(rType)),
- lhsDecimal.getScale());
+ mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType,
rType);
} else if (rType instanceof DecimalType &&
lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
- DecimalType rhsDecimal = (DecimalType) rType;
- mergedType =
- DataTypes.DECIMAL(
- Math.max(
- rhsDecimal.getPrecision(),
- rhsDecimal.getScale() +
getNumericPrecision(lType)),
- rhsDecimal.getScale());
+ mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType,
lType);
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"",
lType, rType));
@@ -245,6 +241,20 @@ public class SchemaUtils {
}
}
+ private static DataType mergeExactNumericsIntoDecimal(
+ DecimalType decimalType, DataType otherType) {
+ int resultPrecision =
+ Math.max(
+ decimalType.getPrecision(),
+ decimalType.getScale() +
getNumericPrecision(otherType));
+ Preconditions.checkArgument(
+ resultPrecision <= DecimalType.MAX_PRECISION,
+ String.format(
+ "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+ decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
+ return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+ }
+
@VisibleForTesting
public static int getNumericPrecision(DataType dataType) {
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index a3aff294b..6a4d86c4e 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -273,6 +273,23 @@ public class SchemaUtilsTest {
DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10,
2)))
.isEqualTo(DataTypes.DECIMAL(12, 4));
+ // Test overflow decimal conversions
+ Assertions.assertThatThrownBy(
+ () ->
+ SchemaUtils.inferWiderType(
+ DataTypes.DECIMAL(5, 5),
DataTypes.DECIMAL(38, 0)))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ SchemaUtils.inferWiderType(
+ DataTypes.DECIMAL(38, 0),
DataTypes.DECIMAL(5, 5)))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
// Test merging with nullability
Assertions.assertThat(
SchemaUtils.inferWiderType(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 0c25c17ca..48211e075 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -61,6 +62,7 @@ import
org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -1216,6 +1218,88 @@ class FlinkPipelineComposerITCase {
assertThat(outputEvents).containsExactlyInAnyOrder(expected);
}
+ @ParameterizedTest
+ @EnumSource
+ void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi
sinkApi) throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ List<Event> events = generateDecimalColumnEvents("default_table_");
+ ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.singletonList(
+ new RouteDef(
+ "default_namespace.default_schema.default_table_\\.*",
+ "default_namespace.default_schema.default_everything_merged",
+ null,
+ "Merge all decimal columns with different precision")),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ String[] expected =
+ Stream.of(
+ "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=BIGINT}}",
+ "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=DECIMAL(19, 0)}}",
+ "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=DECIMAL(24, 5)}}",
+ "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=DECIMAL(38, 19)}}",
+ "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+ .map(
+ s ->
+ s.replace(
+ "{}",
+ "default_namespace.default_schema.default_everything_merged"))
+ .toArray(String[]::new);
+
+ assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+ }
+
private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
List<Event> events = new ArrayList<>();
@@ -1286,6 +1370,83 @@ class FlinkPipelineComposerITCase {
return events;
}
+ private List<Event> generateDecimalColumnEvents(String tableNamePrefix) {
+ List<Event> events = new ArrayList<>();
+
+ // Initialize schemas
+ List<String> names =
+ Arrays.asList(
+ "tiny",
+ "small",
+ "vanilla",
+ "big",
+ "dec_15_0",
+ "decimal_10_10",
+ "decimal_16_2",
+ "decimal_29_19");
+
+ List<DataType> types =
+ Arrays.asList(
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.DECIMAL(15, 0),
+ DataTypes.DECIMAL(10, 5),
+ DataTypes.DECIMAL(16, 2),
+ DataTypes.DECIMAL(29, 19));
+
+ List<Object> values =
+ Arrays.asList(
+ (byte) 1,
+ (short) 22,
+ 3333,
+ (long) 44444444,
+ DecimalData.fromBigDecimal(new
BigDecimal("555555555555555"), 15, 0),
+ DecimalData.fromBigDecimal(new
BigDecimal("66666.66666"), 10, 5),
+ DecimalData.fromBigDecimal(new
BigDecimal("77777777.17"), 16, 2),
+ DecimalData.fromBigDecimal(
+ new
BigDecimal("888888888.8888888888888888888"), 29, 19));
+
+ List<Schema> schemas =
+ types.stream()
+ .map(
+ temporalColumnType ->
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.INT())
+ .physicalColumn("name",
DataTypes.STRING())
+ .physicalColumn("age",
DataTypes.INT())
+ .physicalColumn("fav_num",
temporalColumnType)
+ .primaryKey("id")
+ .build())
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < names.size(); i++) {
+ TableId generatedTableId =
+ TableId.tableId(
+ "default_namespace", "default_schema",
tableNamePrefix + names.get(i));
+ Schema generatedSchema = schemas.get(i);
+ events.add(new CreateTableEvent(generatedTableId,
generatedSchema));
+ events.add(
+ DataChangeEvent.insertEvent(
+ generatedTableId,
+ generate(generatedSchema, 1 + i, "Alice", 17,
values.get(i))));
+ }
+
+ for (int i = 0; i < names.size(); i++) {
+ TableId generatedTableId =
+ TableId.tableId(
+ "default_namespace", "default_schema",
tableNamePrefix + names.get(i));
+ Schema generatedSchema = schemas.get(i);
+ events.add(
+ DataChangeEvent.insertEvent(
+ generatedTableId,
+ generate(generatedSchema, 101 + i, "Zen", 19,
values.get(i))));
+ }
+
+ return events;
+ }
+
BinaryRecordData generate(Schema schema, Object... fields) {
return (new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 50d4e3192..daadff6f7 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
@@ -39,6 +40,7 @@ import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -73,6 +75,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -604,14 +607,42 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
} else if (originalField instanceof Integer) {
// INT
return ((Integer) originalField).longValue();
+ } else if (originalField instanceof Long) {
+ // BIGINT
+ return originalField;
} else {
return fail(
new IllegalArgumentException(
String.format(
"Cannot fit type \"%s\" into a BIGINT column. "
- + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
+ + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
+ originalField.getClass())));
+ }
+ } else if (destinationType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) destinationType;
+ BigDecimal decimalValue;
+ if (originalField instanceof Byte) {
+ decimalValue = BigDecimal.valueOf(((Byte)
originalField).longValue(), 0);
+ } else if (originalField instanceof Short) {
+ decimalValue = BigDecimal.valueOf(((Short)
originalField).longValue(), 0);
+ } else if (originalField instanceof Integer) {
+ decimalValue = BigDecimal.valueOf(((Integer)
originalField).longValue(), 0);
+ } else if (originalField instanceof Long) {
+ decimalValue = BigDecimal.valueOf((Long) originalField, 0);
+ } else if (originalField instanceof DecimalData) {
+ decimalValue = ((DecimalData)
originalField).toBigDecimal();
+ } else {
+ return fail(
+ new IllegalArgumentException(
+ String.format(
+ "Cannot fit type \"%s\" into a DECIMAL column. "
+ + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
originalField.getClass())));
}
+ return decimalValue != null
+ ? DecimalData.fromBigDecimal(
+ decimalValue, decimalType.getPrecision(),
decimalType.getScale())
+ : null;
} else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC))
{
if (originalField instanceof Float) {
// FLOAT