This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ca11359d [FLINK-35743][cdc-runtime] Correct the temporal function 
semantics
7ca11359d is described below

commit 7ca11359d7969cd7488fc546dd04f1bbfb8d6674
Author: Wink <[email protected]>
AuthorDate: Wed Aug 7 00:49:22 2024 +0800

    [FLINK-35743][cdc-runtime] Correct the temporal function semantics
    
    This closes #3449.
    
    Co-authored-by: wenmo <[email protected]>
    Co-authored-by: yuxiqian <[email protected]>
---
 .../flink/cdc/common/utils/DateTimeUtils.java      |  19 ++
 .../flink/FlinkPipelineComposerITCase.java         |   1 +
 .../flink/FlinkPipelineTransformITCase.java        | 281 +++++++++++++++++++++
 .../cdc/connectors/values/ValuesDatabase.java      |  20 +-
 .../cdc/pipeline/tests/TransformE2eITCase.java     | 123 +++++++++
 .../cdc/runtime/functions/SystemFunctionUtils.java | 118 ++++++---
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |  81 +++++-
 .../parser/metadata/TransformSqlOperatorTable.java |   6 +-
 .../transform/TransformDataOperatorTest.java       | 219 +++++++++++++---
 .../cdc/runtime/parser/JaninoCompilerTest.java     |  21 --
 .../cdc/runtime/parser/TransformParserTest.java    |  14 +-
 11 files changed, 793 insertions(+), 110 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index 9a009df62..b923107e2 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -92,6 +92,14 @@ public class DateTimeUtils {
         return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), 
zdt.getDayOfMonth());
     }
 
