This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 89a6d367a [cdc] Fixed constant value of org.apache.kafka prefix being
shaded (#2821)
89a6d367a is described below
commit 89a6d367acfb1fbec075d28623771334b44e88fe
Author: Kerwin <[email protected]>
AuthorDate: Fri Feb 2 10:43:46 2024 +0800
[cdc] Fixed constant value of org.apache.kafka prefix being shaded (#2821)
---
.../cdc/format/debezium/DebeziumSchemaUtils.java | 80 ++++++++++++++--------
.../flink/action/cdc/mysql/MySqlRecordParser.java | 4 +-
.../action/cdc/postgres/PostgresRecordParser.java | 6 +-
3 files changed, 56 insertions(+), 34 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 13874872b..d3f1b1196 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -36,7 +36,6 @@ import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
-import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.json.JsonConverterConfig;
import javax.annotation.Nullable;
@@ -85,7 +84,7 @@ public class DebeziumSchemaUtils {
} else if (("bytes".equals(debeziumType) && className == null)) {
// MySQL binary, varbinary, blob
transformed = new String(Base64.getDecoder().decode(rawValue));
- } else if ("bytes".equals(debeziumType) &&
Decimal.LOGICAL_NAME.equals(className)) {
+ } else if ("bytes".equals(debeziumType) &&
decimalLogicalName().equals(className)) {
// MySQL numeric, fixed, decimal
try {
new BigDecimal(rawValue);
@@ -171,36 +170,47 @@ public class DebeziumSchemaUtils {
public static DataType toDataType(
String debeziumType, @Nullable String className, Map<String,
String> parameters) {
- if (className != null) {
- switch (className) {
- case Bits.LOGICAL_NAME:
- int length = Integer.parseInt(parameters.get("length"));
- return DataTypes.BINARY((length + 7) / 8);
- case Decimal.LOGICAL_NAME:
- String precision =
parameters.get("connect.decimal.precision");
- if (precision == null) {
- return DataTypes.DECIMAL(20, 0);
- }
-
- int p = Integer.parseInt(precision);
- if (p > DecimalType.MAX_PRECISION) {
- return DataTypes.STRING();
- } else {
- int scale = Integer.parseInt(parameters.get("scale"));
- return DataTypes.DECIMAL(p, scale);
- }
- case Date.SCHEMA_NAME:
- return DataTypes.DATE();
- case Timestamp.SCHEMA_NAME:
- return DataTypes.TIMESTAMP(3);
- case MicroTimestamp.SCHEMA_NAME:
- case ZonedTimestamp.SCHEMA_NAME:
- return DataTypes.TIMESTAMP(6);
- case MicroTime.SCHEMA_NAME:
- return DataTypes.TIME();
+ if (className == null) {
+ return fromDebeziumType(debeziumType);
+ }
+
+ if (Bits.LOGICAL_NAME.equals(className)) {
+ int length = Integer.parseInt(parameters.get("length"));
+ return DataTypes.BINARY((length + 7) / 8);
+ }
+
+ if (decimalLogicalName().equals(className)) {
+ String precision = parameters.get("connect.decimal.precision");
+ if (precision == null) {
+ return DataTypes.DECIMAL(20, 0);
+ }
+
+ int p = Integer.parseInt(precision);
+ if (p > DecimalType.MAX_PRECISION) {
+ return DataTypes.STRING();
+ } else {
+ int scale = Integer.parseInt(parameters.get("scale"));
+ return DataTypes.DECIMAL(p, scale);
}
}
+ if (Date.SCHEMA_NAME.equals(className)) {
+ return DataTypes.DATE();
+ }
+
+ if (Timestamp.SCHEMA_NAME.equals(className)) {
+ return DataTypes.TIMESTAMP(3);
+ }
+
+ if (MicroTimestamp.SCHEMA_NAME.equals(className)
+ || ZonedTimestamp.SCHEMA_NAME.equals(className)) {
+ return DataTypes.TIMESTAMP(6);
+ }
+
+ if (MicroTime.SCHEMA_NAME.equals(className)) {
+ return DataTypes.TIME();
+ }
+
return fromDebeziumType(debeziumType);
}
@@ -228,4 +238,16 @@ public class DebeziumSchemaUtils {
return DataTypes.STRING();
}
}
+
+ /**
+ * get decimal logical name.
+ *
+ * <p>Using the maven shade plugin will shade the constant value. see <a
+ * href="https://issues.apache.org/jira/browse/MSHADE-156">...</a> so the string
+ * org.apache.kafka.connect.data.Decimal is shaded to org.apache.flink.kafka.shaded
+ * .org.apache.kafka.connect.data.Decimal.
+ */
+ public static String decimalLogicalName() {
+ return "org.apache.#.connect.data.Decimal".replace("#", "kafka");
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 89cb535d1..ee0049fd4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -51,7 +51,6 @@ import io.debezium.time.ZonedTimestamp;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +79,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCase
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
/**
@@ -276,7 +276,7 @@ public class MySqlRecordParser implements FlatMapFunction<String, RichCdcMultipl
} else if (("bytes".equals(mySqlType) && className == null)) {
// MySQL binary, varbinary, blob
newValue = new String(Base64.getDecoder().decode(oldValue));
- } else if ("bytes".equals(mySqlType) &&
Decimal.LOGICAL_NAME.equals(className)) {
+ } else if ("bytes".equals(mySqlType) &&
decimalLogicalName().equals(className)) {
// MySQL numeric, fixed, decimal
try {
new BigDecimal(oldValue);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 3e58dafc1..6a2ef96a1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -48,7 +48,6 @@ import io.debezium.time.ZonedTimestamp;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +74,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCase
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
/**
@@ -206,7 +206,7 @@ public class PostgresRecordParser implements FlatMapFunction<String, RichCdcMult
case "string":
return DataTypes.STRING();
case "bytes":
- if (Decimal.LOGICAL_NAME.equals(field.name())) {
+ if (decimalLogicalName().equals(field.name())) {
int precision =
field.parameters().get("connect.decimal.precision").asInt();
int scale = field.parameters().get("scale").asInt();
return DataTypes.DECIMAL(precision, scale);
@@ -299,7 +299,7 @@ public class PostgresRecordParser implements FlatMapFunction<String, RichCdcMult
} else if (("bytes".equals(postgresSqlType) && className == null))
{
// binary, varbinary
newValue = new String(Base64.getDecoder().decode(oldValue));
- } else if ("bytes".equals(postgresSqlType) &&
Decimal.LOGICAL_NAME.equals(className)) {
+ } else if ("bytes".equals(postgresSqlType) &&
decimalLogicalName().equals(className)) {
// numeric, decimal
try {
new BigDecimal(oldValue);