This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 89a6d367a [cdc] Fixed constant value of org.apache.kafka prefix being shaded (#2821)
89a6d367a is described below

commit 89a6d367acfb1fbec075d28623771334b44e88fe
Author: Kerwin <[email protected]>
AuthorDate: Fri Feb 2 10:43:46 2024 +0800

    [cdc] Fixed constant value of org.apache.kafka prefix being shaded (#2821)
---
 .../cdc/format/debezium/DebeziumSchemaUtils.java   | 80 ++++++++++++++--------
 .../flink/action/cdc/mysql/MySqlRecordParser.java  |  4 +-
 .../action/cdc/postgres/PostgresRecordParser.java  |  6 +-
 3 files changed, 56 insertions(+), 34 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 13874872b..d3f1b1196 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -36,7 +36,6 @@ import io.debezium.time.MicroTime;
 import io.debezium.time.MicroTimestamp;
 import io.debezium.time.Timestamp;
 import io.debezium.time.ZonedTimestamp;
-import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 
 import javax.annotation.Nullable;
@@ -85,7 +84,7 @@ public class DebeziumSchemaUtils {
         } else if (("bytes".equals(debeziumType) && className == null)) {
             // MySQL binary, varbinary, blob
             transformed = new String(Base64.getDecoder().decode(rawValue));
-        } else if ("bytes".equals(debeziumType) && 
Decimal.LOGICAL_NAME.equals(className)) {
+        } else if ("bytes".equals(debeziumType) && 
decimalLogicalName().equals(className)) {
             // MySQL numeric, fixed, decimal
             try {
                 new BigDecimal(rawValue);
@@ -171,36 +170,47 @@ public class DebeziumSchemaUtils {
 
     public static DataType toDataType(
             String debeziumType, @Nullable String className, Map<String, 
String> parameters) {
-        if (className != null) {
-            switch (className) {
-                case Bits.LOGICAL_NAME:
-                    int length = Integer.parseInt(parameters.get("length"));
-                    return DataTypes.BINARY((length + 7) / 8);
-                case Decimal.LOGICAL_NAME:
-                    String precision = 
parameters.get("connect.decimal.precision");
-                    if (precision == null) {
-                        return DataTypes.DECIMAL(20, 0);
-                    }
-
-                    int p = Integer.parseInt(precision);
-                    if (p > DecimalType.MAX_PRECISION) {
-                        return DataTypes.STRING();
-                    } else {
-                        int scale = Integer.parseInt(parameters.get("scale"));
-                        return DataTypes.DECIMAL(p, scale);
-                    }
-                case Date.SCHEMA_NAME:
-                    return DataTypes.DATE();
-                case Timestamp.SCHEMA_NAME:
-                    return DataTypes.TIMESTAMP(3);
-                case MicroTimestamp.SCHEMA_NAME:
-                case ZonedTimestamp.SCHEMA_NAME:
-                    return DataTypes.TIMESTAMP(6);
-                case MicroTime.SCHEMA_NAME:
-                    return DataTypes.TIME();
+        if (className == null) {
+            return fromDebeziumType(debeziumType);
+        }
+
+        if (Bits.LOGICAL_NAME.equals(className)) {
+            int length = Integer.parseInt(parameters.get("length"));
+            return DataTypes.BINARY((length + 7) / 8);
+        }
+
+        if (decimalLogicalName().equals(className)) {
+            String precision = parameters.get("connect.decimal.precision");
+            if (precision == null) {
+                return DataTypes.DECIMAL(20, 0);
+            }
+
+            int p = Integer.parseInt(precision);
+            if (p > DecimalType.MAX_PRECISION) {
+                return DataTypes.STRING();
+            } else {
+                int scale = Integer.parseInt(parameters.get("scale"));
+                return DataTypes.DECIMAL(p, scale);
             }
         }
 
+        if (Date.SCHEMA_NAME.equals(className)) {
+            return DataTypes.DATE();
+        }
+
+        if (Timestamp.SCHEMA_NAME.equals(className)) {
+            return DataTypes.TIMESTAMP(3);
+        }
+
+        if (MicroTimestamp.SCHEMA_NAME.equals(className)
+                || ZonedTimestamp.SCHEMA_NAME.equals(className)) {
+            return DataTypes.TIMESTAMP(6);
+        }
+
+        if (MicroTime.SCHEMA_NAME.equals(className)) {
+            return DataTypes.TIME();
+        }
+
         return fromDebeziumType(debeziumType);
     }
 
@@ -228,4 +238,16 @@ public class DebeziumSchemaUtils {
                 return DataTypes.STRING();
         }
     }
+
+    /**
+     * get decimal logical name.
+     *
+     * <p>Using the maven shade plugin will shade the constant value. see <a
+     * href="https://issues.apache.org/jira/browse/MSHADE-156">...</a> so the string
+     * org.apache.kafka.connect.data.Decimal is shaded to org.apache.flink.kafka.shaded
+     * .org.apache.kafka.connect.data.Decimal.
+     */
+    public static String decimalLogicalName() {
+        return "org.apache.#.connect.data.Decimal".replace("#", "kafka");
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 89cb535d1..ee0049fd4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -51,7 +51,6 @@ import io.debezium.time.ZonedTimestamp;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +79,7 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCase
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
 import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
 
 /**
@@ -276,7 +276,7 @@ public class MySqlRecordParser implements 
FlatMapFunction<String, RichCdcMultipl
             } else if (("bytes".equals(mySqlType) && className == null)) {
                 // MySQL binary, varbinary, blob
                 newValue = new String(Base64.getDecoder().decode(oldValue));
-            } else if ("bytes".equals(mySqlType) && 
Decimal.LOGICAL_NAME.equals(className)) {
+            } else if ("bytes".equals(mySqlType) && 
decimalLogicalName().equals(className)) {
                 // MySQL numeric, fixed, decimal
                 try {
                     new BigDecimal(oldValue);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 3e58dafc1..6a2ef96a1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -48,7 +48,6 @@ import io.debezium.time.ZonedTimestamp;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +74,7 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCase
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
 import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
 
 /**
@@ -206,7 +206,7 @@ public class PostgresRecordParser implements 
FlatMapFunction<String, RichCdcMult
             case "string":
                 return DataTypes.STRING();
             case "bytes":
-                if (Decimal.LOGICAL_NAME.equals(field.name())) {
+                if (decimalLogicalName().equals(field.name())) {
                     int precision = 
field.parameters().get("connect.decimal.precision").asInt();
                     int scale = field.parameters().get("scale").asInt();
                     return DataTypes.DECIMAL(precision, scale);
@@ -299,7 +299,7 @@ public class PostgresRecordParser implements 
FlatMapFunction<String, RichCdcMult
             } else if (("bytes".equals(postgresSqlType) && className == null)) 
{
                 // binary, varbinary
                 newValue = new String(Base64.getDecoder().decode(oldValue));
-            } else if ("bytes".equals(postgresSqlType) && 
Decimal.LOGICAL_NAME.equals(className)) {
+            } else if ("bytes".equals(postgresSqlType) && 
decimalLogicalName().equals(className)) {
                 // numeric, decimal
                 try {
                     new BigDecimal(oldValue);

Reply via email to