+    public static int parseDate(String dateStr, String fromFormat, String 
timezone) {
+        long ts = internalParseTimestampMillis(dateStr, fromFormat, 
TimeZone.getTimeZone(timezone));
+        ZoneId zoneId = ZoneId.of(timezone);
+        Instant instant = Instant.ofEpochMilli(ts);
+        ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
+        return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), 
zdt.getDayOfMonth());
+    }
+
     private static long internalParseTimestampMillis(String dateStr, String 
format, TimeZone tz) {
         SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
         formatter.setTimeZone(tz);
@@ -119,4 +127,15 @@ public class DateTimeUtils {
         int m = month + 12 * a - 3;
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 
32045;
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format
+    // 
--------------------------------------------------------------------------------------------
+
+    public static String formatTimestampMillis(long ts, String format, 
TimeZone timeZone) {
+        SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+        formatter.setTimeZone(timeZone);
+        Date dateTime = new Date(ts);
+        return formatter.format(dateTime);
+    }
 }
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 26c9c9187..a8acf7989 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -68,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration test for {@link FlinkPipelineComposer}. */
 class FlinkPipelineComposerITCase {
+
     private static final int MAX_PARALLELISM = 4;
 
     // Always use parent-first classloader for CDC classes.
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
new file mode 100644
index 000000000..77f89a37a
--- /dev/null
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.definition.TransformDef;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for {@link FlinkPipelineComposer}. */
+class FlinkPipelineTransformITCase {
+
+    private static final int MAX_PARALLELISM = 4;
+
+    // Always use parent-first classloader for CDC classes.
+    // The reason is that ValuesDatabase uses static field for holding data, 
we need to make sure
+    // the class is loaded by AppClassloader so that we can verify data in the 
test case.
+    private static final org.apache.flink.configuration.Configuration 
MINI_CLUSTER_CONFIG =
+            new org.apache.flink.configuration.Configuration();
+
+    static {
+        MINI_CLUSTER_CONFIG.set(
+                ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+                Collections.singletonList("org.apache.flink.cdc"));
+    }
+
+    /**
+     * Use {@link MiniClusterExtension} to reduce the overhead of restarting 
the MiniCluster for
+     * every test case.
+     */
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(MAX_PARALLELISM)
+                            .setConfiguration(MINI_CLUSTER_CONFIG)
+                            .build());
+
+    private final PrintStream standardOut = System.out;
+    private final ByteArrayOutputStream outCaptor = new 
ByteArrayOutputStream();
+
+    @BeforeEach
+    void init() {
+        // Take over STDOUT as we need to check the output of values sink
+        System.setOut(new PrintStream(outCaptor));
+        // Initialize in-memory database
+        ValuesDatabase.clear();
+    }
+
+    @AfterEach
+    void cleanup() {
+        System.setOut(standardOut);
+    }
+
+    @Test
+    void testTransformWithTemporalFunction() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", 
"default_schema", "mytable1");
+        TableId myTable2 = TableId.tableId("default_namespace", 
"default_schema", "mytable2");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        Schema table2Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.VARCHAR(255))
+                        .physicalColumn("age", DataTypes.TINYINT())
+                        .physicalColumn("description", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        List<Event> events = new ArrayList<>();
+        BinaryRecordDataGenerator table1dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table1Schema.getColumnDataTypes().toArray(new 
DataType[0]));
+        BinaryRecordDataGenerator table2dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table2Schema.getColumnDataTypes().toArray(new 
DataType[0]));
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {1, 
BinaryStringData.fromString("Alice"), 18})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, 
BinaryStringData.fromString("Bob"), 20})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, 
BinaryStringData.fromString("Bob"), 20}),
+                        table1dataGenerator.generate(
+                                new Object[] {2, 
BinaryStringData.fromString("Bob"), 30})));
+        events.add(new CreateTableEvent(myTable2, table2Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    3L,
+                                    BinaryStringData.fromString("Carol"),
+                                    (byte) 15,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Derrida"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.deleteEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Derrida"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+
+        
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, 
"America/Los_Angeles");
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        
"default_namespace.default_schema.\\.*",
+                                        "*, LOCALTIME as lcl_t, CURRENT_TIME 
as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS 
TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        Arrays.stream(outputEvents).forEach(this::extractDataLines);
+    }
+
+    void extractDataLines(String line) {
+        if (!line.startsWith("DataChangeEvent{")) {
+            return;
+        }
+        Stream.of("before", "after")
+                .forEach(
+                        tag -> {
+                            String[] arr = line.split(tag + "=\\[", 2);
+                            String dataRecord = arr[arr.length - 1].split("]", 
2)[0];
+                            if (!dataRecord.isEmpty()) {
+                                verifyDataRecord(dataRecord);
+                            }
+                        });
+    }
+
+    void verifyDataRecord(String recordLine) {
+        List<String> tokens = Arrays.asList(recordLine.split(", "));
+        assertThat(tokens).hasSizeGreaterThanOrEqualTo(6);
+
+        tokens = tokens.subList(tokens.size() - 6, tokens.size());
+
+        String localTime = tokens.get(0);
+        String currentTime = tokens.get(1);
+        assertThat(localTime).isEqualTo(currentTime);
+
+        String currentTimestamp = tokens.get(2);
+        String nowTimestamp = tokens.get(3);
+        String localTimestamp = tokens.get(4);
+        
assertThat(currentTimestamp).isEqualTo(nowTimestamp).isEqualTo(localTimestamp);
+
+        Instant instant =
+                LocalDateTime.parse(
+                                currentTimestamp,
+                                
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+                        .toInstant(ZoneOffset.UTC);
+
+        long milliSecondsInOneDay = 24 * 60 * 60 * 1000;
+
+        assertThat(instant.toEpochMilli() % milliSecondsInOneDay)
+                .isEqualTo(Long.parseLong(localTime));
+
+        String currentDate = tokens.get(5);
+
+        assertThat(instant.toEpochMilli() / milliSecondsInOneDay)
+                .isEqualTo(Long.parseLong(currentDate));
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index 36edab6d8..4d80deece 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -32,6 +32,7 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.source.MetadataAccessor;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
 import org.apache.flink.cdc.connectors.values.source.ValuesDataSource;
 
@@ -48,6 +49,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -185,7 +187,7 @@ public class ValuesDatabase {
         private final TableId tableId;
 
         // [primaryKeys, [column_name, column_value]]
-        private final Map<String, Map<String, String>> records;
+        private final Map<String, Map<String, RecordData>> records;
 
         private final LinkedList<Column> columns;
 
@@ -210,15 +212,21 @@ public class ValuesDatabase {
         public List<String> getResult() {
             List<String> results = new ArrayList<>();
             synchronized (lock) {
+                List<RecordData.FieldGetter> fieldGetters = 
SchemaUtils.createFieldGetters(columns);
                 records.forEach(
                         (key, record) -> {
                             StringBuilder stringBuilder = new 
StringBuilder(tableId.toString());
                             stringBuilder.append(":");
-                            for (Column column : columns) {
+                            for (int i = 0; i < columns.size(); i++) {
+                                Column column = columns.get(i);
+                                RecordData.FieldGetter fieldGetter = 
fieldGetters.get(i);
                                 stringBuilder
                                         .append(column.getName())
                                         .append("=")
-                                        
.append(record.getOrDefault(column.getName(), ""))
+                                        .append(
+                                                
Optional.ofNullable(record.get(column.getName()))
+                                                        
.map(fieldGetter::getFieldOrNull)
+                                                        .orElse(""))
                                         .append(";");
                             }
                             stringBuilder.deleteCharAt(stringBuilder.length() 
- 1);
@@ -257,9 +265,9 @@ public class ValuesDatabase {
 
         private void insert(RecordData recordData) {
             String primaryKey = buildPrimaryKeyStr(recordData);
-            Map<String, String> record = new HashMap<>();
+            Map<String, RecordData> record = new HashMap<>();
             for (int i = 0; i < recordData.getArity(); i++) {
-                record.put(columns.get(i).getName(), 
recordData.getString(i).toString());
+                record.put(columns.get(i).getName(), recordData);
             }
             records.put(primaryKey, record);
         }
@@ -393,7 +401,7 @@ public class ValuesDatabase {
                                 records.forEach(
                                         (key, record) -> {
                                             if 
(record.containsKey(beforeName)) {
-                                                String value = 
record.get(beforeName);
+                                                RecordData value = 
record.get(beforeName);
                                                 record.remove(beforeName);
                                                 record.put(afterName, value);
                                             }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 2f896d334..3f0999ee1 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
 import 
org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -40,9 +41,14 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 /** E2e tests for the {@link TransformSchemaOperator}. */
 @RunWith(Parameterized.class)
@@ -315,6 +321,44 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
         System.out.println(stdout);
     }
 
+    @Test
+    public void testTemporalFunctions() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.\\.*\n"
+                                + "    projection: ID, LOCALTIME as lcl_t, 
CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, 
CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as 
cur_dt\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: 1\n"
+                                + "  local-time-zone: America/Los_Angeles",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformRenameDatabase.getDatabaseName(),
+                        transformRenameDatabase.getDatabaseName());
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        waitForTemporaryRecords(8, 60000L);
+    }
+
     private void validateResult(List<String> expectedEvents) throws Exception {
         for (String event : expectedEvents) {
             waitUntilSpecificEvent(event, 6000L);
@@ -340,4 +384,83 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                             + taskManagerConsumer.toUtf8String());
         }
     }
+
+    private int validateTemporaryRecords() {
+        int validRecordCount = 0;
+        for (String line : taskManagerConsumer.toUtf8String().split("\n")) {
+            if (extractDataLines(line)) {
+                validRecordCount++;
+            }
+        }
+        return validRecordCount;
+    }
+
+    private void waitForTemporaryRecords(int expectedRecords, long timeout) 
throws Exception {
+        boolean result = false;
+        long endTimeout = System.currentTimeMillis() + timeout;
+        while (System.currentTimeMillis() < endTimeout) {
+            if (validateTemporaryRecords() >= expectedRecords) {
+                result = true;
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        if (!result) {
+            throw new TimeoutException(
+                    "failed to get enough temporary records: "
+                            + expectedRecords
+                            + " from stdout: "
+                            + taskManagerConsumer.toUtf8String());
+        }
+    }
+
+    boolean extractDataLines(String line) {
+        if (!line.startsWith("DataChangeEvent{")) {
+            return false;
+        }
+        Stream.of("before", "after")
+                .forEach(
+                        tag -> {
+                            String[] arr = line.split(tag + "=\\[", 2);
+                            String dataRecord = arr[arr.length - 1].split("]", 
2)[0];
+                            if (!dataRecord.isEmpty()) {
+                                verifyDataRecord(dataRecord);
+                            }
+                        });
+        return true;
+    }
+
+    void verifyDataRecord(String recordLine) {
+        LOG.info("Verifying data line {}", recordLine);
+        List<String> tokens = Arrays.asList(recordLine.split(", "));
+        Assert.assertTrue(tokens.size() >= 6);
+
+        tokens = tokens.subList(tokens.size() - 6, tokens.size());
+
+        String localTime = tokens.get(0);
+        String currentTime = tokens.get(1);
+        Assert.assertEquals(localTime, currentTime);
+
+        String currentTimestamp = tokens.get(2);
+        String nowTimestamp = tokens.get(3);
+        String localTimestamp = tokens.get(4);
+        Assert.assertEquals(currentTimestamp, nowTimestamp);
+        Assert.assertEquals(currentTimestamp, localTimestamp);
+
+        Instant instant =
+                LocalDateTime.parse(
+                                currentTimestamp,
+                                
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+                        .toInstant(ZoneOffset.UTC);
+
+        long milliSecondsInOneDay = 24 * 60 * 60 * 1000;
+
+        Assert.assertEquals(
+                instant.toEpochMilli() % milliSecondsInOneDay, 
Long.parseLong(localTime));
+
+        String currentDate = tokens.get(5);
+
+        Assert.assertEquals(
+                instant.toEpochMilli() / milliSecondsInOneDay, 
Long.parseLong(currentDate));
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 34c299567..ba569fc08 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -30,6 +30,9 @@ import java.math.MathContext;
 import java.math.RoundingMode;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
@@ -38,65 +41,62 @@ import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToDate;
+import static 
org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToTime;
+
 /** System function utils to support the call of flink cdc pipeline transform. 
*/
 public class SystemFunctionUtils {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SystemFunctionUtils.class);
 
-    public static int localtime(long epochTime, String timezone) {
-        return DateTimeUtils.timestampMillisToTime(epochTime);
-    }
+    private static final Logger LOG = 
LoggerFactory.getLogger(SystemFunctionUtils.class);
 
-    public static TimestampData localtimestamp(long epochTime, String 
timezone) {
-        return TimestampData.fromMillis(epochTime);
+    public static LocalZonedTimestampData currentTimestamp(long epochTime) {
+        return LocalZonedTimestampData.fromEpochMillis(epochTime);
     }
 
-    // synonym: localtime
-    public static int currentTime(long epochTime, String timezone) {
-        return localtime(epochTime, timezone);
+    // synonym with currentTimestamp
+    public static LocalZonedTimestampData now(long epochTime) {
+        return LocalZonedTimestampData.fromEpochMillis(epochTime);
     }
 
-    public static int currentDate(long epochTime, String timezone) {
-        return DateTimeUtils.timestampMillisToDate(epochTime);
+    public static TimestampData localtimestamp(long epochTime, String 
timezone) {
+        return TimestampData.fromLocalDateTime(
+                
Instant.ofEpochMilli(epochTime).atZone(ZoneId.of(timezone)).toLocalDateTime());
     }
 
-    public static TimestampData currentTimestamp(long epochTime, String 
timezone) {
-        return TimestampData.fromMillis(
-                epochTime + 
TimeZone.getTimeZone(timezone).getOffset(epochTime));
+    public static int localtime(long epochTime, String timezone) {
+        return timestampMillisToTime(localtimestamp(epochTime, 
timezone).getMillisecond());
     }
 
-    public static LocalZonedTimestampData now(long epochTime, String timezone) 
{
-        return LocalZonedTimestampData.fromEpochMillis(epochTime);
+    public static int currentTime(long epochTime, String timezone) {
+        // the time value of currentTimestamp under given session time zone
+        return timestampMillisToTime(localtimestamp(epochTime, 
timezone).getMillisecond());
     }
 
-    public static String dateFormat(LocalZonedTimestampData timestamp, String 
format) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp.getEpochMillisecond()));
+    public static int currentDate(long epochTime, String timezone) {
+        // the date value of currentTimestamp under given session time zone
+        return timestampMillisToDate(localtimestamp(epochTime, 
timezone).getMillisecond());
     }
 
     public static String dateFormat(TimestampData timestamp, String format) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp.getMillisecond()));
-    }
-
-    public static String dateFormat(ZonedTimestampData timestamp, String 
format) {
-        SimpleDateFormat dateFormat = new SimpleDateFormat(format);
-        return dateFormat.format(new Date(timestamp.getMillisecond()));
+        return DateTimeUtils.formatTimestampMillis(
+                timestamp.getMillisecond(), format, 
TimeZone.getTimeZone("UTC"));
     }
 
-    public static int toDate(String str) {
-        return toDate(str, "yyyy-MM-dd");
+    public static int toDate(String str, String timezone) {
+        return toDate(str, "yyyy-MM-dd", timezone);
     }
 
-    public static int toDate(String str, String format) {
-        return DateTimeUtils.parseDate(str, format);
+    public static int toDate(String str, String format, String timezone) {
+        return DateTimeUtils.parseDate(str, format, timezone);
     }
 
-    public static TimestampData toTimestamp(String str) {
-        return toTimestamp(str, "yyyy-MM-dd HH:mm:ss");
+    public static TimestampData toTimestamp(String str, String timezone) {
+        return toTimestamp(str, "yyyy-MM-dd HH:mm:ss", timezone);
     }
 
-    public static TimestampData toTimestamp(String str, String format) {
+    public static TimestampData toTimestamp(String str, String format, String 
timezone) {
         SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timezone));
         try {
             return TimestampData.fromMillis(dateFormat.parse(str).getTime());
         } catch (ParseException e) {
@@ -118,11 +118,45 @@ public class SystemFunctionUtils {
         return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
     }
 
+    public static int timestampDiff(
+            String symbol, TimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, LocalZonedTimestampData fromTimestamp, 
TimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+    }
+
     public static int timestampDiff(
             String symbol, ZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
         return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
     }
 
+    public static int timestampDiff(
+            String symbol, LocalZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, ZonedTimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
+        return timestampDiff(
+                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, TimestampData fromTimestamp, ZonedTimestampData 
toTimestamp) {
+        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    }
+
+    public static int timestampDiff(
+            String symbol, ZonedTimestampData fromTimestamp, TimestampData 
toTimestamp) {
+        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    }
+
     public static int timestampDiff(String symbol, long fromDate, long toDate) 
{
         Calendar from = Calendar.getInstance();
         from.setTime(new Date(fromDate));
@@ -587,6 +621,24 @@ public class SystemFunctionUtils {
         return bigDecimal;
     }
 
+    public static TimestampData castToTimestamp(Object object, String 
timezone) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof LocalZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((LocalZonedTimestampData) object).toInstant(), 
ZoneId.of(timezone)));
+        } else if (object instanceof ZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((ZonedTimestampData) object).toInstant(), 
ZoneId.of(timezone)));
+        } else {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.parse(castObjectIntoString(object)));
+        }
+    }
+
     private static String castObjectIntoString(Object object) {
         if (object instanceof Boolean) {
             return Boolean.valueOf(castToString(object)) ? "1" : "0";
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 2dd1b8402..4877d0175 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -49,13 +49,18 @@ import java.util.List;
 public class JaninoCompiler {
 
     private static final List<SqlTypeName> SQL_TYPE_NAME_IGNORE = 
Arrays.asList(SqlTypeName.SYMBOL);
-    private static final List<String> NO_OPERAND_TIMESTAMP_FUNCTIONS =
-            Arrays.asList(
-                    "LOCALTIME",
-                    "LOCALTIMESTAMP",
-                    "CURRENT_TIME",
-                    "CURRENT_DATE",
-                    "CURRENT_TIMESTAMP");
+    private static final List<String> TIMEZONE_FREE_TEMPORAL_FUNCTIONS =
+            Arrays.asList("CURRENT_TIMESTAMP", "NOW");
+
+    private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
+            Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", 
"CURRENT_DATE");
+
+    private static final List<String> 
TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
+            Arrays.asList("DATE_FORMAT");
+
+    private static final List<String> 
TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
+            Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+
     public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
     public static final String DEFAULT_TIME_ZONE = "__time_zone__";
 
@@ -107,8 +112,14 @@ public class JaninoCompiler {
 
     private static Java.Rvalue translateSqlIdentifier(SqlIdentifier 
sqlIdentifier) {
         String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() 
- 1);
-        if (NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(columnName)) {
-            return generateNoOperandTimestampFunctionOperation(columnName);
+        if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) {
+            return generateTimezoneFreeTemporalFunctionOperation(columnName);
+        } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) {
+            return 
generateTimezoneRequiredTemporalFunctionOperation(columnName);
+        } else if 
(TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
+            return 
generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
+        } else if 
(TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
+            return 
generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
         } else {
             return new Java.AmbiguousName(Location.NOWHERE, new String[] 
{columnName});
         }
@@ -135,8 +146,14 @@ public class JaninoCompiler {
         for (SqlNode sqlNode : operandList) {
             translateSqlNodeToAtoms(sqlNode, atoms);
         }
-        if 
(NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) 
{
+        if 
(TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName()))
 {
             atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_EPOCH_TIME}));
+        } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(
+                sqlBasicCall.getOperator().getName())) {
+            atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_EPOCH_TIME}));
+            atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_TIME_ZONE}));
+        } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
+                sqlBasicCall.getOperator().getName())) {
             atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_TIME_ZONE}));
         }
         return sqlBasicCallToJaninoRvalue(sqlBasicCall, atoms.toArray(new 
Java.Rvalue[0]));
@@ -289,8 +306,6 @@ public class JaninoCompiler {
             } else {
                 throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
             }
