Flink CDC Issue Import created FLINK-34874:
----------------------------------------------

             Summary: [MongoDB] Support initial.snapshotting.pipeline related 
configs in table api
                 Key: FLINK-34874
                 URL: https://issues.apache.org/jira/browse/FLINK-34874
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Motivation

The MongoDB Kafka connector's startup.mode.copy.existing.pipeline (aka 
initial.snapshotting.pipeline in mongo-cdc) is an array of JSON objects 
describing the pipeline operations to run when copying existing data, see 
[link|https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/startup/#std-label-source-configuration-startup].
This lets the copying manager make better use of indexes and makes copying 
more efficient, which is very important in some user scenarios. There are 
also some related configs, such as startup.mode.copy.existing.queue.size and 
startup.mode.copy.existing.max.threads.
Currently we only support these configs in the DataStream API. For the 
convenience of users, we should also support them in the Table API.
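
As a rough illustration of the shape of these settings, the sketch below 
builds the pipeline as an array of JSON objects and serializes it into a 
property value. Only the three property names come from the text above. The 
`$match` stage, the `status` field and the numeric values are hypothetical, 
and the dict is a plain Python mapping, not a connector API.

```python
import json

# Hypothetical pipeline: copy only documents whose (made-up) "status" field
# is "active". A $match stage on an indexed field is the kind of operation
# that can let the copy phase use an index.
copy_pipeline = [{"$match": {"status": "active"}}]

# Property names are taken from the issue text; the values are illustrative.
copy_existing_props = {
    "startup.mode.copy.existing.pipeline": json.dumps(copy_pipeline),
    "startup.mode.copy.existing.queue.size": "10240",
    "startup.mode.copy.existing.max.threads": "4",
}

for key, value in copy_existing_props.items():
    print(f"{key}={value}")
```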

### Solution

Support the initial.snapshotting.pipeline related configs in the Table API.

### Alternatives

_No response_

### Anything else?

Note that in 2.3.0 we removed these configs from the Table API when adding 
support for incremental snapshot mode for MongoDB in this 
[commit|https://github.com/ververica/flink-cdc-connectors/commit/301e5a8ab08f7b6c4414c0a81561b9a1bf7fab19],
because in incremental snapshot mode the semantics are inconsistent when the 
pipeline operations are used. The reason is that in the snapshot phase of 
incremental snapshot mode, the oplog is played back after each snapshot to 
compensate for changes, but the pipeline operations in copy.existing.pipeline 
are not applied to the played-back oplog, so the results no longer reflect 
the pipeline consistently.
In legacy Debezium mode, however, the behaviour is correct, so we add these 
configs back in Debezium mode for better forward compatibility, and notify 
users not to use them in incremental snapshot mode for the reason above.
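
The inconsistency described above can be seen in a toy model. The sketch 
below is not the connector's implementation: plain Python lists stand in for 
a collection and its oplog, and the `status` filter is a hypothetical 
pipeline. It only shows that a filter applied to the snapshot read but not to 
the oplog playback lets non-matching documents through.

```python
# Toy model only: plain Python data stands in for a collection and its oplog.

def matches(doc):
    # Hypothetical pipeline, equivalent to {"$match": {"status": "active"}}.
    return doc["status"] == "active"

collection = [
    {"_id": 1, "status": "active"},
    {"_id": 2, "status": "archived"},
]

# A change that happens while the snapshot is being read.
oplog = [{"op": "insert", "doc": {"_id": 3, "status": "archived"}}]

# Snapshot read: the pipeline is applied, so only matching documents are copied.
snapshot = {doc["_id"]: doc for doc in collection if matches(doc)}

# Oplog playback: the pipeline is NOT applied, so every change is merged in.
for event in oplog:
    if event["op"] == "insert":
        snapshot[event["doc"]["_id"]] = event["doc"]

result_ids = sorted(snapshot)
print(result_ids)
```

In this model, document 2 is filtered out by the pipeline while document 3, 
which has the same `status`, is emitted only because it arrived through the 
playback path.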

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/3069
Created by: [herunkang2018|https://github.com/herunkang2018]
Labels: enhancement
Assignee: [herunkang2018|https://github.com/herunkang2018]
Created at: Tue Feb 20 10:28:11 CST 2024
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
