This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7ca11359d [FLINK-35743][cdc-runtime] Correct the temporal function
semantics
7ca11359d is described below
commit 7ca11359d7969cd7488fc546dd04f1bbfb8d6674
Author: Wink <[email protected]>
AuthorDate: Wed Aug 7 00:49:22 2024 +0800
[FLINK-35743][cdc-runtime] Correct the temporal function semantics
This closes #3449.
Co-authored-by: wenmo <[email protected]>
Co-authored-by: yuxiqian <[email protected]>
---
.../flink/cdc/common/utils/DateTimeUtils.java | 19 ++
.../flink/FlinkPipelineComposerITCase.java | 1 +
.../flink/FlinkPipelineTransformITCase.java | 281 +++++++++++++++++++++
.../cdc/connectors/values/ValuesDatabase.java | 20 +-
.../cdc/pipeline/tests/TransformE2eITCase.java | 123 +++++++++
.../cdc/runtime/functions/SystemFunctionUtils.java | 118 ++++++---
.../flink/cdc/runtime/parser/JaninoCompiler.java | 81 +++++-
.../parser/metadata/TransformSqlOperatorTable.java | 6 +-
.../transform/TransformDataOperatorTest.java | 219 +++++++++++++---
.../cdc/runtime/parser/JaninoCompilerTest.java | 21 --
.../cdc/runtime/parser/TransformParserTest.java | 14 +-
11 files changed, 793 insertions(+), 110 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index 9a009df62..b923107e2 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -92,6 +92,14 @@ public class DateTimeUtils {
return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(),
zdt.getDayOfMonth());
}
+ public static int parseDate(String dateStr, String fromFormat, String
timezone) {
+ long ts = internalParseTimestampMillis(dateStr, fromFormat,
TimeZone.getTimeZone(timezone));
+ ZoneId zoneId = ZoneId.of(timezone);
+ Instant instant = Instant.ofEpochMilli(ts);
+ ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
+ return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(),
zdt.getDayOfMonth());
+ }
+
private static long internalParseTimestampMillis(String dateStr, String
format, TimeZone tz) {
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
formatter.setTimeZone(tz);
@@ -119,4 +127,15 @@ public class DateTimeUtils {
int m = month + 12 * a - 3;
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 -
32045;
}
+
+ //
--------------------------------------------------------------------------------------------
+ // Format
+ //
--------------------------------------------------------------------------------------------
+
+ public static String formatTimestampMillis(long ts, String format,
TimeZone timeZone) {
+ SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+ formatter.setTimeZone(timeZone);
+ Date dateTime = new Date(ts);
+ return formatter.format(dateTime);
+ }
}
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 26c9c9187..a8acf7989 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -68,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Integration test for {@link FlinkPipelineComposer}. */
class FlinkPipelineComposerITCase {
+
private static final int MAX_PARALLELISM = 4;
// Always use parent-first classloader for CDC classes.
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
new file mode 100644
index 000000000..77f89a37a
--- /dev/null
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.definition.TransformDef;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for transform temporal functions in {@link FlinkPipelineComposer}. */
+class FlinkPipelineTransformITCase {
+
+ private static final int MAX_PARALLELISM = 4;
+
+ // Always use parent-first classloader for CDC classes.
+ // The reason is that ValuesDatabase uses a static field for holding data,
so we need to make sure
+ // the class is loaded by AppClassloader so that we can verify data in the
test case.
+ private static final org.apache.flink.configuration.Configuration
MINI_CLUSTER_CONFIG =
+ new org.apache.flink.configuration.Configuration();
+
+ static {
+ MINI_CLUSTER_CONFIG.set(
+ ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+ Collections.singletonList("org.apache.flink.cdc"));
+ }
+
+ /**
+ * Use {@link MiniClusterExtension} to reduce the overhead of restarting
the MiniCluster for
+ * every test case.
+ */
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(MAX_PARALLELISM)
+ .setConfiguration(MINI_CLUSTER_CONFIG)
+ .build());
+
+ private final PrintStream standardOut = System.out;
+ private final ByteArrayOutputStream outCaptor = new
ByteArrayOutputStream();
+
+ @BeforeEach
+ void init() {
+ // Take over STDOUT as we need to check the output of values sink
+ System.setOut(new PrintStream(outCaptor));
+ // Initialize in-memory database
+ ValuesDatabase.clear();
+ }
+
+ @AfterEach
+ void cleanup() {
+ System.setOut(standardOut);
+ }
+
+ @Test
+ void testTransformWithTemporalFunction() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId myTable1 = TableId.tableId("default_namespace",
"default_schema", "mytable1");
+ TableId myTable2 = TableId.tableId("default_namespace",
"default_schema", "mytable2");
+ Schema table1Schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build();
+ Schema table2Schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT())
+ .physicalColumn("name", DataTypes.VARCHAR(255))
+ .physicalColumn("age", DataTypes.TINYINT())
+ .physicalColumn("description", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ List<Event> events = new ArrayList<>();
+ BinaryRecordDataGenerator table1dataGenerator =
+ new BinaryRecordDataGenerator(
+ table1Schema.getColumnDataTypes().toArray(new
DataType[0]));
+ BinaryRecordDataGenerator table2dataGenerator =
+ new BinaryRecordDataGenerator(
+ table2Schema.getColumnDataTypes().toArray(new
DataType[0]));
+ events.add(new CreateTableEvent(myTable1, table1Schema));
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ table1dataGenerator.generate(
+ new Object[] {1,
BinaryStringData.fromString("Alice"), 18})));
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ table1dataGenerator.generate(
+ new Object[] {2,
BinaryStringData.fromString("Bob"), 20})));
+ events.add(
+ DataChangeEvent.updateEvent(
+ myTable1,
+ table1dataGenerator.generate(
+ new Object[] {2,
BinaryStringData.fromString("Bob"), 20}),
+ table1dataGenerator.generate(
+ new Object[] {2,
BinaryStringData.fromString("Bob"), 30})));
+ events.add(new CreateTableEvent(myTable2, table2Schema));
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable2,
+ table2dataGenerator.generate(
+ new Object[] {
+ 3L,
+ BinaryStringData.fromString("Carol"),
+ (byte) 15,
+ BinaryStringData.fromString("student")
+ })));
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable2,
+ table2dataGenerator.generate(
+ new Object[] {
+ 4L,
+ BinaryStringData.fromString("Derrida"),
+ (byte) 25,
+ BinaryStringData.fromString("student")
+ })));
+ events.add(
+ DataChangeEvent.deleteEvent(
+ myTable2,
+ table2dataGenerator.generate(
+ new Object[] {
+ 4L,
+ BinaryStringData.fromString("Derrida"),
+ (byte) 25,
+ BinaryStringData.fromString("student")
+ })));
+
+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE,
"America/Los_Angeles");
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+
"default_namespace.default_schema.\\.*",
+ "*, LOCALTIME as lcl_t, CURRENT_TIME
as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS
TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt",
+ null,
+ null,
+ null,
+ null,
+ null)),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Verify the temporal columns of every received data change event
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ Arrays.stream(outputEvents).forEach(this::extractDataLines);
+ }
+
+ void extractDataLines(String line) {
+ if (!line.startsWith("DataChangeEvent{")) {
+ return;
+ }
+ Stream.of("before", "after")
+ .forEach(
+ tag -> {
+ String[] arr = line.split(tag + "=\\[", 2);
+ String dataRecord = arr[arr.length - 1].split("]",
2)[0];
+ if (!dataRecord.isEmpty()) {
+ verifyDataRecord(dataRecord);
+ }
+ });
+ }
+
+ void verifyDataRecord(String recordLine) {
+ List<String> tokens = Arrays.asList(recordLine.split(", "));
+ assertThat(tokens).hasSizeGreaterThanOrEqualTo(6);
+
+ tokens = tokens.subList(tokens.size() - 6, tokens.size());
+
+ String localTime = tokens.get(0);
+ String currentTime = tokens.get(1);
+ assertThat(localTime).isEqualTo(currentTime);
+
+ String currentTimestamp = tokens.get(2);
+ String nowTimestamp = tokens.get(3);
+ String localTimestamp = tokens.get(4);
+
assertThat(currentTimestamp).isEqualTo(nowTimestamp).isEqualTo(localTimestamp);
+
+ Instant instant =
+ LocalDateTime.parse(
+ currentTimestamp,
+
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+ .toInstant(ZoneOffset.UTC);
+
+ long milliSecondsInOneDay = 24 * 60 * 60 * 1000;
+
+ assertThat(instant.toEpochMilli() % milliSecondsInOneDay)
+ .isEqualTo(Long.parseLong(localTime));
+
+ String currentDate = tokens.get(5);
+
+ assertThat(instant.toEpochMilli() / milliSecondsInOneDay)
+ .isEqualTo(Long.parseLong(currentDate));
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index 36edab6d8..4d80deece 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -32,6 +32,7 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.source.MetadataAccessor;
import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSource;
@@ -48,6 +49,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -185,7 +187,7 @@ public class ValuesDatabase {
private final TableId tableId;
// [primaryKeys, [column_name, column_value]]
- private final Map<String, Map<String, String>> records;
+ private final Map<String, Map<String, RecordData>> records;
private final LinkedList<Column> columns;
@@ -210,15 +212,21 @@ public class ValuesDatabase {
public List<String> getResult() {
List<String> results = new ArrayList<>();
synchronized (lock) {
+ List<RecordData.FieldGetter> fieldGetters =
SchemaUtils.createFieldGetters(columns);
records.forEach(
(key, record) -> {
StringBuilder stringBuilder = new
StringBuilder(tableId.toString());
stringBuilder.append(":");
- for (Column column : columns) {
+ for (int i = 0; i < columns.size(); i++) {
+ Column column = columns.get(i);
+ RecordData.FieldGetter fieldGetter =
fieldGetters.get(i);
stringBuilder
.append(column.getName())
.append("=")
-
.append(record.getOrDefault(column.getName(), ""))
+ .append(
+
Optional.ofNullable(record.get(column.getName()))
+
.map(fieldGetter::getFieldOrNull)
+ .orElse(""))
.append(";");
}
stringBuilder.deleteCharAt(stringBuilder.length()
- 1);
@@ -257,9 +265,9 @@ public class ValuesDatabase {
private void insert(RecordData recordData) {
String primaryKey = buildPrimaryKeyStr(recordData);
- Map<String, String> record = new HashMap<>();
+ Map<String, RecordData> record = new HashMap<>();
for (int i = 0; i < recordData.getArity(); i++) {
- record.put(columns.get(i).getName(),
recordData.getString(i).toString());
+ record.put(columns.get(i).getName(), recordData);
}
records.put(primaryKey, record);
}
@@ -393,7 +401,7 @@ public class ValuesDatabase {
records.forEach(
(key, record) -> {
if
(record.containsKey(beforeName)) {
- String value =
record.get(beforeName);
+ RecordData value =
record.get(beforeName);
record.remove(beforeName);
record.put(afterName, value);
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 2f896d334..3f0999ee1 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -25,6 +25,7 @@ import
org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import
org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -40,9 +41,14 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
/** E2e tests for the {@link TransformSchemaOperator}. */
@RunWith(Parameterized.class)
@@ -315,6 +321,44 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
System.out.println(stdout);
}
+ @Test
+ public void testTemporalFunctions() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "transform:\n"
+ + " - source-table: %s.\\.*\n"
+ + " projection: ID, LOCALTIME as lcl_t,
CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts,
CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as
cur_dt\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: 1\n"
+ + " local-time-zone: America/Los_Angeles",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ transformRenameDatabase.getDatabaseName(),
+ transformRenameDatabase.getDatabaseName());
+ Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar,
mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ waitForTemporaryRecords(8, 60000L);
+ }
+
private void validateResult(List<String> expectedEvents) throws Exception {
for (String event : expectedEvents) {
waitUntilSpecificEvent(event, 6000L);
@@ -340,4 +384,83 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ taskManagerConsumer.toUtf8String());
}
}
+
+ private int validateTemporaryRecords() {
+ int validRecordCount = 0;
+ for (String line : taskManagerConsumer.toUtf8String().split("\n")) {
+ if (extractDataLines(line)) {
+ validRecordCount++;
+ }
+ }
+ return validRecordCount;
+ }
+
+ private void waitForTemporaryRecords(int expectedRecords, long timeout)
throws Exception {
+ boolean result = false;
+ long endTimeout = System.currentTimeMillis() + timeout;
+ while (System.currentTimeMillis() < endTimeout) {
+ if (validateTemporaryRecords() >= expectedRecords) {
+ result = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ if (!result) {
+ throw new TimeoutException(
+ "failed to get enough temporary records: "
+ + expectedRecords
+ + " from stdout: "
+ + taskManagerConsumer.toUtf8String());
+ }
+ }
+
+ boolean extractDataLines(String line) {
+ if (!line.startsWith("DataChangeEvent{")) {
+ return false;
+ }
+ Stream.of("before", "after")
+ .forEach(
+ tag -> {
+ String[] arr = line.split(tag + "=\\[", 2);
+ String dataRecord = arr[arr.length - 1].split("]",
2)[0];
+ if (!dataRecord.isEmpty()) {
+ verifyDataRecord(dataRecord);
+ }
+ });
+ return true;
+ }
+
+ void verifyDataRecord(String recordLine) {
+ LOG.info("Verifying data line {}", recordLine);
+ List<String> tokens = Arrays.asList(recordLine.split(", "));
+ Assert.assertTrue(tokens.size() >= 6);
+
+ tokens = tokens.subList(tokens.size() - 6, tokens.size());
+
+ String localTime = tokens.get(0);
+ String currentTime = tokens.get(1);
+ Assert.assertEquals(localTime, currentTime);
+
+ String currentTimestamp = tokens.get(2);
+ String nowTimestamp = tokens.get(3);
+ String localTimestamp = tokens.get(4);
+ Assert.assertEquals(currentTimestamp, nowTimestamp);
+ Assert.assertEquals(currentTimestamp, localTimestamp);
+
+ Instant instant =
+ LocalDateTime.parse(
+ currentTimestamp,
+
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+ .toInstant(ZoneOffset.UTC);
+
+ long milliSecondsInOneDay = 24 * 60 * 60 * 1000;
+
+ Assert.assertEquals(
+ instant.toEpochMilli() % milliSecondsInOneDay,
Long.parseLong(localTime));
+
+ String currentDate = tokens.get(5);
+
+ Assert.assertEquals(
+ instant.toEpochMilli() / milliSecondsInOneDay,
Long.parseLong(currentDate));
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 34c299567..ba569fc08 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -30,6 +30,9 @@ import java.math.MathContext;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
@@ -38,65 +41,62 @@ import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static
org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToDate;
+import static
org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToTime;
+
/** System function utils to support the call of flink cdc pipeline transform.
*/
public class SystemFunctionUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(SystemFunctionUtils.class);
- public static int localtime(long epochTime, String timezone) {
- return DateTimeUtils.timestampMillisToTime(epochTime);
- }
+ private static final Logger LOG =
LoggerFactory.getLogger(SystemFunctionUtils.class);
- public static TimestampData localtimestamp(long epochTime, String
timezone) {
- return TimestampData.fromMillis(epochTime);
+ public static LocalZonedTimestampData currentTimestamp(long epochTime) {
+ return LocalZonedTimestampData.fromEpochMillis(epochTime);
}
- // synonym: localtime
- public static int currentTime(long epochTime, String timezone) {
- return localtime(epochTime, timezone);
+ // synonym for currentTimestamp
+ public static LocalZonedTimestampData now(long epochTime) {
+ return LocalZonedTimestampData.fromEpochMillis(epochTime);
}
- public static int currentDate(long epochTime, String timezone) {
- return DateTimeUtils.timestampMillisToDate(epochTime);
+ public static TimestampData localtimestamp(long epochTime, String
timezone) {
+ return TimestampData.fromLocalDateTime(
+
Instant.ofEpochMilli(epochTime).atZone(ZoneId.of(timezone)).toLocalDateTime());
}
- public static TimestampData currentTimestamp(long epochTime, String
timezone) {
- return TimestampData.fromMillis(
- epochTime +
TimeZone.getTimeZone(timezone).getOffset(epochTime));
+ public static int localtime(long epochTime, String timezone) {
+ return timestampMillisToTime(localtimestamp(epochTime,
timezone).getMillisecond());
}
- public static LocalZonedTimestampData now(long epochTime, String timezone)
{
- return LocalZonedTimestampData.fromEpochMillis(epochTime);
+ public static int currentTime(long epochTime, String timezone) {
+ // the time value of currentTimestamp under the given session time zone
+ return timestampMillisToTime(localtimestamp(epochTime,
timezone).getMillisecond());
}
- public static String dateFormat(LocalZonedTimestampData timestamp, String
format) {
- SimpleDateFormat dateFormat = new SimpleDateFormat(format);
- return dateFormat.format(new Date(timestamp.getEpochMillisecond()));
+ public static int currentDate(long epochTime, String timezone) {
+ // the date value of currentTimestamp under the given session time zone
+ return timestampMillisToDate(localtimestamp(epochTime,
timezone).getMillisecond());
}
public static String dateFormat(TimestampData timestamp, String format) {
- SimpleDateFormat dateFormat = new SimpleDateFormat(format);
- return dateFormat.format(new Date(timestamp.getMillisecond()));
- }
-
- public static String dateFormat(ZonedTimestampData timestamp, String
format) {
- SimpleDateFormat dateFormat = new SimpleDateFormat(format);
- return dateFormat.format(new Date(timestamp.getMillisecond()));
+ return DateTimeUtils.formatTimestampMillis(
+ timestamp.getMillisecond(), format,
TimeZone.getTimeZone("UTC"));
}
- public static int toDate(String str) {
- return toDate(str, "yyyy-MM-dd");
+ public static int toDate(String str, String timezone) {
+ return toDate(str, "yyyy-MM-dd", timezone);
}
- public static int toDate(String str, String format) {
- return DateTimeUtils.parseDate(str, format);
+ public static int toDate(String str, String format, String timezone) {
+ return DateTimeUtils.parseDate(str, format, timezone);
}
- public static TimestampData toTimestamp(String str) {
- return toTimestamp(str, "yyyy-MM-dd HH:mm:ss");
+ public static TimestampData toTimestamp(String str, String timezone) {
+ return toTimestamp(str, "yyyy-MM-dd HH:mm:ss", timezone);
}
- public static TimestampData toTimestamp(String str, String format) {
+ public static TimestampData toTimestamp(String str, String format, String
timezone) {
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timezone));
try {
return TimestampData.fromMillis(dateFormat.parse(str).getTime());
} catch (ParseException e) {
@@ -118,11 +118,45 @@ public class SystemFunctionUtils {
return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
}
+ public static int timestampDiff(
+ String symbol, TimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(
+ symbol, fromTimestamp.getMillisecond(),
toTimestamp.getEpochMillisecond());
+ }
+
+ public static int timestampDiff(
+ String symbol, LocalZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ return timestampDiff(
+ symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getMillisecond());
+ }
+
public static int timestampDiff(
String symbol, ZonedTimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
}
+ public static int timestampDiff(
+ String symbol, LocalZonedTimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ return timestampDiff(
+ symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getMillisecond());
+ }
+
+ public static int timestampDiff(
+ String symbol, ZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(
+ symbol, fromTimestamp.getMillisecond(),
toTimestamp.getEpochMillisecond());
+ }
+
+ public static int timestampDiff(
+ String symbol, TimestampData fromTimestamp, ZonedTimestampData
toTimestamp) {
+ return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ }
+
+ public static int timestampDiff(
+ String symbol, ZonedTimestampData fromTimestamp, TimestampData
toTimestamp) {
+ return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ }
+
public static int timestampDiff(String symbol, long fromDate, long toDate)
{
Calendar from = Calendar.getInstance();
from.setTime(new Date(fromDate));
@@ -587,6 +621,24 @@ public class SystemFunctionUtils {
return bigDecimal;
}
+ public static TimestampData castToTimestamp(Object object, String
timezone) {
+ if (object == null) {
+ return null;
+ }
+ if (object instanceof LocalZonedTimestampData) {
+ return TimestampData.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+ ((LocalZonedTimestampData) object).toInstant(),
ZoneId.of(timezone)));
+ } else if (object instanceof ZonedTimestampData) {
+ return TimestampData.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+ ((ZonedTimestampData) object).toInstant(),
ZoneId.of(timezone)));
+ } else {
+ return TimestampData.fromLocalDateTime(
+ LocalDateTime.parse(castObjectIntoString(object)));
+ }
+ }
+
private static String castObjectIntoString(Object object) {
if (object instanceof Boolean) {
return Boolean.valueOf(castToString(object)) ? "1" : "0";
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 2dd1b8402..4877d0175 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -49,13 +49,18 @@ import java.util.List;
public class JaninoCompiler {
private static final List<SqlTypeName> SQL_TYPE_NAME_IGNORE =
Arrays.asList(SqlTypeName.SYMBOL);
- private static final List<String> NO_OPERAND_TIMESTAMP_FUNCTIONS =
- Arrays.asList(
- "LOCALTIME",
- "LOCALTIMESTAMP",
- "CURRENT_TIME",
- "CURRENT_DATE",
- "CURRENT_TIMESTAMP");
+ private static final List<String> TIMEZONE_FREE_TEMPORAL_FUNCTIONS =
+ Arrays.asList("CURRENT_TIMESTAMP", "NOW");
+
+ private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
+ Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME",
"CURRENT_DATE");
+
+ private static final List<String>
TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
+ Arrays.asList("DATE_FORMAT");
+
+ private static final List<String>
TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
+ Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
@@ -107,8 +112,14 @@ public class JaninoCompiler {
private static Java.Rvalue translateSqlIdentifier(SqlIdentifier
sqlIdentifier) {
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size()
- 1);
- if (NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(columnName)) {
- return generateNoOperandTimestampFunctionOperation(columnName);
+ if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) {
+ return generateTimezoneFreeTemporalFunctionOperation(columnName);
+ } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) {
+ return
generateTimezoneRequiredTemporalFunctionOperation(columnName);
+ } else if
(TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
+ return
generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
+ } else if
(TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) {
+ return
generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
} else {
return new Java.AmbiguousName(Location.NOWHERE, new String[]
{columnName});
}
@@ -135,8 +146,14 @@ public class JaninoCompiler {
for (SqlNode sqlNode : operandList) {
translateSqlNodeToAtoms(sqlNode, atoms);
}
- if
(NO_OPERAND_TIMESTAMP_FUNCTIONS.contains(sqlBasicCall.getOperator().getName()))
{
+ if
(TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName()))
{
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_EPOCH_TIME}));
+ } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(
+ sqlBasicCall.getOperator().getName())) {
+ atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_EPOCH_TIME}));
+ atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_TIME_ZONE}));
+ } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
+ sqlBasicCall.getOperator().getName())) {
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_TIME_ZONE}));
}
return sqlBasicCallToJaninoRvalue(sqlBasicCall, atoms.toArray(new
Java.Rvalue[0]));
@@ -289,8 +306,6 @@ public class JaninoCompiler {
} else {
throw new ParseException("Unrecognized expression: " +
sqlBasicCall.toString());
}
- } else if (operationName.equals("NOW")) {
- return generateNoOperandTimestampFunctionOperation(operationName);
} else {
return new Java.MethodInvocation(
Location.NOWHERE,
@@ -300,7 +315,18 @@ public class JaninoCompiler {
}
}
- private static Java.Rvalue
generateNoOperandTimestampFunctionOperation(String operationName) {
+ private static Java.Rvalue
generateTimezoneFreeTemporalFunctionOperation(String operationName) {
+ return new Java.MethodInvocation(
+ Location.NOWHERE,
+ null,
+ StringUtils.convertToCamelCase(operationName),
+ new Java.Rvalue[] {
+ new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_EPOCH_TIME})
+ });
+ }
+
+ private static Java.Rvalue
generateTimezoneRequiredTemporalFunctionOperation(
+ String operationName) {
List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_EPOCH_TIME}));
@@ -313,6 +339,26 @@ public class JaninoCompiler {
timestampFunctionParam.toArray(new Java.Rvalue[0]));
}
+ private static Java.Rvalue
generateTimezoneFreeTemporalConversionFunctionOperation(
+ String operationName) {
+ return new Java.MethodInvocation(
+ Location.NOWHERE,
+ null,
+ StringUtils.convertToCamelCase(operationName),
+ new Java.Rvalue[0]);
+ }
+
+ private static Java.Rvalue
generateTimezoneRequiredTemporalConversionFunctionOperation(
+ String operationName) {
+ return new Java.MethodInvocation(
+ Location.NOWHERE,
+ null,
+ StringUtils.convertToCamelCase(operationName),
+ new Java.Rvalue[] {
+ new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_TIME_ZONE})
+ });
+ }
+
private static Java.Rvalue generateTypeConvertMethod(
SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
@@ -359,6 +405,15 @@ public class JaninoCompiler {
case "VARCHAR":
case "STRING":
return new Java.MethodInvocation(Location.NOWHERE, null,
"castToString", atoms);
+ case "TIMESTAMP":
+ List<Java.Rvalue> timestampAtoms = new
ArrayList<>(Arrays.asList(atoms));
+ timestampAtoms.add(
+ new Java.AmbiguousName(Location.NOWHERE, new String[]
{DEFAULT_TIME_ZONE}));
+ return new Java.MethodInvocation(
+ Location.NOWHERE,
+ null,
+ "castToTimestamp",
+ timestampAtoms.toArray(new Java.Rvalue[0]));
default:
throw new ParseException(
"Unsupported data type cast: " +
sqlDataTypeSpec.toString());
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 8ecefa8f4..35253f279 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
@@ -86,11 +87,12 @@ public class TransformSqlOperatorTable extends
ReflectiveSqlOperatorTable {
.build();
public static final SqlFunction LOCALTIMESTAMP =
new BuiltInTimestampFunction("LOCALTIMESTAMP",
SqlTypeName.TIMESTAMP, 3);
+ public static final SqlFunction CURRENT_TIME =
+ new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
public static final SqlFunction CURRENT_TIMESTAMP =
new BuiltInTimestampFunction(
"CURRENT_TIMESTAMP",
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
- public static final SqlFunction CURRENT_DATE =
- new BuiltInTimestampFunction("CURRENT_DATE", SqlTypeName.DATE, 0);
+ public static final SqlFunction CURRENT_DATE = new
SqlCurrentDateFunction();
public static final SqlFunction NOW =
new BuiltInTimestampFunction("NOW",
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
@Override
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
index 2213127a4..a19722ef1 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
@@ -110,8 +110,6 @@ public class TransformDataOperatorTest {
.physicalColumn("minute_diff", DataTypes.INT())
.physicalColumn("hour_diff", DataTypes.INT())
.physicalColumn("day_diff", DataTypes.INT())
- .physicalColumn("month_diff", DataTypes.INT())
- .physicalColumn("year_diff", DataTypes.INT())
.primaryKey("col1")
.build();
@@ -131,6 +129,7 @@ public class TransformDataOperatorTest {
.physicalColumn("nullChar", DataTypes.CHAR(1))
.physicalColumn("nullVarchar", DataTypes.VARCHAR(1))
.physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2))
+ .physicalColumn("nullTimestamp", DataTypes.TIMESTAMP(3))
.primaryKey("col1")
.build();
@@ -149,6 +148,16 @@ public class TransformDataOperatorTest {
.physicalColumn("castChar", DataTypes.CHAR(1))
.physicalColumn("castVarchar", DataTypes.VARCHAR(1))
.physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2))
+ .physicalColumn("castTimestamp", DataTypes.TIMESTAMP(3))
+ .primaryKey("col1")
+ .build();
+
+ private static final TableId TIMEZONE_TABLEID =
+ TableId.tableId("my_company", "my_branch", "timezone_table");
+ private static final Schema TIMEZONE_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("datetime", DataTypes.STRING())
.primaryKey("col1")
.build();
@@ -502,10 +511,10 @@ public class TransformDataOperatorTest {
.addTransform(
TIMESTAMP_TABLEID.identifier(),
"col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as
time_equal,"
- + " IF(LOCALTIMESTAMP =
CURRENT_TIMESTAMP, 1, 0) as timestamp_equal,"
+ + "
IF(DATE_FORMAT(CAST(CURRENT_TIMESTAMP AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss') =
DATE_FORMAT(CAST(NOW() AS TIMESTAMP), 'yyyy-MM-dd HH:mm:ss'), 1, 0) as
timestamp_equal,"
+ "
IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as
date_equal",
- "LOCALTIMESTAMP = CURRENT_TIMESTAMP")
- .addTimezone("GMT")
+ "LOCALTIMESTAMP = CAST(CURRENT_TIMESTAMP AS
TIMESTAMP)")
+ .addTimezone("UTC")
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
transformFunctionEventEventOperatorTestHarness =
@@ -546,14 +555,19 @@ public class TransformDataOperatorTest {
TransformDataOperator.newBuilder()
.addTransform(
TIMESTAMPDIFF_TABLEID.identifier(),
- "col1, TIMESTAMP_DIFF('SECOND',
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as second_diff,"
- + " TIMESTAMP_DIFF('MINUTE',
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as minute_diff,"
- + " TIMESTAMP_DIFF('HOUR',
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as hour_diff,"
- + " TIMESTAMP_DIFF('DAY',
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as day_diff,"
- + " TIMESTAMP_DIFF('MONTH',
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as month_diff,"
- + " TIMESTAMP_DIFF('YEAR',
LOCALTIMESTAMP, CURRENT_TIMESTAMP) as year_diff",
- null)
- .addTimezone("GMT-8:00")
+ "col1, TIMESTAMP_DIFF('SECOND',
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
+ + " TIMESTAMP_DIFF('MINUTE',
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff,"
+ + " TIMESTAMP_DIFF('HOUR',
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff,"
+ + " TIMESTAMP_DIFF('DAY',
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff",
+ "col1='1'")
+ .addTransform(
+ TIMESTAMPDIFF_TABLEID.identifier(),
+ "col1, TIMESTAMP_DIFF('SECOND',
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff,"
+ + " TIMESTAMP_DIFF('MINUTE',
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff,"
+ + " TIMESTAMP_DIFF('HOUR',
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff,"
+ + " TIMESTAMP_DIFF('DAY',
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff",
+ "col1='2'")
+ .addTimezone("Asia/Shanghai")
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
transformFunctionEventEventOperatorTestHarness =
@@ -570,22 +584,79 @@ public class TransformDataOperatorTest {
DataChangeEvent.insertEvent(
TIMESTAMPDIFF_TABLEID,
recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"), null, null,
null, null, null, null
- }));
+ new Object[] {new BinaryStringData("1"), null,
null, null, null}));
DataChangeEvent insertEventExpect =
DataChangeEvent.insertEvent(
TIMESTAMPDIFF_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 0, 0,
0, 0}));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(TIMESTAMPDIFF_TABLEID,
TIMESTAMPDIFF_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ TIMESTAMPDIFF_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), null,
null, null, null}));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ TIMESTAMPDIFF_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 0, 0,
0, 0}));
+
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+ }
+
+ @Test
+ void testTimezoneTransform() throws Exception {
+ TransformDataOperator transform =
+ TransformDataOperator.newBuilder()
+ .addTransform(
+ TIMEZONE_TABLEID.identifier(),
+ "col1, DATE_FORMAT(TO_TIMESTAMP('2024-08-01
00:00:00'), 'yyyy-MM-dd HH:mm:ss') as datetime",
+ null)
+ .addTimezone("UTC")
+ .build();
+ EventOperatorTestHarness<TransformDataOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new
CreateTableEvent(TIMEZONE_TABLEID, TIMEZONE_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
TIMEZONE_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ TIMEZONE_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"),
null}));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ TIMEZONE_TABLEID,
recordDataGenerator.generate(
new Object[] {
- new BinaryStringData("1"), -28800, -480,
-8, 0, 0, 0
+ new BinaryStringData("1"),
+ new BinaryStringData("2024-08-01 00:00:00")
}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
- new CreateTableEvent(TIMESTAMPDIFF_TABLEID,
TIMESTAMPDIFF_SCHEMA)));
+ new CreateTableEvent(TIMEZONE_TABLEID,
TIMEZONE_SCHEMA)));
transform.processElement(new StreamRecord<>(insertEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
@@ -609,7 +680,8 @@ public class TransformDataOperatorTest {
+ ",cast(colString as double) as
nullDouble"
+ ",cast(colString as char) as
nullChar"
+ ",cast(colString as varchar) as
nullVarchar"
- + ",cast(colString as DECIMAL(4,2)) as
nullDecimal",
+ + ",cast(colString as DECIMAL(4,2)) as
nullDecimal"
+ + ",cast(colString as TIMESTAMP(3)) as
nullTimestamp",
null)
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
@@ -639,6 +711,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
@@ -666,7 +739,8 @@ public class TransformDataOperatorTest {
+ ",cast(col1 as double) as castDouble"
+ ",cast(col1 as char) as castChar"
+ ",cast(col1 as varchar) as
castVarchar"
- + ",cast(col1 as DECIMAL(4,2)) as
castDecimal",
+ + ",cast(col1 as DECIMAL(4,2)) as
castDecimal"
+ + ", castTimestamp",
"col1 = '1'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -680,7 +754,8 @@ public class TransformDataOperatorTest {
+ ",cast(castInt as double) as
castDouble"
+ ",cast(castInt as char) as castChar"
+ ",cast(castInt as varchar) as
castVarchar"
- + ",cast(castInt as DECIMAL(4,2)) as
castDecimal",
+ + ",cast(castInt as DECIMAL(4,2)) as
castDecimal"
+ + ", castTimestamp",
"col1 = '2'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -694,7 +769,8 @@ public class TransformDataOperatorTest {
+ ",cast(castBoolean as double) as
castDouble"
+ ",cast(castBoolean as char) as
castChar"
+ ",cast(castBoolean as varchar) as
castVarchar"
- + ",cast(castBoolean as DECIMAL(4,2))
as castDecimal",
+ + ",cast(castBoolean as DECIMAL(4,2))
as castDecimal"
+ + ", castTimestamp",
"col1 = '3'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -708,7 +784,8 @@ public class TransformDataOperatorTest {
+ ",cast(castTinyint as double) as
castDouble"
+ ",cast(castTinyint as char) as
castChar"
+ ",cast(castTinyint as varchar) as
castVarchar"
- + ",cast(castTinyint as DECIMAL(4,2))
as castDecimal",
+ + ",cast(castTinyint as DECIMAL(4,2))
as castDecimal"
+ + ", castTimestamp",
"col1 = '4'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -722,7 +799,8 @@ public class TransformDataOperatorTest {
+ ",cast(castSmallint as double) as
castDouble"
+ ",cast(castSmallint as char) as
castChar"
+ ",cast(castSmallint as varchar) as
castVarchar"
- + ",cast(castSmallint as DECIMAL(4,2))
as castDecimal",
+ + ",cast(castSmallint as DECIMAL(4,2))
as castDecimal"
+ + ", castTimestamp",
"col1 = '5'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -736,7 +814,8 @@ public class TransformDataOperatorTest {
+ ",cast(castBigint as double) as
castDouble"
+ ",cast(castBigint as char) as
castChar"
+ ",cast(castBigint as varchar) as
castVarchar"
- + ",cast(castBigint as DECIMAL(4,2))
as castDecimal",
+ + ",cast(castBigint as DECIMAL(4,2))
as castDecimal"
+ + ", castTimestamp",
"col1 = '6'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -750,7 +829,8 @@ public class TransformDataOperatorTest {
+ ",cast(castFloat as double) as
castDouble"
+ ",cast(castFloat as char) as
castChar"
+ ",cast(castFloat as varchar) as
castVarchar"
- + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal",
+ + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal"
+ + ", castTimestamp",
"col1 = '7'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -764,7 +844,8 @@ public class TransformDataOperatorTest {
+ ",cast(castDouble as double) as
castDouble"
+ ",cast(castDouble as char) as
castChar"
+ ",cast(castDouble as varchar) as
castVarchar"
- + ",cast(castDouble as DECIMAL(4,2))
as castDecimal",
+ + ",cast(castDouble as DECIMAL(4,2))
as castDecimal"
+ + ", castTimestamp",
"col1 = '8'")
.addTransform(
CAST_TABLEID.identifier(),
@@ -778,8 +859,24 @@ public class TransformDataOperatorTest {
+ ",cast(castDecimal as double) as
castDouble"
+ ",cast(castDecimal as char) as
castChar"
+ ",cast(castDecimal as varchar) as
castVarchar"
- + ",cast(castDecimal as DECIMAL(4,2))
as castDecimal",
+ + ",cast(castDecimal as DECIMAL(4,2))
as castDecimal"
+ + ", castTimestamp",
"col1 = '9'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",castInt"
+ + ",castBoolean"
+ + ",castTinyint"
+ + ",castSmallint"
+ + ",castBigint"
+ + ",castFloat"
+ + ",castDouble"
+ + ",castChar"
+ + ",cast(castTimestamp as varchar) as
castVarchar"
+ + ",castDecimal"
+ + ",cast('1970-01-01T00:00:01.234' as
TIMESTAMP(3)) as castTimestamp",
+ "col1 = '10'")
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
transformFunctionEventEventOperatorTestHarness =
@@ -807,6 +904,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect1 =
DataChangeEvent.insertEvent(
@@ -824,6 +922,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1"),
new BinaryStringData("1"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
@@ -849,6 +948,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect2 =
DataChangeEvent.insertEvent(
@@ -866,6 +966,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1"),
new BinaryStringData("1"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent2));
Assertions.assertThat(
@@ -887,6 +988,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect3 =
DataChangeEvent.insertEvent(
@@ -904,6 +1006,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("true"),
new BinaryStringData("true"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent3));
Assertions.assertThat(
@@ -925,6 +1028,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect4 =
DataChangeEvent.insertEvent(
@@ -942,6 +1046,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1"),
new BinaryStringData("1"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent4));
Assertions.assertThat(
@@ -963,6 +1068,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect5 =
DataChangeEvent.insertEvent(
@@ -980,6 +1086,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1"),
new BinaryStringData("1"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent5));
Assertions.assertThat(
@@ -1001,6 +1108,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect6 =
DataChangeEvent.insertEvent(
@@ -1018,6 +1126,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1"),
new BinaryStringData("1"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent6));
Assertions.assertThat(
@@ -1039,6 +1148,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect7 =
DataChangeEvent.insertEvent(
@@ -1056,6 +1166,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1.0"),
new BinaryStringData("1.0"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent7));
Assertions.assertThat(
@@ -1077,6 +1188,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
DataChangeEvent insertEventExpect8 =
DataChangeEvent.insertEvent(
@@ -1094,6 +1206,7 @@ public class TransformDataOperatorTest {
new BinaryStringData("1.0"),
new BinaryStringData("1.0"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent8));
Assertions.assertThat(
@@ -1115,6 +1228,7 @@ public class TransformDataOperatorTest {
null,
null,
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
DataChangeEvent insertEventExpect9 =
DataChangeEvent.insertEvent(
@@ -1132,11 +1246,53 @@ public class TransformDataOperatorTest {
new BinaryStringData("1.00"),
new BinaryStringData("1.00"),
DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ null
}));
transform.processElement(new StreamRecord<>(insertEvent9));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect9));
+
+ DataChangeEvent insertEvent10 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("10"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ TimestampData.fromMillis(1234, 0)
+ }));
+ DataChangeEvent insertEventExpect10 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("10"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new
BinaryStringData("1970-01-01T00:00:01.234"),
+ null,
+ TimestampData.fromMillis(1234, 0)
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent10));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect10));
}
@Test
@@ -1155,7 +1311,8 @@ public class TransformDataOperatorTest {
+ ",cast(castFloat as double) as
castDouble"
+ ",cast(castFloat as char) as
castChar"
+ ",cast(castFloat as varchar) as
castVarchar"
- + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal",
+ + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal"
+ + ",cast(castFloat as TIMESTAMP) as
castTimestamp",
"col1 = '1'")
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
@@ -1184,6 +1341,7 @@ public class TransformDataOperatorTest {
null,
null,
null,
+ null
}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
@@ -1255,6 +1413,7 @@ public class TransformDataOperatorTest {
testExpressionConditionTransform("cast(null as char) is null");
testExpressionConditionTransform("cast(null as varchar) is null");
testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null");
+ testExpressionConditionTransform("cast(null as TIMESTAMP(3)) is null");
}
private void testExpressionConditionTransform(String expression) throws
Exception {
@@ -1264,7 +1423,7 @@ public class TransformDataOperatorTest {
CONDITION_TABLEID.identifier(),
"col1, IF(" + expression + ", true, false) as
condition_result",
expression)
- .addTimezone("GMT")
+ .addTimezone("UTC")
.build();
EventOperatorTestHarness<TransformDataOperator, Event>
transformFunctionEventEventOperatorTestHarness =
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
index db86783d2..6fbaef3bf 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
@@ -17,8 +17,6 @@
package org.apache.flink.cdc.runtime.parser;
-import org.apache.flink.cdc.common.data.TimestampData;
-
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.Location;
import org.codehaus.janino.ExpressionEvaluator;
@@ -36,7 +34,6 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.TimeZone;
/** Unit tests for the {@link JaninoCompiler}. */
public class JaninoCompilerTest {
@@ -108,24 +105,6 @@ public class JaninoCompilerTest {
Assert.assertEquals(true, evaluate);
}
- @Test
- public void testJaninoTimestampFunction() throws InvocationTargetException
{
- long epochTime = System.currentTimeMillis();
- long localTime = epochTime +
TimeZone.getTimeZone("GMT-8:00").getOffset(epochTime);
- String expression = "currentTimestamp(epochTime, \"GMT-8:00\")";
- List<String> columnNames = Arrays.asList("epochTime");
- List<Class<?>> paramTypes = Arrays.asList(Long.class);
- List<Object> params = Arrays.asList(epochTime);
- ExpressionEvaluator expressionEvaluator =
- JaninoCompiler.compileExpression(
- JaninoCompiler.loadSystemFunction(expression),
- columnNames,
- paramTypes,
- TimestampData.class);
- Object evaluate = expressionEvaluator.evaluate(params.toArray());
- Assert.assertEquals(TimestampData.fromMillis(localTime), evaluate);
- }
-
@Test
public void testBuildInFunction() throws InvocationTargetException {
String expression = "ceil(2.4)";
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index f81c4a92c..66a405a97 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -228,16 +228,16 @@ public class TransformParserTest {
testFilterExpression(
"id = CURRENT_DATE", "valueEquals(id,
currentDate(__epoch_time__, __time_zone__))");
testFilterExpression(
- "id = CURRENT_TIMESTAMP",
- "valueEquals(id, currentTimestamp(__epoch_time__,
__time_zone__))");
- testFilterExpression("NOW()", "now(__epoch_time__, __time_zone__)");
+ "id = CURRENT_TIMESTAMP", "valueEquals(id,
currentTimestamp(__epoch_time__))");
+ testFilterExpression("NOW()", "now(__epoch_time__)");
testFilterExpression("YEAR(dt)", "year(dt)");
testFilterExpression("QUARTER(dt)", "quarter(dt)");
testFilterExpression("MONTH(dt)", "month(dt)");
testFilterExpression("WEEK(dt)", "week(dt)");
testFilterExpression("DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt,
\"yyyy-MM-dd\")");
- testFilterExpression("TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt,
\"yyyy-MM-dd\")");
- testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt)");
+ testFilterExpression(
+ "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\",
__time_zone__)");
+ testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt,
__time_zone__)");
testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)",
"timestampDiff(\"DAY\", dt1, dt2)");
testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
@@ -292,6 +292,10 @@ public class TransformParserTest {
testFilterExpression("cast(null as decimal)", "castToBigDecimal(null,
10, 0)");
testFilterExpression("cast(null as char)", "castToString(null)");
testFilterExpression("cast(null as varchar)", "castToString(null)");
+ testFilterExpression(
+ "cast(CURRENT_TIMESTAMP as TIMESTAMP)",
+ "castToTimestamp(currentTimestamp(__epoch_time__),
__time_zone__)");
+ testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt,
__time_zone__)");
}
private void testFilterExpression(String expression, String
expressionExpect) {