-        } else if (operationName.equals("NOW")) {
-            return generateNoOperandTimestampFunctionOperation(operationName);
         } else {
             return new Java.MethodInvocation(
                     Location.NOWHERE,
@@ -300,7 +315,18 @@ public class JaninoCompiler {
         }
     }
 
-    private static Java.Rvalue 
generateNoOperandTimestampFunctionOperation(String operationName) {
+    private static Java.Rvalue 
generateTimezoneFreeTemporalFunctionOperation(String operationName) {
+        return new Java.MethodInvocation(
+                Location.NOWHERE,
+                null,
+                StringUtils.convertToCamelCase(operationName),
+                new Java.Rvalue[] {
+                    new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_EPOCH_TIME})
+                });
+    }
+
+    private static Java.Rvalue 
generateTimezoneRequiredTemporalFunctionOperation(
+            String operationName) {
         List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
         timestampFunctionParam.add(
                 new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_EPOCH_TIME}));
@@ -313,6 +339,26 @@ public class JaninoCompiler {
                 timestampFunctionParam.toArray(new Java.Rvalue[0]));
     }
 
+    private static Java.Rvalue 
generateTimezoneFreeTemporalConversionFunctionOperation(
+            String operationName) {
+        return new Java.MethodInvocation(
+                Location.NOWHERE,
+                null,
+                StringUtils.convertToCamelCase(operationName),
+                new Java.Rvalue[0]);
+    }
+
+    private static Java.Rvalue 
generateTimezoneRequiredTemporalConversionFunctionOperation(
+            String operationName) {
+        return new Java.MethodInvocation(
+                Location.NOWHERE,
+                null,
+                StringUtils.convertToCamelCase(operationName),
+                new Java.Rvalue[] {
+                    new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_TIME_ZONE})
+                });
+    }
+
     private static Java.Rvalue generateTypeConvertMethod(
             SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
         switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
@@ -359,6 +405,15 @@ public class JaninoCompiler {
             case "VARCHAR":
             case "STRING":
                 return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToString", atoms);
+            case "TIMESTAMP":
+                List<Java.Rvalue> timestampAtoms = new 
ArrayList<>(Arrays.asList(atoms));
+                timestampAtoms.add(
+                        new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_TIME_ZONE}));
+                return new Java.MethodInvocation(
+                        Location.NOWHERE,
+                        null,
+                        "castToTimestamp",
+                        timestampAtoms.toArray(new Java.Rvalue[0]));
             default:
                 throw new ParseException(
                         "Unsupported data type cast: " + 
sqlDataTypeSpec.toString());
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 8ecefa8f4..35253f279 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
@@ -86,11 +87,12 @@ public class TransformSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     .build();
     public static final SqlFunction LOCALTIMESTAMP =
             new BuiltInTimestampFunction("LOCALTIMESTAMP", 
SqlTypeName.TIMESTAMP, 3);
+    public static final SqlFunction CURRENT_TIME =
+            new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
     public static final SqlFunction CURRENT_TIMESTAMP =
             new BuiltInTimestampFunction(
                     "CURRENT_TIMESTAMP", 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
-    public static final SqlFunction CURRENT_DATE =
-            new BuiltInTimestampFunction("CURRENT_DATE", SqlTypeName.DATE, 0);
+    public static final SqlFunction CURRENT_DATE = new 
SqlCurrentDateFunction();
     public static final SqlFunction NOW =
             new BuiltInTimestampFunction("NOW", 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
                 @Override
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
index 2213127a4..a19722ef1 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
@@ -110,8 +110,6 @@ public class TransformDataOperatorTest {
                     .physicalColumn("minute_diff", DataTypes.INT())
                     .physicalColumn("hour_diff", DataTypes.INT())
                     .physicalColumn("day_diff", DataTypes.INT())
-                    .physicalColumn("month_diff", DataTypes.INT())
-                    .physicalColumn("year_diff", DataTypes.INT())
                     .primaryKey("col1")
                     .build();
 
@@ -131,6 +129,7 @@ public class TransformDataOperatorTest {
                     .physicalColumn("nullChar", DataTypes.CHAR(1))
                     .physicalColumn("nullVarchar", DataTypes.VARCHAR(1))
                     .physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2))
+                    .physicalColumn("nullTimestamp", DataTypes.TIMESTAMP(3))
                     .primaryKey("col1")
                     .build();
 
@@ -149,6 +148,16 @@ public class TransformDataOperatorTest {
                     .physicalColumn("castChar", DataTypes.CHAR(1))
                     .physicalColumn("castVarchar", DataTypes.VARCHAR(1))
                     .physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2))
+                    .physicalColumn("castTimestamp", DataTypes.TIMESTAMP(3))
+                    .primaryKey("col1")
+                    .build();
+
+    private static final TableId TIMEZONE_TABLEID =
+            TableId.tableId("my_company", "my_branch", "timezone_table");
+    private static final Schema TIMEZONE_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("datetime", DataTypes.STRING())
                     .primaryKey("col1")
                     .build();
 
@@ -502,10 +511,10 @@ public class TransformDataOperatorTest {
                         .addTransform(
                                 TIMESTAMP_TABLEID.identifier(),
                                 "col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as 
time_equal,"
-                                        + " IF(LOCALTIMESTAMP = 
CURRENT_TIMESTAMP, 1, 0) as timestamp_equal,"
+                                        + " 
IF(DATE_FORMAT(CAST(CURRENT_TIMESTAMP AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss') = 
DATE_FORMAT(CAST(NOW() AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss'), 1, 0) as 
timestamp_equal,"
                                         + " 
IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as 
date_equal",
-                                "LOCALTIMESTAMP = CURRENT_TIMESTAMP")
-                        .addTimezone("GMT")
+                                "LOCALTIMESTAMP = CAST(CURRENT_TIMESTAMP AS 
TIMESTAMP)")
+                        .addTimezone("UTC")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
@@ -546,14 +555,19 @@ public class TransformDataOperatorTest {
                 TransformDataOperator.newBuilder()
                         .addTransform(
                                 TIMESTAMPDIFF_TABLEID.identifier(),
-                                "col1, TIMESTAMP_DIFF('SECOND', 
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as second_diff,"
-                                        + " TIMESTAMP_DIFF('MINUTE', 
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as minute_diff,"
-                                        + " TIMESTAMP_DIFF('HOUR', 
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as hour_diff,"
-                                        + " TIMESTAMP_DIFF('DAY', 
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as day_diff,"
-                                        + " TIMESTAMP_DIFF('MONTH', 
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as month_diff,"
-                                        + " TIMESTAMP_DIFF('YEAR', 
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as year_diff",
-                                null)
-                        .addTimezone("GMT-8:00")
+                                "col1, TIMESTAMP_DIFF('SECOND', 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
+                                        + " TIMESTAMP_DIFF('MINUTE', 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff,"
+                                        + " TIMESTAMP_DIFF('HOUR', 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff,"
+                                        + " TIMESTAMP_DIFF('DAY', 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff",
+                                "col1='1'")
+                        .addTransform(
+                                TIMESTAMPDIFF_TABLEID.identifier(),
+                                "col1, TIMESTAMP_DIFF('SECOND', 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff,"
+                                        + " TIMESTAMP_DIFF('MINUTE', 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff,"
+                                        + " TIMESTAMP_DIFF('HOUR', 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff,"
+                                        + " TIMESTAMP_DIFF('DAY', 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff",
+                                "col1='2'")
+                        .addTimezone("Asia/Shanghai")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
@@ -570,22 +584,79 @@ public class TransformDataOperatorTest {
                 DataChangeEvent.insertEvent(
                         TIMESTAMPDIFF_TABLEID,
                         recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"), null, null, 
null, null, null, null
-                                }));
+                                new Object[] {new BinaryStringData("1"), null, 
null, null, null}));
         DataChangeEvent insertEventExpect =
                 DataChangeEvent.insertEvent(
                         TIMESTAMPDIFF_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("1"), 0, 0, 
0, 0}));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(TIMESTAMPDIFF_TABLEID, 
TIMESTAMPDIFF_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TIMESTAMPDIFF_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("2"), null, 
null, null, null}));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        TIMESTAMPDIFF_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("2"), 0, 0, 
0, 0}));
+
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+    }
+
+    @Test
+    void testTimezoneTransform() throws Exception {
+        TransformDataOperator transform =
+                TransformDataOperator.newBuilder()
+                        .addTransform(
+                                TIMEZONE_TABLEID.identifier(),
+                                "col1, DATE_FORMAT(TO_TIMESTAMP('2024-08-01 
00:00:00'), 'yyyy-MM-dd HH:mm:ss') as datetime",
+                                null)
+                        .addTimezone("UTC")
+                        .build();
+        EventOperatorTestHarness<TransformDataOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new 
CreateTableEvent(TIMEZONE_TABLEID, TIMEZONE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
TIMEZONE_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TIMEZONE_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("1"), 
null}));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        TIMEZONE_TABLEID,
                         recordDataGenerator.generate(
                                 new Object[] {
-                                    new BinaryStringData("1"), -28800, -480, 
-8, 0, 0, 0
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("2024-08-01 00:00:00")
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
                         
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(
                         new StreamRecord<>(
-                                new CreateTableEvent(TIMESTAMPDIFF_TABLEID, 
TIMESTAMPDIFF_SCHEMA)));
+                                new CreateTableEvent(TIMEZONE_TABLEID, 
TIMEZONE_SCHEMA)));
         transform.processElement(new StreamRecord<>(insertEvent));
         Assertions.assertThat(
                         
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
@@ -609,7 +680,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(colString as double) as 
nullDouble"
                                         + ",cast(colString as char) as 
nullChar"
                                         + ",cast(colString as varchar) as 
nullVarchar"
-                                        + ",cast(colString as DECIMAL(4,2)) as 
nullDecimal",
+                                        + ",cast(colString as DECIMAL(4,2)) as 
nullDecimal"
+                                        + ",cast(colString as TIMESTAMP(3)) as 
nullTimestamp",
                                 null)
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
@@ -639,6 +711,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
@@ -666,7 +739,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(col1 as double) as castDouble"
                                         + ",cast(col1 as char) as castChar"
                                         + ",cast(col1 as varchar) as 
