This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 95732a18a [flink][kafka-cdc] Kafka sync table supports to start from 
an empty topic (#2117)
95732a18a is described below

commit 95732a18a23e8cfac83ffa442f47efbf92ef3a1a
Author: yuzelin <[email protected]>
AuthorDate: Wed Oct 11 21:50:52 2023 +0800

    [flink][kafka-cdc] Kafka sync table supports to start from an empty topic 
(#2117)
---
 docs/content/how-to/cdc-ingestion.md               | 36 +++++++-
 .../flink/action/cdc/CdcActionCommonUtils.java     | 15 ++++
 .../flink/action/cdc/ComputedColumnUtils.java      |  7 +-
 .../cdc/MessageQueueSyncTableActionBase.java       | 99 +++++++++++++++++-----
 .../action/cdc/kafka/KafkaSyncTableAction.java     | 16 ++--
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |  6 +-
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  2 +-
 .../action/cdc/pulsar/PulsarSyncTableAction.java   | 17 ++--
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 44 ++++++++++
 .../canal/table/initialemptytopic/canal-data-1.txt | 19 +++++
 10 files changed, 215 insertions(+), 46 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 1e9afe41a..c5dc25523 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -355,7 +355,7 @@ To use this feature through `flink run`, run the following 
shell command.
 
If the Paimon table you specify does not exist, this action will automatically 
create the table. Its schema will be derived from all specified Kafka topic's 
tables. It gets the earliest non-DDL data parsing schema from the topic. If the 
Paimon table already exists, its schema will be compared against the schema of 
all specified Kafka topic's tables.
 
-Example
+Example 1:
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -377,6 +377,40 @@ Example
     --table-conf changelog-producer=input \
     --table-conf sink.parallelism=4
 ```
+
+If the Kafka topic doesn't contain any messages when you start the 
synchronization job, you must manually create the table 
+before submitting the job. You can define only the partition keys and primary 
keys, and the remaining columns will be added 
+by the synchronization job.
+
+NOTE: In this case you shouldn't use --partition-keys or --primary-keys, 
because those keys are defined when creating 
+the table and cannot be modified. Additionally, if you specify computed 
columns, you should also define all the argument 
+columns used for computed columns.
+
+Example 2:
+If you want to synchronize a table which has primary key 'id INT', and you 
want to compute a partition key 'part=date_format(create_time,yyyy-MM-dd)',
+you can create such a table first (the other columns can be omitted):
+```sql
+CREATE TABLE test_db.test_table (
+    id INT,                 -- primary key
+    create_time TIMESTAMP,  -- the argument of computed column part
+    part STRING,            -- partition key
+    PRIMARY KEY (id, part) NOT ENFORCED
+) PARTITIONED BY (part);
+```
+
+Then you can submit synchronization job:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    kafka-sync-table \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --computed-column 'part=date_format(create_time,yyyy-MM-dd)' \
+    ... (other conf)
+```
+
 ### Synchronizing Databases
 
 By using [KafkaSyncDatabaseAction](/docs/{{< param Branch 
>}}/api/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction) 
in a Flink DataStream job or directly through `flink run`, users can 
synchronize the multi topic or one topic into one Paimon database.
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 20d16460f..ee72493d1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -145,6 +145,21 @@ public class CdcActionCommonUtils {
         return columnLowerCase;
     }
 
+    public static Schema buildPaimonSchema(
+            List<String> specifiedPartitionKeys,
+            List<String> specifiedPrimaryKeys,
+            List<ComputedColumn> computedColumns,
+            Map<String, String> tableConfig,
+            Schema sourceSchema) {
+        return buildPaimonSchema(
+                specifiedPartitionKeys,
+                specifiedPrimaryKeys,
+                computedColumns,
+                tableConfig,
+                sourceSchema,
+                new CdcMetadataConverter[] {});
+    }
+
     public static Schema buildPaimonSchema(
             List<String> specifiedPartitionKeys,
             List<String> specifiedPrimaryKeys,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index ccc0764a2..ed3c48d5f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc;
 
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.Preconditions;
@@ -35,9 +34,9 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class ComputedColumnUtils {
 
     public static List<ComputedColumn> buildComputedColumns(
-            List<String> computedColumnArgs, Schema schema) {
+            List<String> computedColumnArgs, List<DataField> physicFields) {
         Map<String, DataType> typeMapping =
-                schema.fields().stream()
+                physicFields.stream()
                         .collect(
                                 Collectors.toMap(DataField::name, 
DataField::type, (v1, v2) -> v2));
 
@@ -71,7 +70,7 @@ public class ComputedColumnUtils {
             checkArgument(
                     typeMapping.containsKey(fieldReference),
                     String.format(
-                            "Referenced field '%s' is not in given MySQL 
fields: %s.",
+                            "Referenced field '%s' is not in given fields: 
%s.",
                             fieldReference, typeMapping.keySet()));
 
             computedColumns.add(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 15571d74d..42fdff6a2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
@@ -37,6 +36,8 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,6 +48,7 @@ import java.util.Map;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
 import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
  * Base {@link Action} for synchronizing one message queue topic into one 
Paimon table.
@@ -76,6 +78,9 @@ import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComput
  */
 public abstract class MessageQueueSyncTableActionBase extends ActionBase {
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MessageQueueSyncTableActionBase.class);
+
     protected final String database;
     protected final String table;
     protected final Configuration mqConfig;
@@ -135,7 +140,9 @@ public abstract class MessageQueueSyncTableActionBase 
extends ActionBase {
 
     protected abstract Source<String, ?, ?> buildSource();
 
-    protected abstract Schema buildSchema();
+    protected abstract String topic();
+
+    protected abstract MessageQueueSchemaUtils.ConsumerWrapper consumer(String 
topic);
 
     protected abstract DataFormat getDataFormat();
 
@@ -149,27 +156,48 @@ public abstract class MessageQueueSyncTableActionBase 
extends ActionBase {
         boolean caseSensitive = catalog.caseSensitive();
         // TODO: add case validate
 
-        Schema mqSchema = buildSchema();
-
         Identifier identifier = new Identifier(database, table);
-        List<ComputedColumn> computedColumns = 
buildComputedColumns(computedColumnArgs, mqSchema);
-        Schema fromMq =
-                buildPaimonSchema(
-                        partitionKeys,
-                        primaryKeys,
-                        computedColumns,
-                        tableConfig,
-                        mqSchema,
-                        new CdcMetadataConverter[] {});
-
-        try {
-            fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
-            fileStoreTable = fileStoreTable.copy(tableConfig);
-            assertSchemaCompatible(fileStoreTable.schema(), fromMq.fields());
-        } catch (Catalog.TableNotExistException e) {
+        List<ComputedColumn> computedColumns;
+        if (catalog.tableExists(identifier)) {
+            fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier).copy(tableConfig);
+            try {
+                Schema retrievedSchema = retrieveSchema();
+                computedColumns =
+                        buildComputedColumns(computedColumnArgs, 
retrievedSchema.fields());
+                Schema fromMq =
+                        buildPaimonSchema(
+                                partitionKeys,
+                                primaryKeys,
+                                computedColumns,
+                                tableConfig,
+                                retrievedSchema);
+                assertSchemaCompatible(fileStoreTable.schema(), 
fromMq.fields());
+            } catch (MessageQueueSchemaUtils.SchemaRetrievalException e) {
+                LOG.info(
+                        "Failed to retrieve schema from message queue but 
there exists specified Paimon table. "
+                                + "Schema compatibility check will be skipped. 
If you have specified computed columns, "
+                                + "here will use the existed Paimon table 
schema to build them. Please make sure "
+                                + "the Paimon table has defined all the 
argument columns used for computed columns.");
+                computedColumns =
+                        buildComputedColumns(computedColumnArgs, 
fileStoreTable.schema().fields());
+                // check partition keys and primary keys in case that user 
specified them
+                checkConstraints();
+            }
+        } else {
+            Schema retrievedSchema = retrieveSchema();
+            computedColumns = buildComputedColumns(computedColumnArgs, 
retrievedSchema.fields());
+            Schema fromMq =
+                    buildPaimonSchema(
+                            partitionKeys,
+                            primaryKeys,
+                            computedColumns,
+                            tableConfig,
+                            retrievedSchema);
+
             catalog.createTable(identifier, fromMq, false);
-            fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
+            fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier).copy(tableConfig);
         }
+
         DataFormat format = getDataFormat();
         RecordParser recordParser =
                 format.createParser(caseSensitive, typeMapping, 
computedColumns);
@@ -196,6 +224,37 @@ public abstract class MessageQueueSyncTableActionBase 
extends ActionBase {
         sinkBuilder.build();
     }
 
+    private Schema retrieveSchema() throws Exception {
+        String topic = topic();
+        try (MessageQueueSchemaUtils.ConsumerWrapper consumer = 
consumer(topic)) {
+            return MessageQueueSchemaUtils.getSchema(consumer, topic, 
getDataFormat(), typeMapping);
+        }
+    }
+
+    private void checkConstraints() {
+        if (!partitionKeys.isEmpty()) {
+            List<String> actualPartitionKeys = fileStoreTable.partitionKeys();
+            checkState(
+                    actualPartitionKeys.size() == partitionKeys.size()
+                            && actualPartitionKeys.containsAll(partitionKeys),
+                    "Specified partition keys [%s] are not equal to the 
existed table partition keys [%s]. "
+                            + "You should remove the --partition-keys argument 
or re-create the table if the partition keys are wrong.",
+                    String.join(",", partitionKeys),
+                    String.join(",", actualPartitionKeys));
+        }
+
+        if (!primaryKeys.isEmpty()) {
+            List<String> actualPrimaryKeys = fileStoreTable.primaryKeys();
+            checkState(
+                    actualPrimaryKeys.size() == primaryKeys.size()
+                            && actualPrimaryKeys.containsAll(primaryKeys),
+                    "Specified primary keys [%s] are not equal to the existed 
table primary keys [%s]. "
+                            + "You should remove the --primary-keys argument 
or re-create the table if the primary keys are wrong.",
+                    String.join(",", primaryKeys),
+                    String.join(",", actualPrimaryKeys));
+        }
+    }
+
     @VisibleForTesting
     public Map<String, String> tableConfig() {
         return tableConfig;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 06a51aafd..badbb518a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action.cdc.kafka;
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
-import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
@@ -46,14 +45,13 @@ public class KafkaSyncTableAction extends 
MessageQueueSyncTableActionBase {
     }
 
     @Override
-    protected Schema buildSchema() {
-        String topic = mqConfig.get(KafkaConnectorOptions.TOPIC).get(0);
-        try (MessageQueueSchemaUtils.ConsumerWrapper consumer =
-                KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic)) {
-            return MessageQueueSchemaUtils.getSchema(consumer, topic, 
getDataFormat(), typeMapping);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    protected String topic() {
+        return mqConfig.get(KafkaConnectorOptions.TOPIC).get(0);
+    }
+
+    @Override
+    protected MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic) {
+        return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 53d5cf483..95ff2ecb9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -133,7 +132,7 @@ public class MongoDBSyncTableAction extends ActionBase {
         Schema mongodbSchema = 
MongodbSchemaUtils.getMongodbSchema(mongodbConfig, caseSensitive);
         catalog.createDatabase(database, true);
         List<ComputedColumn> computedColumns =
-                buildComputedColumns(computedColumnArgs, mongodbSchema);
+                buildComputedColumns(computedColumnArgs, 
mongodbSchema.fields());
 
         Identifier identifier = new Identifier(database, table);
         Schema fromMongodb =
@@ -142,8 +141,7 @@ public class MongoDBSyncTableAction extends ActionBase {
                         Collections.emptyList(),
                         computedColumns,
                         tableConfig,
-                        mongodbSchema,
-                        new CdcMetadataConverter[] {});
+                        mongodbSchema);
         // Check if table exists before trying to get or create it
         if (catalog.tableExists(identifier)) {
             fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 0b89039f5..130caab01 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -179,7 +179,7 @@ public class MySqlSyncTableAction extends ActionBase {
         MySqlTableInfo tableInfo = mySqlSchemasInfo.mergeAll();
         Identifier identifier = new Identifier(database, table);
         List<ComputedColumn> computedColumns =
-                buildComputedColumns(computedColumnArgs, tableInfo.schema());
+                buildComputedColumns(computedColumnArgs, 
tableInfo.schema().fields());
 
         CdcMetadataConverter[] metadataConverters =
                 metadataColumn.stream()
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
index 6221ff8d4..c1ccd2799 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
@@ -21,9 +21,9 @@ package org.apache.paimon.flink.action.cdc.pulsar;
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
-import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.api.connector.source.Source;
+import org.apache.pulsar.client.api.PulsarClientException;
 
 import java.util.Map;
 
@@ -45,12 +45,15 @@ public class PulsarSyncTableAction extends 
MessageQueueSyncTableActionBase {
     }
 
     @Override
-    protected Schema buildSchema() {
-        String topic = 
mqConfig.get(PulsarActionUtils.TOPIC).split(",")[0].trim();
-        try (MessageQueueSchemaUtils.ConsumerWrapper consumer =
-                PulsarActionUtils.createPulsarConsumer(mqConfig, topic)) {
-            return MessageQueueSchemaUtils.getSchema(consumer, topic, 
getDataFormat(), typeMapping);
-        } catch (Exception e) {
+    protected String topic() {
+        return mqConfig.get(PulsarActionUtils.TOPIC).split(",")[0].trim();
+    }
+
+    @Override
+    protected MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic) {
+        try {
+            return PulsarActionUtils.createPulsarConsumer(mqConfig, topic);
+        } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 7738106fd..6cd3d04e3 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -1028,4 +1028,48 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[2, 4, four, NULL, NULL, NULL, NULL]");
         waitForResult(expectedDelete, table, rowType, primaryKeys);
     }
+
+    @Test
+    @Timeout(120)
+    public void testSyncWithInitialEmptyTopic() throws Exception {
+        String topic = "initial_empty_topic";
+        createTestTopic(topic, 1, 1);
+        createFileStoreTable(
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.DATE(), 
DataTypes.INT().notNull(),
+                        },
+                        new String[] {"_id", "_date", "_year"}),
+                Collections.singletonList("_year"),
+                Arrays.asList("_id", "_year"),
+                Collections.emptyMap());
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "canal-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withComputedColumnArgs("_year=year(_date)")
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        List<String> lines = 
readLines("kafka/canal/table/initialemptytopic/canal-data-1.txt");
+        writeRecordsToKafka(topic, lines);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.DATE(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"_id", "_date", "_year", "v"});
+        waitForResult(
+                Collections.singletonList("+I[1, 19439, 2023, paimon]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Arrays.asList("_id", "_year"));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/initialemptytopic/canal-data-1.txt
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/initialemptytopic/canal-data-1.txt
new file mode 100644
index 000000000..8ede99fa7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/table/initialemptytopic/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"_id":"1","_date":"2023-03-23", 
"v":"paimon"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"_id":"INT","_date":"DATE","v":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"_id":4,"_date":91,"v":12},"table":"test_empty_topic","ts":1683006706728,"type":"INSERT"}

Reply via email to