This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 0108d0e5d [FLINK-35259][cdc][transform] Fix FlinkCDC pipeline transform can't deal timestamp field (#3278) 0108d0e5d is described below commit 0108d0e5d18c55873c3d67e9caad58b9d2148d6a Author: Wink <32723967+aiwe...@users.noreply.github.com> AuthorDate: Mon Apr 29 11:33:04 2024 +0800 [FLINK-35259][cdc][transform] Fix FlinkCDC pipeline transform can't deal timestamp field (#3278) --- .../cdc/runtime/functions/SystemFunctionUtils.java | 55 +++++++++++++++++----- .../cdc/runtime/typeutils/DataTypeConverter.java | 4 ++ .../transform/TransformDataOperatorTest.java | 2 +- .../cdc/runtime/parser/JaninoCompilerTest.java | 6 ++- 4 files changed, 52 insertions(+), 15 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 03128e399..fe4df0ba9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -17,6 +17,9 @@ package org.apache.flink.cdc.runtime.functions; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.utils.DateTimeUtils; import org.slf4j.Logger; @@ -42,8 +45,8 @@ public class SystemFunctionUtils { return DateTimeUtils.timestampMillisToTime(epochTime); } - public static long localtimestamp(long epochTime, String timezone) { - return epochTime; + public static TimestampData localtimestamp(long epochTime, String timezone) { + return TimestampData.fromMillis(epochTime); } // synonym: localtime @@ -55,18 +58,28 @@ public class SystemFunctionUtils { return DateTimeUtils.timestampMillisToDate(epochTime); } - public static long currentTimestamp(long epochTime, String timezone) { - return epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime); + public static TimestampData currentTimestamp(long epochTime, String timezone) { + return TimestampData.fromMillis( + epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime)); } - // synonym: currentTimestamp - public static long now(long epochTime, String timezone) { - return currentTimestamp(epochTime, timezone); + public static LocalZonedTimestampData now(long epochTime, String timezone) { + return LocalZonedTimestampData.fromEpochMillis(epochTime); } - public static String dateFormat(long timestamp, String format) { + public static String dateFormat(LocalZonedTimestampData timestamp, String format) { SimpleDateFormat dateFormat = new SimpleDateFormat(format); - return dateFormat.format(new Date(timestamp)); + return dateFormat.format(new Date(timestamp.getEpochMillisecond())); + } + + public static String dateFormat(TimestampData timestamp, String format) { + SimpleDateFormat dateFormat = new SimpleDateFormat(format); + return dateFormat.format(new Date(timestamp.getMillisecond())); + } + + public static String dateFormat(ZonedTimestampData timestamp, String format) { + SimpleDateFormat dateFormat = new SimpleDateFormat(format); + return dateFormat.format(new Date(timestamp.getMillisecond())); } public static int toDate(String str) { @@ -77,20 +90,38 @@ public class SystemFunctionUtils { return DateTimeUtils.parseDate(str, format); } - public static long toTimestamp(String str) { + public static TimestampData toTimestamp(String str) { return toTimestamp(str, "yyyy-MM-dd HH:mm:ss"); } - public static long toTimestamp(String str, String format) { + public static TimestampData toTimestamp(String str, String format) { SimpleDateFormat dateFormat = new SimpleDateFormat(format); try { - return dateFormat.parse(str).getTime(); + return TimestampData.fromMillis(dateFormat.parse(str).getTime()); } catch (ParseException e) { LOG.error("Unsupported date type convert: {}", str); throw new RuntimeException(e); } } + public static int timestampDiff( + String symbol, + LocalZonedTimestampData fromTimestamp, + LocalZonedTimestampData toTimestamp) { + return timestampDiff( + symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond()); + } + + public static int timestampDiff( + String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) { + return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond()); + } + + public static int timestampDiff( + String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) { + return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond()); + } + public static int timestampDiff(String symbol, long fromDate, long toDate) { Calendar from = Calendar.getInstance(); from.setTime(new Date(fromDate)); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java index 6a5979f07..78f7d448b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java @@ -451,6 +451,10 @@ public class DataTypeConverter { // TIMESTAMP_LTZ type is encoded in string type Instant instant = Instant.parse(str); return LocalZonedTimestampData.fromInstant(instant); + } else if (obj instanceof Long) { + return LocalZonedTimestampData.fromEpochMillis((Long) obj); + } else if (obj instanceof LocalZonedTimestampData) { + return obj; } throw new IllegalArgumentException( "Unable to convert to TIMESTAMP_LTZ from unexpected value '" diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java index 5f6d5e921..c3b720071 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java @@ -465,7 +465,7 @@ public class TransformDataOperatorTest { .addTransform( TIMESTAMP_TABLEID.identifier(), "col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as time_equal," - + " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP and NOW() = LOCALTIMESTAMP, 1, 0) as timestamp_equal," + + " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP, 1, 0) as timestamp_equal," + " IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as date_equal", "LOCALTIMESTAMP = CURRENT_TIMESTAMP") .addTimezone("GMT") diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java index 5609bc246..db86783d2 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.runtime.parser; +import org.apache.flink.cdc.common.data.TimestampData; + import org.codehaus.commons.compiler.CompileException; import org.codehaus.commons.compiler.Location; import org.codehaus.janino.ExpressionEvaluator; @@ -119,9 +121,9 @@ public class JaninoCompilerTest { JaninoCompiler.loadSystemFunction(expression), columnNames, paramTypes, - Long.class); + TimestampData.class); Object evaluate = expressionEvaluator.evaluate(params.toArray()); - Assert.assertEquals(localTime, evaluate); + Assert.assertEquals(TimestampData.fromMillis(localTime), evaluate); } @Test