This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4c25643 [FLINK-37262][pipeline-connector/mysql] Fix missing 
PARSE_ONLINE_SCHEMA_CHANGES option in MySqlDataSourceFactory
0a4c25643 is described below

commit 0a4c256433c60b8a43ccc412d38f35904d0b5448
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Tue Feb 11 20:19:09 2025 +0800

    [FLINK-37262][pipeline-connector/mysql] Fix missing 
PARSE_ONLINE_SCHEMA_CHANGES option in MySqlDataSourceFactory
    
    This closes #3910
---
 .../mysql/factory/MySqlDataSourceFactory.java      |  1 +
 .../mysql/source/MySqlDataSourceFactoryTest.java   |  9 +++--
 .../connectors/mysql/table/MySqlTableSource.java   |  8 +++--
 .../mysql/table/MySqlTableSourceFactoryTest.java   | 41 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 9923226d1..359baaed1 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -336,6 +336,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         options.add(INCLUDE_COMMENTS_ENABLED);
         options.add(USE_LEGACY_JSON_FORMAT);
         options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
+        options.add(PARSE_ONLINE_SCHEMA_CHANGES);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index de49c7b7e..bc1a7c9ba 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
@@ -258,14 +259,16 @@ public class MySqlDataSourceFactoryTest extends 
MySqlSourceTestBase {
 
         // optional option
         options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
+        options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true");
 
         Factory.Context context = new 
MockContext(Configuration.fromMap(options));
         MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
-        
assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED))
-                .isEqualTo(true);
+        assertThat(factory.optionalOptions())
+                .contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED, 
PARSE_ONLINE_SCHEMA_CHANGES);
 
         MySqlDataSource dataSource = (MySqlDataSource) 
factory.createDataSource(context);
-        
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false);
+        
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse();
+        
assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue();
     }
 
     @Test
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 3c5d6d63b..078960266 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -357,7 +357,9 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 && Objects.equals(jdbcProperties, that.jdbcProperties)
                 && Objects.equals(heartbeatInterval, that.heartbeatInterval)
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
-                && Objects.equals(skipSnapshotBackFill, 
that.skipSnapshotBackFill);
+                && Objects.equals(skipSnapshotBackFill, 
that.skipSnapshotBackFill)
+                && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
+                && useLegacyJsonFormat == that.useLegacyJsonFormat;
     }
 
     @Override
@@ -390,7 +392,9 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 jdbcProperties,
                 heartbeatInterval,
                 chunkKeyColumn,
-                skipSnapshotBackFill);
+                skipSnapshotBackFill,
+                parseOnlineSchemaChanges,
+                useLegacyJsonFormat);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 6890ca78f..2c29bf230 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -805,6 +805,47 @@ public class MySqlTableSourceFactoryTest {
         }
     }
 
+    @Test
+    public void testEnablingExperimentalOptions() {
+        Map<String, String> properties = getAllOptions();
+        properties.put("scan.parse.online.schema.changes.enabled", "true");
+        properties.put("use.legacy.json.format", "true");
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(properties);
+        MySqlTableSource expectedSource =
+                new MySqlTableSource(
+                        SCHEMA,
+                        3306,
+                        MY_LOCALHOST,
+                        MY_DATABASE,
+                        MY_TABLE,
+                        MY_USERNAME,
+                        MY_PASSWORD,
+                        ZoneId.systemDefault(),
+                        PROPERTIES,
+                        null,
+                        false,
+                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
+                        CHUNK_META_GROUP_SIZE.defaultValue(),
+                        SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
+                        CONNECT_TIMEOUT.defaultValue(),
+                        CONNECT_MAX_RETRIES.defaultValue(),
+                        CONNECTION_POOL_SIZE.defaultValue(),
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        StartupOptions.initial(),
+                        false,
+                        false,
+                        new Properties(),
+                        HEARTBEAT_INTERVAL.defaultValue(),
+                        null,
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        true,
+                        true);
+        assertEquals(expectedSource, actualSource);
+    }
+
     private Map<String, String> getAllOptions() {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "mysql-cdc");

Reply via email to