castVarchar"
-                                        + ",cast(col1 as DECIMAL(4,2)) as 
castDecimal",
+                                        + ",cast(col1 as DECIMAL(4,2)) as 
castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '1'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -680,7 +754,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castInt as double) as 
castDouble"
                                         + ",cast(castInt as char) as castChar"
                                         + ",cast(castInt as varchar) as 
castVarchar"
-                                        + ",cast(castInt as DECIMAL(4,2)) as 
castDecimal",
+                                        + ",cast(castInt as DECIMAL(4,2)) as 
castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '2'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -694,7 +769,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castBoolean as double) as 
castDouble"
                                         + ",cast(castBoolean as char) as 
castChar"
                                         + ",cast(castBoolean as varchar) as 
castVarchar"
-                                        + ",cast(castBoolean as DECIMAL(4,2)) 
as castDecimal",
+                                        + ",cast(castBoolean as DECIMAL(4,2)) 
as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '3'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -708,7 +784,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castTinyint as double) as 
castDouble"
                                         + ",cast(castTinyint as char) as 
castChar"
                                         + ",cast(castTinyint as varchar) as 
castVarchar"
-                                        + ",cast(castTinyint as DECIMAL(4,2)) 
as castDecimal",
+                                        + ",cast(castTinyint as DECIMAL(4,2)) 
as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '4'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -722,7 +799,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castSmallint as double) as 
castDouble"
                                         + ",cast(castSmallint as char) as 
