leonardBang commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r2048647527


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/DataSourceFactory.java:
##########
@@ -28,4 +28,7 @@ public interface DataSourceFactory extends Factory {
 
     /** Creates a {@link DataSource} instance. */
     DataSource createDataSource(Context context);
+
+    /** Checking if this {@link DataSource} could be created in batch mode. */
+    default void verifyRuntimeMode(Context context) {}

Review Comment:
   This is not intuitive to me, why do we need to introduce a method to verify 
runtime mode(especially the java doc said batch mode) in Factory?   



##########
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java:
##########
@@ -366,6 +366,7 @@ private void testSchemaEvolutionTypesParsing(
                                     .put("parallelism", "4")
                                     .put("schema.change.behavior", "evolve")
                                     .put("schema-operator.rpc-timeout", "1 h")
+                                    .put("runtime-mode", "STREAMING")

Review Comment:
   The change is unnecessary if the configuration has a default value IIUC?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java:
##########
@@ -274,6 +276,22 @@ public DataSource createDataSource(Context context) {
         return new MySqlDataSource(configFactory, readableMetadataList);
     }
 
+    @Override
+    public void verifyRuntimeMode(Context context) {
+        final Configuration config = context.getFactoryConfiguration();
+        StartupOptions startupOptions = getStartupOptions(config);
+        // Batch mode only supports StartupMode.SNAPSHOT.
+        Configuration pipelineConfiguration = 
context.getPipelineConfiguration();
+        if (pipelineConfiguration != null
+                && 
pipelineConfiguration.contains(PipelineOptions.PIPELINE_RUNTIME_MODE)
+                && RuntimeMode.BATCH.equals(
+                        
pipelineConfiguration.get(PipelineOptions.PIPELINE_RUNTIME_MODE))
+                && !StartupOptions.snapshot().equals(startupOptions)) {
+            throw new IllegalArgumentException(
+                    "Batch mode is only supported for MySQL source in snapshot 
mode.");

Review Comment:
   ```suggestion
                       "Only "snapshot" of MySQLDataSource StartupOption is 
supported in BATCH pipeline, but actual MySQLDataSource StartupOption is {}.");
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java:
##########
@@ -36,17 +36,31 @@ public class DataSinkWriterOperatorFactory<CommT>
                 YieldingOperatorFactory<CommittableMessage<CommT>> {
 
     private final Sink<Event> sink;
+    private final boolean isBatchMode;
     private final OperatorID schemaOperatorID;
 
-    public DataSinkWriterOperatorFactory(Sink<Event> sink, OperatorID 
schemaOperatorID) {
+    public DataSinkWriterOperatorFactory(
+            Sink<Event> sink, boolean isBatchMode, OperatorID 
schemaOperatorID) {
         this.sink = sink;
+        this.isBatchMode = isBatchMode;
         this.schemaOperatorID = schemaOperatorID;
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <T extends StreamOperator<CommittableMessage<CommT>>> T 
createStreamOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
+
+        if (isBatchMode) {

Review Comment:
   just a personal flavor,I think use `isBounded` in code is more aligned with 
flink's stream concept(unbounded stream and bounded stream )



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,12 @@ public class PipelineOptions {
                     .defaultValue(1)
                     .withDescription("Parallelism of the pipeline");
 
+    public static final ConfigOption<RuntimeMode> PIPELINE_RUNTIME_MODE =

Review Comment:
   Flink has `execution.runtime-mode` configuration, and I assume the two 
configuration has same semantics, could we follow Flink's design[1] ?
   [1] 
https://github.com/apache/flink/blob/7c03bc834b642ff40828739961a9a4100b9fb073/flink-core-api/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java#L39C5-L39C14
 



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -90,7 +94,15 @@ public MySqlPipelineRecordEmitter(
     protected void processElement(
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState 
splitState)
             throws Exception {
-        if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) 
{
+        if (shouldEmitAllCtesInSnapshotMode
+                && 
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())) {

Review Comment:
   We can calculate `Boolean isBounded = 
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions()` in 
constructor instead of calculate it per element.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -71,6 +72,9 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
     // Used when startup mode is initial
     private Set<TableId> alreadySendCreateTableTables;
 
+    // Used when startup mode is snapshot
+    private boolean shouldEmitAllCtesInSnapshotMode = true;

Review Comment:
   `Ctes` means? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to