This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 384674c9aa [cdc] Support computed expression of now (#5138)
384674c9aa is described below

commit 384674c9aa940bd5dc886297a3a146db11c49c8d
Author: JackeyLee007 <[email protected]>
AuthorDate: Sat Mar 15 19:22:14 2025 +0800

    [cdc] Support computed expression of now (#5138)
---
 .../apache/paimon/flink/action/cdc/Expression.java | 21 ++++-
 .../flink/action/cdc/SyncDatabaseActionBase.java   |  9 ++-
 .../action/cdc/SyncDatabaseActionFactoryBase.java  |  7 ++
 .../flink/action/cdc/CdcActionITCaseBase.java      | 36 ++++++++-
 .../format/aliyun/AliyunJsonRecordParserTest.java  | 66 +++++++++++++--
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  | 94 ++++++++++++++++++++++
 .../canal/database/audit-time/canal-data-1.txt     | 19 +++++
 .../canal/database/audit-time/canal-data-2.txt     | 19 +++++
 .../canal/database/audit-time/canal-data-3.txt     | 19 +++++
 .../paimon/flink/action/ActionITCaseBase.java      | 19 +++++
 10 files changed, 300 insertions(+), 9 deletions(-)

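For quick reference, the new now() expression is enabled through the existing computed-column
support of the database sync actions. A condensed sketch, taken from the new ITCase below
(syncDatabaseActionBuilder, getBasicTableConfig and kafkaConfig are test helpers from that
ITCase, not a public API):

    KafkaSyncDatabaseAction action =
            syncDatabaseActionBuilder(kafkaConfig)
                    .withTableConfig(getBasicTableConfig())
                    .withPrimaryKeys("k")
                    .withComputedColumnArgs(
                            Arrays.asList("etl_create_time=now()", "etl_update_time=now()"))
                    .build();

The same values are passed on the command line as repeated --computed-column options (flag
spelling as used by the test builder in this commit), and each configured column is
materialized as TIMESTAMP(3).
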
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 3290ec1829..50bd57da36 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -141,7 +141,8 @@ public interface Expression extends Serializable {
                             referencedField.fieldType(),
                             referencedField.literals());
                 }),
-        CAST((typeMapping, caseSensitive, args) -> cast(args));
+        CAST((typeMapping, caseSensitive, args) -> cast(args)),
+        NOW((typeMapping, caseSensitive, args) -> new NowExpression());
 
         public final ExpressionCreator creator;
 
@@ -608,4 +609,22 @@ public interface Expression extends Serializable {
             return value;
         }
     }
+
+    /** Get current timestamp. */
+    final class NowExpression implements Expression {
+        @Override
+        public String fieldReference() {
+            return null;
+        }
+
+        @Override
+        public DataType outputType() {
+            return DataTypes.TIMESTAMP(3);
+        }
+
+        @Override
+        public String eval(String input) {
+            return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
+        }
+    }
 }
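
A minimal, hypothetical sketch of what the new expression yields (the demo class name is
invented for illustration; the exact formatted value depends on
DateTimeUtils.formatLocalDateTime with precision 3):

    import org.apache.paimon.flink.action.cdc.Expression;
    import org.apache.paimon.types.DataType;

    public class NowExpressionDemo {
        public static void main(String[] args) {
            // NowExpression ignores its input and formats the current local time with
            // millisecond precision, matching its TIMESTAMP(3) output type.
            Expression now = new Expression.NowExpression();
            DataType type = now.outputType(); // TIMESTAMP(3)
            String value = now.eval(null);    // e.g. "2025-03-15 19:22:14.123" (assumed format)
            System.out.println(type + " -> " + value);
        }
    }
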
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 267d682ca6..4ce4e7c250 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
 
 /** Base {@link Action} for synchronizing into one Paimon database. */
 public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
@@ -60,6 +61,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
     protected String includingTables = ".*";
     protected List<String> partitionKeys = new ArrayList<>();
     protected List<String> primaryKeys = new ArrayList<>();
+    protected List<ComputedColumn> computedColumns = new ArrayList<>();
     @Nullable protected String excludingTables;
     protected String includingDbs = ".*";
     @Nullable protected String excludingDbs;
@@ -172,10 +174,15 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
         return this;
     }
 
+    public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
+        this.computedColumns = buildComputedColumns(computedColumnArgs, Collections.emptyList());
+        return this;
+    }
+
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
         return syncJobHandler.provideRecordParser(
-                Collections.emptyList(), typeMapping, metadataConverters);
+                this.computedColumns, typeMapping, metadataConverters);
     }
 
     public SyncDatabaseActionBase withPartitionKeyMultiple(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 52bfb7271c..8d0b8b9cef 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -22,8 +22,10 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
 import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 
+import java.util.ArrayList;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EAGER_INIT;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
@@ -79,5 +81,10 @@ public abstract class SyncDatabaseActionFactoryBase<T extends SyncDatabaseAction
             String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
+
+        if (params.has(COMPUTED_COLUMN)) {
+            action.withComputedColumnArgs(
+                    new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 00a8b23617..855623b1af 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -130,6 +130,16 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
     protected void waitForResult(
             List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
             throws Exception {
+        waitForResult(false, expected, table, rowType, primaryKeys);
+    }
+
+    protected void waitForResult(
+            boolean withRegx,
+            List<String> expected,
+            FileStoreTable table,
+            RowType rowType,
+            List<String> primaryKeys)
+            throws Exception {
         assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
 
         // wait for table schema to become our expected schema
@@ -165,7 +175,8 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
                             rowType);
             List<String> sortedActual = new ArrayList<>(result);
             Collections.sort(sortedActual);
-            if (sortedExpected.equals(sortedActual)) {
+            if (withRegx && isRegxMatchList(sortedActual, sortedExpected)
+                    || sortedExpected.equals(sortedActual)) {
                 break;
             }
             LOG.info("actual: " + sortedActual);
@@ -174,6 +185,20 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
         }
     }
 
+    private boolean isRegxMatchList(List<String> actual, List<String> expected) {
+        if (actual.size() != expected.size()) {
+            return false;
+        }
+
+        for (int i = 0; i < actual.size(); i++) {
+            if (!actual.get(i).matches(expected.get(i))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     protected Map<String, String> getBasicTableConfig() {
         Map<String, String> config = new HashMap<>();
         ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -392,6 +417,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
         private final List<String> partitionKeys = new ArrayList<>();
         private final List<String> primaryKeys = new ArrayList<>();
         private final List<String> metadataColumn = new ArrayList<>();
+        private final List<String> computedColumnArgs = new ArrayList<>();
         protected Map<String, String> partitionKeyMultiple = new HashMap<>();
 
         public SyncDatabaseActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
@@ -464,6 +490,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
             return this;
         }
 
+        public SyncDatabaseActionBuilder<T> withComputedColumnArgs(
+                List<String> computedColumnArgs) {
+            this.computedColumnArgs.addAll(computedColumnArgs);
+            return this;
+        }
+
         public SyncDatabaseActionBuilder<T> withPartitionKeyMultiple(
                 Map<String, String> partitionKeyMultiple) {
             if (partitionKeyMultiple != null) {
@@ -500,6 +532,8 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
             args.addAll(listToArgs("--primary-keys", primaryKeys));
             args.addAll(listToArgs("--metadata-column", metadataColumn));
 
+            args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
+
             return createAction(clazz, args);
         }
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
index f06268d700..6dea3ee547 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.paimon.flink.action.cdc.format.aliyun;
 
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
 import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
@@ -26,6 +29,7 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.BinaryStringUtils;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /** Test for AliyunJsonRecordParser. */
 public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
@@ -51,14 +56,20 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
     private static List<String> insertList = new ArrayList<>();
     private static List<String> updateList = new ArrayList<>();
     private static List<String> deleteList = new ArrayList<>();
+    private static List<ComputedColumn> computedColumns = new ArrayList<>();
 
     private static ObjectMapper objMapper = new ObjectMapper();
 
+    String dateTimeRegex = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
+
     @Before
     public void setup() {
         String insertRes = "kafka/aliyun/table/event/event-insert.txt";
         String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
         String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
+
+        String[] computedColumnArgs = {"etl_create_time=now()", "etl_update_time=now()"};
+
         URL url;
         try {
             url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
@@ -76,6 +87,10 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
                     .filter(this::isRecordLine)
                     .forEach(e -> deleteList.add(e));
 
+            computedColumns =
+                    ComputedColumnUtils.buildComputedColumns(
+                            Arrays.asList(computedColumnArgs), Collections.emptyList());
+
         } catch (Exception e) {
             log.error("Fail to init aliyun-json cases", e);
         }
@@ -83,10 +98,11 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
 
     @Test
     public void extractInsertRecord() throws Exception {
+
         AliyunRecordParser parser =
-                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+                new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
         for (String json : insertList) {
-            // Parse the json into a JsonNode object
+
             JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
             CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
             Schema schema = parser.buildSchema(cdcRecord);
@@ -106,15 +122,31 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
 
             MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
             Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+
+            Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
+            String createTime = data.get("etl_create_time");
+            String updateTime = data.get("etl_update_time");
+
+            // Mock the real timestamp string, which is retrieved from the store and converted
+            // through Paimon Timestamp
+            createTime =
+                    BinaryStringUtils.toTimestamp(BinaryString.fromString(createTime), 6)
+                            .toString();
+            updateTime =
+                    BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
+                            .toString();
+
+            Assert.assertTrue(createTime.matches(dateTimeRegex));
+            Assert.assertTrue(updateTime.matches(dateTimeRegex));
         }
     }
 
     @Test
     public void extractUpdateRecord() throws Exception {
         AliyunRecordParser parser =
-                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+                new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
         for (String json : updateList) {
-            // Parse the json into a JsonNode object
+
             JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
             CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
             Schema schema = parser.buildSchema(cdcRecord);
@@ -134,15 +166,26 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
 
             MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
             Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+
+            Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
+            String createTime = data.get("etl_create_time");
+            String updateTime = data.get("etl_update_time");
+            Assert.assertNotNull(createTime);
+
+            updateTime =
+                    BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
+                            .toString();
+
+            Assert.assertTrue(updateTime.matches(dateTimeRegex));
         }
     }
 
     @Test
     public void extractDeleteRecord() throws Exception {
         AliyunRecordParser parser =
-                new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+                new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
         for (String json : deleteList) {
-            // Parse the json into a JsonNode object
+
             JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
             CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
             Schema schema = parser.buildSchema(cdcRecord);
@@ -162,6 +205,17 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
 
             MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
             Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+
+            Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
+            String createTime = data.get("etl_create_time");
+            String updateTime = data.get("etl_update_time");
+            Assert.assertNotNull(createTime);
+
+            updateTime =
+                    BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
+                            .toString();
+
+            Assert.assertTrue(updateTime.matches(dateTimeRegex));
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 6e37c589ac..60aa70c34b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
@@ -643,6 +645,98 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                 Collections.singletonList("k"));
     }
 
+    @Test
+    @Timeout(120)
+    public void testExpressionNow() throws Exception {
+        final String topic = "expression-now";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/canal/database/audit-time/canal-data-1.txt");
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withPrimaryKeys("k")
+                        .withComputedColumnArgs(
+                                Arrays.asList("etl_create_time=now()", "etl_update_time=now()"))
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t1");
+
+        FileStoreTable table1 = getFileStoreTable("t1");
+        assertThat(table1.primaryKeys()).containsExactly("k");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.TIMESTAMP(3),
+                            DataTypes.TIMESTAMP(3)
+                        },
+                        new String[] {"k", "v1", "etl_create_time", "etl_update_time"});
+
+        // INSERT
+        waitForResult(
+                true,
+                Collections.singletonList(
+                        "\\+I\\[1, A, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+                table1,
+                rowType1,
+                Collections.singletonList("k"));
+
+        List<InternalRow> data = getData("t1");
+        Timestamp createTime1 = data.get(0).getTimestamp(2, 3);
+        Timestamp updateTime1 = data.get(0).getTimestamp(3, 3);
+
+        assertThat(createTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+        assertThat(updateTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+
+        Thread.sleep(1000);
+
+        // UPDATE1
+        writeRecordsToKafka(topic, "kafka/canal/database/audit-time/canal-data-2.txt");
+        waitForResult(
+                true,
+                Collections.singletonList(
+                        "\\+I\\[1, B, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+                table1,
+                rowType1,
+                Collections.singletonList("k"));
+
+        data = getData("t1");
+        Timestamp createTime2 = data.get(0).getTimestamp(2, 3);
+        Timestamp updateTime2 = data.get(0).getTimestamp(3, 3);
+
+        assertThat(createTime2.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
+        assertThat(updateTime2.toLocalDateTime()).isAfter(updateTime1.toLocalDateTime());
+        assertThat(updateTime2.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+
+        Thread.sleep(1000);
+
+        // UPDATE2
+        writeRecordsToKafka(topic, "kafka/canal/database/audit-time/canal-data-3.txt");
+        waitForResult(
+                true,
+                Collections.singletonList(
+                        "\\+I\\[1, C, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+                table1,
+                rowType1,
+                Collections.singletonList("k"));
+
+        data = getData("t1");
+        Timestamp createTime3 = data.get(0).getTimestamp(2, 3);
+        Timestamp updateTime3 = data.get(0).getTimestamp(3, 3);
+
+        assertThat(createTime3.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
+        assertThat(updateTime3.toLocalDateTime()).isAfter(updateTime2.toLocalDateTime());
+        assertThat(updateTime3.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+    }
+
     @Test
     @Timeout(60)
     public void testMultipleTablePartitionKeys() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-1.txt
new file mode 100644
index 0000000000..92935ccb20
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v1":"A"}],"database":"test_audit_time","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":[],"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684770072286,"type":"INSERT"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-2.txt
new file mode 100644
index 0000000000..b7b9d9b635
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-2.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v1":"B"}],"database":"test_audit_time","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":[{"k":"1","v1":"A"}],"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684770072286,"type":"UPDATE"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-3.txt
new file mode 100644
index 0000000000..acd51960e4
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-3.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v1":"C"}],"database":"test_audit_time","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":[{"k":"1","v1":"B"}],"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684770072286,"type":"UPDATE"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index f8ea189500..6be78e041a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -32,8 +32,10 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.table.api.TableEnvironment;
@@ -45,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -142,6 +145,22 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
         incrementalIdentifier++;
     }
 
+    protected List<InternalRow> getData(String tableName) throws Exception {
+        List<InternalRow> result = new ArrayList<>();
+
+        FileStoreTable table = this.getFileStoreTable(tableName);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        List<Split> splits = plan == null ? Collections.emptyList() : plan.splits();
+        TableRead read = readBuilder.newRead();
+        try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
+            recordReader.forEachRemaining(result::add);
+        }
+
+        return result;
+    }
+
     protected List<String> getResult(TableRead read, List<Split> splits, RowType rowType)
             throws Exception {
         try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