castChar"
                                         + ",cast(castSmallint as varchar) as 
castVarchar"
-                                        + ",cast(castSmallint as DECIMAL(4,2)) 
as castDecimal",
+                                        + ",cast(castSmallint as DECIMAL(4,2)) 
as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '5'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -736,7 +814,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castBigint as double) as 
castDouble"
                                         + ",cast(castBigint as char) as 
castChar"
                                         + ",cast(castBigint as varchar) as 
castVarchar"
-                                        + ",cast(castBigint as DECIMAL(4,2)) 
as castDecimal",
+                                        + ",cast(castBigint as DECIMAL(4,2)) 
as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '6'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -750,7 +829,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castFloat as double) as 
castDouble"
                                         + ",cast(castFloat as char) as 
castChar"
                                         + ",cast(castFloat as varchar) as 
castVarchar"
-                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal",
+                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '7'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -764,7 +844,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castDouble as double) as 
castDouble"
                                         + ",cast(castDouble as char) as 
castChar"
                                         + ",cast(castDouble as varchar) as 
castVarchar"
-                                        + ",cast(castDouble as DECIMAL(4,2)) 
as castDecimal",
+                                        + ",cast(castDouble as DECIMAL(4,2)) 
as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '8'")
                         .addTransform(
                                 CAST_TABLEID.identifier(),
@@ -778,8 +859,24 @@ public class TransformDataOperatorTest {
                                         + ",cast(castDecimal as double) as 
castDouble"
                                         + ",cast(castDecimal as char) as 
castChar"
                                         + ",cast(castDecimal as varchar) as 
castVarchar"
-                                        + ",cast(castDecimal as DECIMAL(4,2)) 
as castDecimal",
+                                        + ",cast(castDecimal as DECIMAL(4,2)) 
as castDecimal"
+                                        + ", castTimestamp",
                                 "col1 = '9'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",castInt"
+                                        + ",castBoolean"
+                                        + ",castTinyint"
+                                        + ",castSmallint"
+                                        + ",castBigint"
+                                        + ",castFloat"
+                                        + ",castDouble"
+                                        + ",castChar"
+                                        + ",cast(castTimestamp as varchar) as 
castVarchar"
+                                        + ",castDecimal"
+                                        + ",cast('1970-01-01T00:00:01.234' as 
TIMESTAMP(3)) as castTimestamp",
+                                "col1 = '10'")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
@@ -807,6 +904,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect1 =
                 DataChangeEvent.insertEvent(
@@ -824,6 +922,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
@@ -849,6 +948,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect2 =
                 DataChangeEvent.insertEvent(
@@ -866,6 +966,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent2));
         Assertions.assertThat(
@@ -887,6 +988,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect3 =
                 DataChangeEvent.insertEvent(
@@ -904,6 +1006,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("true"),
                                     new BinaryStringData("true"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent3));
         Assertions.assertThat(
@@ -925,6 +1028,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect4 =
                 DataChangeEvent.insertEvent(
@@ -942,6 +1046,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent4));
         Assertions.assertThat(
@@ -963,6 +1068,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect5 =
                 DataChangeEvent.insertEvent(
@@ -980,6 +1086,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent5));
         Assertions.assertThat(
@@ -1001,6 +1108,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect6 =
                 DataChangeEvent.insertEvent(
@@ -1018,6 +1126,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("1"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent6));
         Assertions.assertThat(
@@ -1039,6 +1148,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect7 =
                 DataChangeEvent.insertEvent(
@@ -1056,6 +1166,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1.0"),
                                     new BinaryStringData("1.0"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent7));
         Assertions.assertThat(
@@ -1077,6 +1188,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         DataChangeEvent insertEventExpect8 =
                 DataChangeEvent.insertEvent(
@@ -1094,6 +1206,7 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1.0"),
                                     new BinaryStringData("1.0"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent8));
         Assertions.assertThat(
@@ -1115,6 +1228,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         DataChangeEvent insertEventExpect9 =
                 DataChangeEvent.insertEvent(
@@ -1132,11 +1246,53 @@ public class TransformDataOperatorTest {
                                     new BinaryStringData("1.00"),
                                     new BinaryStringData("1.00"),
                                     DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(insertEvent9));
         Assertions.assertThat(
                         
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(insertEventExpect9));
+
+        DataChangeEvent insertEvent10 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("10"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    TimestampData.fromMillis(1234, 0)
+                                }));
+        DataChangeEvent insertEventExpect10 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("10"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    new 
BinaryStringData("1970-01-01T00:00:01.234"),
+                                    null,
+                                    TimestampData.fromMillis(1234, 0)
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent10));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect10));
     }
 
     @Test
