This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79bfb05ccf [flink-cdc] Provide option to disable use of source primary
keys if primary keys in action command are not specified for CDC ingestion.
(#5793)
79bfb05ccf is described below
commit 79bfb05ccfaa5b8aff3ffdeb422740a869c07c30
Author: Ashish Khatkar <[email protected]>
AuthorDate: Thu Jul 31 08:16:39 2025 +0100
[flink-cdc] Provide option to disable use of source primary keys if primary
keys in action command are not specified for CDC ingestion. (#5793)
---
.../shortcodes/generated/kafka_sync_database.html | 10 +++-
.../shortcodes/generated/kafka_sync_table.html | 4 ++
.../generated/mongodb_sync_database.html | 10 +++-
.../shortcodes/generated/mysql_sync_database.html | 10 +++-
.../shortcodes/generated/mysql_sync_table.html | 4 ++
.../shortcodes/generated/postgres_sync_table.html | 4 ++
.../shortcodes/generated/pulsar_sync_database.html | 10 +++-
.../shortcodes/generated/pulsar_sync_table.html | 4 ++
.../flink/action/cdc/CdcActionCommonUtils.java | 15 ++++--
.../cdc/MessageQueueSyncTableActionBase.java | 4 +-
.../flink/action/cdc/SyncDatabaseActionBase.java | 1 +
.../action/cdc/SyncDatabaseActionFactoryBase.java | 6 +++
.../flink/action/cdc/SyncTableActionBase.java | 3 +-
.../action/cdc/SyncTableActionFactoryBase.java | 6 +++
.../action/cdc/SynchronizationActionBase.java | 8 ++++
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 3 +-
.../flink/sink/cdc/NewTableSchemaBuilder.java | 6 ++-
.../flink/action/cdc/CdcActionITCaseBase.java | 11 ++++-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 53 ++++++++++++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 13 +++++-
20 files changed, 165 insertions(+), 20 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 4b6ee3e38d..e8d5898c34 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -115,8 +115,14 @@ under the License.
<tr>
<td><h5>--primary_keys</h5></td>
<td>The primary keys for Paimon table. If there are multiple primary
keys, connect them with comma, for example "buyer_id,seller_id".
- If the keys are not in source table, but the source table has
primary keys, the sink table will use source table's primary keys.
- Otherwise, the sink table won't set primary keys.</td>
+ If the keys are not provided, but the source has primary keys, the
sink table will use source's primary keys.
+ Otherwise, the sink table won't set primary keys.
+ If the keys are not provided, but the source has primary keys, and
you don't want to use the source's primary keys,
+ set --sync_primary_keys_from_source_schema to false.</td>
+ </tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
</tr>
<tr>
<tr>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html
b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 2345a4d84a..10669f594f 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -62,6 +62,10 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
+ </tr>
<tr>
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from
Kafka topic's table field name. See <a
href="../overview/#computed-functions">here</a> for a complete list of
configurations. </td>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
index e0a11fb0e8..2a854259e4 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -57,8 +57,14 @@ under the License.
<tr>
<td><h5>--primary_keys</h5></td>
<td>The primary keys for Paimon table. If there are multiple primary
keys, connect them with comma, for example "buyer_id,seller_id".
- If the keys are not in source table, but the source table has
primary keys, the sink table will use source table's primary keys.
- Otherwise, the sink table won't set primary keys.</td>
+ If the keys are not provided, but the source has primary keys, the
sink table will use source's primary keys.
+ Otherwise, the sink table won't set primary keys.
+ If the keys are not provided, but the source has primary keys, and
you don't want to use the source's primary keys,
+ set --sync_primary_keys_from_source_schema to false.</td>
+ </tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
</tr>
<tr>
<td><h5>--mongodb_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index a32fa920de..9b2fe648d2 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -89,8 +89,14 @@ under the License.
<tr>
<td><h5>--primary_keys</h5></td>
<td>The primary keys for Paimon table. If there are multiple primary
keys, connect them with comma, for example "buyer_id,seller_id".
- If the keys are not in source table, but the source table has
primary keys, the sink table will use source table's primary keys.
- Otherwise, the sink table won't set primary keys.</td>
+ If the keys are not provided, but the source has primary keys, the
sink table will use source's primary keys.
+ Otherwise, the sink table won't set primary keys.
+ If the keys are not provided, but the source has primary keys, and
you don't want to use the source's primary keys,
+ set --sync_primary_keys_from_source_schema to false.</td>
+ </tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
</tr>
<tr>
<td><h5>--mysql_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index d66541ac64..25901973c9 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -61,6 +61,10 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
+ </tr>
<tr>
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from
MySQL table field name. See <a href="../overview/#computed-functions">here</a>
for a complete list of configurations. </td>
diff --git a/docs/layouts/shortcodes/generated/postgres_sync_table.html
b/docs/layouts/shortcodes/generated/postgres_sync_table.html
index 6bc2cb7b2c..b660d20b15 100644
--- a/docs/layouts/shortcodes/generated/postgres_sync_table.html
+++ b/docs/layouts/shortcodes/generated/postgres_sync_table.html
@@ -54,6 +54,10 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
+ </tr>
<tr>
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from
PostgreSQL table field name. See <a
href="../overview/#computed-functions">here</a> for a complete list of
configurations. </td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
index 58d95e88ed..9e87348362 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
@@ -77,8 +77,14 @@ under the License.
<tr>
<td><h5>--primary_keys</h5></td>
<td>The primary keys for Paimon table. If there are multiple primary
keys, connect them with comma, for example "buyer_id,seller_id".
- If the keys are not in source table, but the source table has
primary keys, the sink table will use source table's primary keys.
- Otherwise, the sink table won't set primary keys.</td>
+ If the keys are not provided, but the source has primary keys, the
sink table will use source's primary keys.
+ Otherwise, the sink table won't set primary keys.
+ If the keys are not provided, but the source has primary keys, and
you don't want to use the source's primary keys,
+ set --sync_primary_keys_from_source_schema to false.</td>
+ </tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
</tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_table.html
b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
index 4fc16910e0..30c0b4ae3c 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_table.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
@@ -61,6 +61,10 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+ <td>Whether to use the source table's primary keys in the Paimon schema
when primary keys are not specified via --primary_keys. The
default is true.</td>
+ </tr>
<tr>
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from
Pulsar topic's table field name. See <a
href="../overview/#computed-functions">here</a> for a complete list of
configurations. </td>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 46127c5f78..6a3001319c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -70,6 +70,8 @@ public class CdcActionCommonUtils {
public static final String METADATA_COLUMN = "metadata_column";
public static final String MULTIPLE_TABLE_PARTITION_KEYS =
"multiple_table_partition_keys";
public static final String EAGER_INIT = "eager_init";
+ public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA =
+ "sync_primary_keys_from_source_schema";
public static void assertSchemaCompatible(
TableSchema paimonSchema, List<DataField> sourceTableFields) {
@@ -122,7 +124,8 @@ public class CdcActionCommonUtils {
CdcMetadataConverter[] metadataConverters,
boolean caseSensitive,
boolean strictlyCheckSpecified,
- boolean requirePrimaryKeys) {
+ boolean requirePrimaryKeys,
+ boolean syncPKeysFromSourceSchema) {
Schema.Builder builder = Schema.newBuilder();
// options
@@ -165,7 +168,8 @@ public class CdcActionCommonUtils {
sourceSchemaPrimaryKeys,
allFieldNames,
strictlyCheckSpecified,
- requirePrimaryKeys);
+ requirePrimaryKeys,
+ syncPKeysFromSourceSchema);
// partition keys
specifiedPartitionKeys = listCaseConvert(specifiedPartitionKeys,
caseSensitive);
@@ -185,7 +189,8 @@ public class CdcActionCommonUtils {
List<String> sourceSchemaPrimaryKeys,
List<String> allFieldNames,
boolean strictlyCheckSpecified,
- boolean requirePrimaryKeys) {
+ boolean requirePrimaryKeys,
+ boolean syncPKeysFromSourceSchema) {
if (!specifiedPrimaryKeys.isEmpty()) {
if (allFieldNames.containsAll(specifiedPrimaryKeys)) {
builder.primaryKey(specifiedPrimaryKeys);
@@ -205,12 +210,12 @@ public class CdcActionCommonUtils {
}
}
- if (!sourceSchemaPrimaryKeys.isEmpty()) {
+ if (syncPKeysFromSourceSchema && !sourceSchemaPrimaryKeys.isEmpty()) {
builder.primaryKey(sourceSchemaPrimaryKeys);
return;
}
- if (requirePrimaryKeys) {
+ if (requirePrimaryKeys && syncPKeysFromSourceSchema) {
throw new IllegalArgumentException(
"Failed to set specified primary keys for sink table "
+ tableName
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 1e1671b4a3..6dbb2b489d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -52,7 +52,6 @@ import java.util.Map;
* </ul>
*/
public abstract class MessageQueueSyncTableActionBase extends
SyncTableActionBase {
-
public MessageQueueSyncTableActionBase(
String database,
String table,
@@ -87,6 +86,7 @@ public abstract class MessageQueueSyncTableActionBase extends
SyncTableActionBas
metadataConverters,
caseSensitive,
true,
- false);
+ false,
+ this.syncPKeysFromSourceSchema);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4ce4e7c250..36a9be033e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -202,6 +202,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
partitionKeys,
primaryKeys,
requirePrimaryKeys(),
+ syncPKeysFromSourceSchema,
partitionKeyMultiple,
metadataConverters);
Pattern tblIncludingPattern = Pattern.compile(includingTables);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 8d0b8b9cef..28134375f6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -34,6 +34,7 @@ import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB;
@@ -86,5 +87,10 @@ public abstract class SyncDatabaseActionFactoryBase<T
extends SyncDatabaseAction
action.withComputedColumnArgs(
new
ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
}
+
+ if (params.has(SYNC_PKEYS_FROM_SOURCE_SCHEMA)) {
+ action.syncPKeysFromSourceSchema(
+
Boolean.parseBoolean(params.get(SYNC_PKEYS_FROM_SOURCE_SCHEMA)));
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index e0de071d29..0f984d3654 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -107,7 +107,8 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
metadataConverters,
caseSensitive,
true,
- true);
+ true,
+ this.syncPKeysFromSourceSchema);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
index 08b9131f71..cb9b678d19 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
@@ -31,6 +31,7 @@ import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_C
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
/** Base {@link ActionFactory} for synchronizing into one Paimon table. */
@@ -76,5 +77,10 @@ public abstract class SyncTableActionFactoryBase
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
+
+ if (params.has(SYNC_PKEYS_FROM_SOURCE_SCHEMA)) {
+ boolean flag =
Boolean.parseBoolean(params.get(SYNC_PKEYS_FROM_SOURCE_SCHEMA));
+ action.syncPKeysFromSourceSchema(flag);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 446ce26cb3..d4b2b15394 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -68,6 +68,9 @@ public abstract class SynchronizationActionBase extends
ActionBase {
protected Map<String, String> tableConfig = new HashMap<>();
protected TypeMapping typeMapping = TypeMapping.defaultMapping();
+ // this is to specify if we should use primary keys from source
+ // in paimon schema if pkeys are not specified in action command
+ protected boolean syncPKeysFromSourceSchema = true;
protected CdcMetadataConverter[] metadataConverters = new
CdcMetadataConverter[] {};
public SynchronizationActionBase(
@@ -102,6 +105,11 @@ public abstract class SynchronizationActionBase extends
ActionBase {
return this;
}
+ public SynchronizationActionBase syncPKeysFromSourceSchema(boolean flag) {
+ this.syncPKeysFromSourceSchema = flag;
+ return this;
+ }
+
@VisibleForTesting
public Map<String, String> tableConfig() {
return tableConfig;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 790467ba0d..14238fd818 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -154,7 +154,8 @@ public class MySqlSyncDatabaseAction extends
SyncDatabaseActionBase {
metadataConverters,
caseSensitive,
false,
- true);
+ true,
+ this.syncPKeysFromSourceSchema);
try {
table = (FileStoreTable) catalog.getTable(identifier);
Supplier<String> errMsg =
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index 0d4a74ac2f..1435cfd5da 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -38,6 +38,7 @@ public class NewTableSchemaBuilder implements Serializable {
private final List<String> partitionKeys;
private final List<String> primaryKeys;
private final boolean requirePrimaryKeys;
+ private final boolean syncPKeysFromSourceSchema;
private final CdcMetadataConverter[] metadataConverters;
private final Map<String, List<String>> partitionKeyMultiple;
@@ -47,6 +48,7 @@ public class NewTableSchemaBuilder implements Serializable {
List<String> partitionKeys,
List<String> primaryKeys,
boolean requirePrimaryKeys,
+ boolean syncPKeysFromSourceSchema,
Map<String, List<String>> partitionKeyMultiple,
CdcMetadataConverter[] metadataConverters) {
this.tableConfig = tableConfig;
@@ -55,6 +57,7 @@ public class NewTableSchemaBuilder implements Serializable {
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
this.requirePrimaryKeys = requirePrimaryKeys;
+ this.syncPKeysFromSourceSchema = syncPKeysFromSourceSchema;
this.partitionKeyMultiple = partitionKeyMultiple;
}
@@ -80,6 +83,7 @@ public class NewTableSchemaBuilder implements Serializable {
metadataConverters,
caseSensitive,
false,
- requirePrimaryKeys));
+ requirePrimaryKeys,
+ syncPKeysFromSourceSchema));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 855623b1af..312c323e6a 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -326,6 +326,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
private final List<String> computedColumnArgs = new ArrayList<>();
private final List<String> typeMappingModes = new ArrayList<>();
private final List<String> metadataColumns = new ArrayList<>();
+ private boolean syncPKeysFromSourceSchema = true;
public SyncTableActionBuilder(Class<T> clazz, Map<String, String>
sourceConfig) {
this.clazz = clazz;
@@ -371,6 +372,11 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
return this;
}
+ public SyncTableActionBuilder<T> syncPKeysFromSourceSchema(boolean
flag) {
+ this.syncPKeysFromSourceSchema = flag;
+ return this;
+ }
+
public T build() {
List<String> args =
new ArrayList<>(
@@ -381,7 +387,9 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
"--database",
database,
"--table",
- tableName));
+ tableName,
+ "--sync_primary_keys_from_source_schema",
+
String.valueOf(syncPKeysFromSourceSchema)));
args.addAll(mapToArgs(getConfKey(clazz), sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 171b489d42..b48a1c79cf 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1599,4 +1599,57 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table, rowType, primaryKeys);
}
}
+
+ @Test
+ @Timeout(60)
+ public void testSyncPrimaryKeysFromSourceSchemaTrue() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name",
"check_sync_primary_keys_from_source_schema");
+ mySqlConfig.put("table-name", "t");
+
+ MySqlSyncTableAction action =
+
syncTableActionBuilder(mySqlConfig).withTableConfig(getBasicTableConfig()).build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable();
+ TableSchema schema = table.schema();
+ assertThat(schema.primaryKeys()).isNotEmpty();
+
assertThat(schema.primaryKeys()).containsExactly("k");
+
+ List<String> expectedInsert = Arrays.asList("+I[1, Apache]", "+I[2,
Paimon]");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ waitForResult(expectedInsert, table, rowType,
Collections.singletonList("k"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSyncPrimaryKeysFromSourceSchemaFalse() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name",
"check_sync_primary_keys_from_source_schema");
+ mySqlConfig.put("table-name", "t");
+
+ Map<String, String> tableConfig = getBasicTableConfig();
+ tableConfig.put("bucket-key", "v1");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withTableConfig(tableConfig)
+ .syncPKeysFromSourceSchema(false)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable();
+ TableSchema schema = table.schema();
+ assertThat(schema.primaryKeys()).isEmpty();
+
+ List<String> expectedInsert = Arrays.asList("+I[1, Apache]", "+I[2,
Paimon]");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ waitForResult(expectedInsert, table, rowType, Collections.emptyList());
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 66e0b776d0..ae0186cf70 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -456,4 +456,15 @@ USE check_cdc_sync_runtime_execution_mode;
CREATE TABLE t (
k INT PRIMARY KEY,
v1 VARCHAR(10)
-);
\ No newline at end of file
+);
+
+--
################################################################################
+-- testSyncPrimaryKeysFromSourceSchema{True/False}
+--
################################################################################
+CREATE DATABASE check_sync_primary_keys_from_source_schema;
+USE check_sync_primary_keys_from_source_schema;
+CREATE TABLE t (
+ k INT PRIMARY KEY,
+ v1 VARCHAR(10)
+);
+INSERT INTO t VALUES (1, 'Apache'), (2, 'Paimon');