This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0108d0e5d [FLINK-35259][cdc][transform] Fix FlinkCDC pipeline transform can't deal timestamp field (#3278)
0108d0e5d is described below

commit 0108d0e5d18c55873c3d67e9caad58b9d2148d6a
Author: Wink <32723967+aiwe...@users.noreply.github.com>
AuthorDate: Mon Apr 29 11:33:04 2024 +0800

    [FLINK-35259][cdc][transform] Fix FlinkCDC pipeline transform can't deal timestamp field (#3278)
---
 .../cdc/runtime/functions/SystemFunctionUtils.java | 55 +++++++++++++++++-----
 .../cdc/runtime/typeutils/DataTypeConverter.java   |  4 ++
 .../transform/TransformDataOperatorTest.java       |  2 +-
 .../cdc/runtime/parser/JaninoCompilerTest.java     |  6 ++-
 4 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 03128e399..fe4df0ba9 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.cdc.runtime.functions;
 
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.utils.DateTimeUtils;
 
 import org.slf4j.Logger;
@@ -42,8 +45,8 @@ public class SystemFunctionUtils {
         return DateTimeUtils.timestampMillisToTime(epochTime);
     }
 
-    public static long localtimestamp(long epochTime, String timezone) {
-        return epochTime;
+    public static TimestampData localtimestamp(long epochTime, String timezone) {
+        return TimestampData.fromMillis(epochTime);
     }
 
     // synonym: localtime
@@ -55,18 +58,28 @@ public class SystemFunctionUtils {
         return DateTimeUtils.timestampMillisToDate(epochTime);
     }
 
-    public static long currentTimestamp(long epochTime, String timezone) {
-        return epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime);
+    public static TimestampData currentTimestamp(long epochTime, String timezone) {
+        return TimestampData.fromMillis(
+                epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime));
     }
 
-    // synonym: currentTimestamp
-    public static long now(long epochTime, String timezone) {
-        return currentTimestamp(epochTime, timezone);
+    public static LocalZonedTimestampData now(long epochTime, String timezone) {
+        return LocalZonedTimestampData.fromEpochMillis(epochTime);
     }
 
-    public static String dateFormat(long timestamp, String format) {
+    public static String dateFormat(LocalZonedTimestampData timestamp, String format) {
         SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp));
+        return dateFormat.format(new Date(timestamp.getEpochMillisecond()));
+    }
+
+    public static String dateFormat(TimestampData timestamp, String format) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+        return dateFormat.format(new Date(timestamp.getMillisecond()));
+    }
+
+    public static String dateFormat(ZonedTimestampData timestamp, String format) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+        return dateFormat.format(new Date(timestamp.getMillisecond()));
     }
 
     public static int toDate(String str) {
@@ -77,20 +90,38 @@ public class SystemFunctionUtils {
         return DateTimeUtils.parseDate(str, format);
     }
 
-    public static long toTimestamp(String str) {
+    public static TimestampData toTimestamp(String str) {
         return toTimestamp(str, "yyyy-MM-dd HH:mm:ss");
     }
 
-    public static long toTimestamp(String str, String format) {
+    public static TimestampData toTimestamp(String str, String format) {
         SimpleDateFormat dateFormat = new SimpleDateFormat(format);
         try {
-            return dateFormat.parse(str).getTime();
+            return TimestampData.fromMillis(dateFormat.parse(str).getTime());
         } catch (ParseException e) {
             LOG.error("Unsupported date type convert: {}", str);
             throw new RuntimeException(e);
         }
     }
 
+    public static int timestampDiff(
+            String symbol,
+            LocalZonedTimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) {
+        return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
+        return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
+    }
+
     public static int timestampDiff(String symbol, long fromDate, long toDate) {
         Calendar from = Calendar.getInstance();
         from.setTime(new Date(fromDate));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
index 6a5979f07..78f7d448b 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
@@ -451,6 +451,10 @@ public class DataTypeConverter {
             // TIMESTAMP_LTZ type is encoded in string type
             Instant instant = Instant.parse(str);
             return LocalZonedTimestampData.fromInstant(instant);
+        } else if (obj instanceof Long) {
+            return LocalZonedTimestampData.fromEpochMillis((Long) obj);
+        } else if (obj instanceof LocalZonedTimestampData) {
+            return obj;
         }
         throw new IllegalArgumentException(
                 "Unable to convert to TIMESTAMP_LTZ from unexpected value '"
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
index 5f6d5e921..c3b720071 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
@@ -465,7 +465,7 @@ public class TransformDataOperatorTest {
                         .addTransform(
                                 TIMESTAMP_TABLEID.identifier(),
                                 "col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as time_equal,"
-                                        + " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP and NOW() = LOCALTIMESTAMP, 1, 0) as timestamp_equal,"
+                                        + " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP, 1, 0) as timestamp_equal,"
                                         + " IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as date_equal",
                                 "LOCALTIMESTAMP = CURRENT_TIMESTAMP")
                         .addTimezone("GMT")
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
index 5609bc246..db86783d2 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.cdc.runtime.parser;
 
+import org.apache.flink.cdc.common.data.TimestampData;
+
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.commons.compiler.Location;
 import org.codehaus.janino.ExpressionEvaluator;
@@ -119,9 +121,9 @@ public class JaninoCompilerTest {
                         JaninoCompiler.loadSystemFunction(expression),
                         columnNames,
                         paramTypes,
-                        Long.class);
+                        TimestampData.class);
         Object evaluate = expressionEvaluator.evaluate(params.toArray());
-        Assert.assertEquals(localTime, evaluate);
+        Assert.assertEquals(TimestampData.fromMillis(localTime), evaluate);
     }
 
     @Test

Reply via email to