@@ -1155,7 +1311,8 @@ public class TransformDataOperatorTest {
                                         + ",cast(castFloat as double) as 
castDouble"
                                         + ",cast(castFloat as char) as 
castChar"
                                         + ",cast(castFloat as varchar) as 
castVarchar"
-                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal",
+                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal"
+                                        + ",cast(castFloat as TIMESTAMP) as 
castTimestamp",
                                 "col1 = '1'")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
@@ -1184,6 +1341,7 @@ public class TransformDataOperatorTest {
                                     null,
                                     null,
                                     null,
+                                    null
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(
@@ -1255,6 +1413,7 @@ public class TransformDataOperatorTest {
         testExpressionConditionTransform("cast(null as char) is null");
         testExpressionConditionTransform("cast(null as varchar) is null");
         testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null");
+        testExpressionConditionTransform("cast(null as TIMESTAMP(3)) is null");
     }
 
     private void testExpressionConditionTransform(String expression) throws 
Exception {
@@ -1264,7 +1423,7 @@ public class TransformDataOperatorTest {
                                 CONDITION_TABLEID.identifier(),
                                 "col1, IF(" + expression + ", true, false) as 
condition_result",
                                 expression)
-                        .addTimezone("GMT")
+                        .addTimezone("UTC")
                         .build();
         EventOperatorTestHarness<TransformDataOperator, Event>
                 transformFunctionEventEventOperatorTestHarness =
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
index db86783d2..6fbaef3bf 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.cdc.runtime.parser;
 
-import org.apache.flink.cdc.common.data.TimestampData;
-
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.commons.compiler.Location;
 import org.codehaus.janino.ExpressionEvaluator;
@@ -36,7 +34,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.TimeZone;
 
 /** Unit tests for the {@link JaninoCompiler}. */
 public class JaninoCompilerTest {
@@ -108,24 +105,6 @@ public class JaninoCompilerTest {
         Assert.assertEquals(true, evaluate);
     }
 
-    @Test
-    public void testJaninoTimestampFunction() throws InvocationTargetException 
{
-        long epochTime = System.currentTimeMillis();
-        long localTime = epochTime + 
TimeZone.getTimeZone("GMT-8:00").getOffset(epochTime);
-        String expression = "currentTimestamp(epochTime, \"GMT-8:00\")";
-        List<String> columnNames = Arrays.asList("epochTime");
-        List<Class<?>> paramTypes = Arrays.asList(Long.class);
-        List<Object> params = Arrays.asList(epochTime);
-        ExpressionEvaluator expressionEvaluator =
-                JaninoCompiler.compileExpression(
-                        JaninoCompiler.loadSystemFunction(expression),
-                        columnNames,
-                        paramTypes,
-                        TimestampData.class);
-        Object evaluate = expressionEvaluator.evaluate(params.toArray());
-        Assert.assertEquals(TimestampData.fromMillis(localTime), evaluate);
-    }
-
     @Test
     public void testBuildInFunction() throws InvocationTargetException {
         String expression = "ceil(2.4)";
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index f81c4a92c..66a405a97 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -228,16 +228,16 @@ public class TransformParserTest {
         testFilterExpression(
                 "id = CURRENT_DATE", "valueEquals(id, 
currentDate(__epoch_time__, __time_zone__))");
         testFilterExpression(
-                "id = CURRENT_TIMESTAMP",
-                "valueEquals(id, currentTimestamp(__epoch_time__, 
__time_zone__))");
-        testFilterExpression("NOW()", "now(__epoch_time__, __time_zone__)");
+                "id = CURRENT_TIMESTAMP", "valueEquals(id, 
currentTimestamp(__epoch_time__))");
+        testFilterExpression("NOW()", "now(__epoch_time__)");
         testFilterExpression("YEAR(dt)", "year(dt)");
         testFilterExpression("QUARTER(dt)", "quarter(dt)");
         testFilterExpression("MONTH(dt)", "month(dt)");
         testFilterExpression("WEEK(dt)", "week(dt)");
         testFilterExpression("DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, 
\"yyyy-MM-dd\")");
-        testFilterExpression("TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, 
\"yyyy-MM-dd\")");
-        testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt)");
+        testFilterExpression(
+                "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", 
__time_zone__)");
+        testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, 
__time_zone__)");
         testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", 
"timestampDiff(\"DAY\", dt1, dt2)");
         testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
         testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
@@ -292,6 +292,10 @@ public class TransformParserTest {
         testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 
10, 0)");
         testFilterExpression("cast(null as char)", "castToString(null)");
         testFilterExpression("cast(null as varchar)", "castToString(null)");
+        testFilterExpression(
+                "cast(CURRENT_TIMESTAMP as TIMESTAMP)",
+                "castToTimestamp(currentTimestamp(__epoch_time__), 
__time_zone__)");
+        testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, 
__time_zone__)");
     }
 
     private void testFilterExpression(String expression, String 
expressionExpect) {

Reply via email to