This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

The following commit(s) were added to refs/heads/master by this push:
     new 0a4c25643 [FLINK-37262][pipeline-connector/mysql] Fix missing PARSE_ONLINE_SCHEMA_CHANGES option in MySqlDataSourceFactory
0a4c25643 is described below

commit 0a4c256433c60b8a43ccc412d38f35904d0b5448
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Tue Feb 11 20:19:09 2025 +0800

    [FLINK-37262][pipeline-connector/mysql] Fix missing PARSE_ONLINE_SCHEMA_CHANGES option in MySqlDataSourceFactory

    This closes #3910
---
 .../mysql/factory/MySqlDataSourceFactory.java      |  1 +
 .../mysql/source/MySqlDataSourceFactoryTest.java   |  9 +++--
 .../connectors/mysql/table/MySqlTableSource.java   |  8 +++--
 .../mysql/table/MySqlTableSourceFactoryTest.java   | 41 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 9923226d1..359baaed1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -336,6 +336,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         options.add(INCLUDE_COMMENTS_ENABLED);
         options.add(USE_LEGACY_JSON_FORMAT);
         options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
+        options.add(PARSE_ONLINE_SCHEMA_CHANGES);
         return options;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index de49c7b7e..bc1a7c9ba 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
@@ -258,14 +259,16 @@ public class MySqlDataSourceFactoryTest extends MySqlSourceTestBase {
 
         // optional option
         options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
+        options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true");
 
         Factory.Context context = new MockContext(Configuration.fromMap(options));
         MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
-        assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED))
-                .isEqualTo(true);
+        assertThat(factory.optionalOptions())
+                .contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED, PARSE_ONLINE_SCHEMA_CHANGES);
 
         MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
-        assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false);
+        assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse();
+        assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue();
     }
 
     @Test
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 3c5d6d63b..078960266 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -357,7 +357,9 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 && Objects.equals(jdbcProperties, that.jdbcProperties)
                 && Objects.equals(heartbeatInterval, that.heartbeatInterval)
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
-                && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill);
+                && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill)
+                && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
+                && useLegacyJsonFormat == that.useLegacyJsonFormat;
     }
 
     @Override
@@ -390,7 +392,9 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 jdbcProperties,
                 heartbeatInterval,
                 chunkKeyColumn,
-                skipSnapshotBackFill);
+                skipSnapshotBackFill,
+                parseOnlineSchemaChanges,
+                useLegacyJsonFormat);
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 6890ca78f..2c29bf230 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -805,6 +805,47 @@ public class MySqlTableSourceFactoryTest {
         }
     }
 
+    @Test
+    public void testEnablingExperimentalOptions() {
+        Map<String, String> properties = getAllOptions();
+        properties.put("scan.parse.online.schema.changes.enabled", "true");
+        properties.put("use.legacy.json.format", "true");
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(properties);
+        MySqlTableSource expectedSource =
+                new MySqlTableSource(
+                        SCHEMA,
+                        3306,
+                        MY_LOCALHOST,
+                        MY_DATABASE,
+                        MY_TABLE,
+                        MY_USERNAME,
+                        MY_PASSWORD,
+                        ZoneId.systemDefault(),
+                        PROPERTIES,
+                        null,
+                        false,
+                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
+                        CHUNK_META_GROUP_SIZE.defaultValue(),
+                        SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
+                        CONNECT_TIMEOUT.defaultValue(),
+                        CONNECT_MAX_RETRIES.defaultValue(),
+                        CONNECTION_POOL_SIZE.defaultValue(),
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        StartupOptions.initial(),
+                        false,
+                        false,
+                        new Properties(),
+                        HEARTBEAT_INTERVAL.defaultValue(),
+                        null,
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        true,
+                        true);
+        assertEquals(expectedSource, actualSource);
+    }
+
     private Map<String, String> getAllOptions() {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "mysql-cdc");