leonardBang commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r2048647527
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/DataSourceFactory.java:
##########
@@ -28,4 +28,7 @@ public interface DataSourceFactory extends Factory {
     /** Creates a {@link DataSource} instance. */
     DataSource createDataSource(Context context);
+
+    /** Checking if this {@link DataSource} could be created in batch mode. */
+    default void verifyRuntimeMode(Context context) {}
Review Comment:
This is not intuitive to me: why do we need to introduce a method on the Factory to verify
the runtime mode (especially since the Javadoc talks about batch mode)?
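Something like the sketch below is what I'd expect instead (purely illustrative; the helper name `validateBatchModeStartupOptions` and its call site are hypothetical, reusing only classes already shown in this diff), i.e. the factory fails fast inside `createDataSource` rather than exposing an extra hook on the interface:
```java
// Hypothetical sketch: validate inside the factory's existing createDataSource path
// instead of adding a dedicated verifyRuntimeMode method to the interface.
private static void validateBatchModeStartupOptions(
        StartupOptions startupOptions, Configuration pipelineConfiguration) {
    if (pipelineConfiguration != null
            && RuntimeMode.BATCH.equals(
                    pipelineConfiguration.get(PipelineOptions.PIPELINE_RUNTIME_MODE))
            && !StartupOptions.snapshot().equals(startupOptions)) {
        throw new IllegalArgumentException(
                "BATCH pipelines require the MySQL source \"snapshot\" startup mode, but got: "
                        + startupOptions);
    }
}
```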
##########
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java:
##########
@@ -366,6 +366,7 @@ private void testSchemaEvolutionTypesParsing(
                         .put("parallelism", "4")
                         .put("schema.change.behavior", "evolve")
                         .put("schema-operator.rpc-timeout", "1 h")
+                        .put("runtime-mode", "STREAMING")
Review Comment:
This change is unnecessary if the configuration has a default value, IIUC?
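As an illustration only (assuming the CDC `Configuration#get` falls back to the option's default when the key is absent, like Flink's `Configuration` does, and that the test class already uses AssertJ; imports elided), the parser test could then assert the default without ever setting the key:
```java
// Hypothetical sketch: an absent "runtime-mode" entry should still resolve to the default.
Configuration pipelineConfig =
        Configuration.fromMap(Collections.singletonMap("parallelism", "4"));
assertThat(pipelineConfig.get(PipelineOptions.PIPELINE_RUNTIME_MODE))
        .isEqualTo(RuntimeMode.STREAMING);
```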
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java:
##########
@@ -274,6 +276,22 @@ public DataSource createDataSource(Context context) {
         return new MySqlDataSource(configFactory, readableMetadataList);
     }
+    @Override
+    public void verifyRuntimeMode(Context context) {
+        final Configuration config = context.getFactoryConfiguration();
+        StartupOptions startupOptions = getStartupOptions(config);
+        // Batch mode only supports StartupMode.SNAPSHOT.
+        Configuration pipelineConfiguration = context.getPipelineConfiguration();
+        if (pipelineConfiguration != null
+                && pipelineConfiguration.contains(PipelineOptions.PIPELINE_RUNTIME_MODE)
+                && RuntimeMode.BATCH.equals(
+                        pipelineConfiguration.get(PipelineOptions.PIPELINE_RUNTIME_MODE))
+                && !StartupOptions.snapshot().equals(startupOptions)) {
+            throw new IllegalArgumentException(
+                    "Batch mode is only supported for MySQL source in snapshot mode.");
Review Comment:
```suggestion
            throw new IllegalArgumentException(
                    String.format(
                            "Only \"snapshot\" StartupOption of MySqlDataSource is supported in a BATCH pipeline, but the actual StartupOption is %s.",
                            startupOptions));
```
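Or, just as a sketch (assuming `org.apache.flink.util.Preconditions` from flink-core is on this connector's classpath), the formatted message pairs naturally with a precondition-style check inside the BATCH branch:
```java
// Hypothetical sketch: checkArgument fills the %s template with the offending option.
Preconditions.checkArgument(
        StartupOptions.snapshot().equals(startupOptions),
        "Only \"snapshot\" StartupOption of MySqlDataSource is supported in a BATCH pipeline, but the actual StartupOption is %s.",
        startupOptions);
```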
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java:
##########
@@ -36,17 +36,31 @@ public class DataSinkWriterOperatorFactory<CommT>
                 YieldingOperatorFactory<CommittableMessage<CommT>> {
     private final Sink<Event> sink;
+    private final boolean isBatchMode;
     private final OperatorID schemaOperatorID;
-    public DataSinkWriterOperatorFactory(Sink<Event> sink, OperatorID schemaOperatorID) {
+    public DataSinkWriterOperatorFactory(
+            Sink<Event> sink, boolean isBatchMode, OperatorID schemaOperatorID) {
         this.sink = sink;
+        this.isBatchMode = isBatchMode;
         this.schemaOperatorID = schemaOperatorID;
     }
     @SuppressWarnings("unchecked")
     @Override
     public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
+
+        if (isBatchMode) {
Review Comment:
Just a personal preference: I think using `isBounded` in the code aligns better with
Flink's stream concepts (unbounded stream and bounded stream).
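Purely illustrative rename of the members introduced above:
```java
// Hypothetical sketch: name the flag after Flink's bounded/unbounded stream terminology.
private final boolean isBounded;

public DataSinkWriterOperatorFactory(
        Sink<Event> sink, boolean isBounded, OperatorID schemaOperatorID) {
    this.sink = sink;
    this.isBounded = isBounded;
    this.schemaOperatorID = schemaOperatorID;
}
```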
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,12 @@ public class PipelineOptions {
                     .defaultValue(1)
                     .withDescription("Parallelism of the pipeline");
+    public static final ConfigOption<RuntimeMode> PIPELINE_RUNTIME_MODE =
Review Comment:
Flink already has the `execution.runtime-mode` configuration, and I assume the two
configurations have the same semantics. Could we follow Flink's design [1]?
[1]
https://github.com/apache/flink/blob/7c03bc834b642ff40828739961a9a4100b9fb073/flink-core-api/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java#L39C5-L39C14
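Roughly what I have in mind (a sketch only; it assumes Flink's `RuntimeExecutionMode` enum, or a CDC enum mirroring it, is reachable from `flink-cdc-common`, and that the CDC `ConfigOptions` builder supports `enumType` like Flink's):
```java
// Hypothetical sketch: mirror Flink's execution.runtime-mode option and enum values.
public static final ConfigOption<RuntimeExecutionMode> PIPELINE_RUNTIME_MODE =
        ConfigOptions.key("execution.runtime-mode")
                .enumType(RuntimeExecutionMode.class)
                .defaultValue(RuntimeExecutionMode.STREAMING)
                .withDescription(
                        "Runtime execution mode of the pipeline, aligned with Flink's execution.runtime-mode.");
```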
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -90,7 +94,15 @@ public MySqlPipelineRecordEmitter(
     protected void processElement(
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
             throws Exception {
-        if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
+        if (shouldEmitAllCtesInSnapshotMode
+                && StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())) {
Review Comment:
We can compute `boolean isBounded =
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())` in the
constructor instead of evaluating it per element.
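Something along these lines (the constructor signature here is hypothetical and only meant to show where the field would be initialized):
```java
// Hypothetical sketch: compute the bounded-ness flag once during construction.
private final boolean isBounded;

public MySqlPipelineRecordEmitter(MySqlSourceConfig sourceConfig /* , other existing args */) {
    // ... existing constructor logic ...
    this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
}

// processElement() can then branch on `isBounded` without re-evaluating the options per record.
```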
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -71,6 +72,9 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
     // Used when startup mode is initial
     private Set<TableId> alreadySendCreateTableTables;
+    // Used when startup mode is snapshot
+    private boolean shouldEmitAllCtesInSnapshotMode = true;
Review Comment:
What does `Ctes` stand for?
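If it abbreviates `CreateTableEvent`s (my guess from the surrounding code), spelling it out would read better:
```java
// Hypothetical rename, assuming "Ctes" stands for CreateTableEvents.
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
```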