Hi Sachin,
The 'collectionList' option must be filled with fully qualified names in the form "database.collection". For example, given:
database: test_db
collection: test_collection
MongoDBSource.<T>builder()
        .hosts(HOSTNAME)
        .scheme(SCHEME)
        .databaseList("test_db")
        .collectionList("test_db.test_collection")
        ...
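In other words, each entry passed to collectionList() is just the database name and the collection name joined by a dot. A minimal sketch of building that string (the helper name fullyQualified is only for illustration, not part of the connector API):

```java
public class FqnExample {

    // Builds the "database.collection" string expected by collectionList().
    // Helper name is hypothetical, shown only to make the format explicit.
    static String fullyQualified(String database, String collection) {
        return database + "." + collection;
    }

    public static void main(String[] args) {
        // prints test_db.test_collection
        System.out.println(fullyQualified("test_db", "test_collection"));
    }
}
```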
Best,
Jiabao
On 2024/08/17 12:16:39 Sachin Mittal wrote:
> Hi,
> I have configured a MongoDB CDC source as:
>
> MongoDBSource.<T>builder()
>         .hosts(HOSTNAME)
>         .scheme(SCHEME)
>         .databaseList(MONGO_DB)
>         .collectionList(collectionName)
>         .username(USERNAME)
>         .password(PASSWORD)
>         .startupOptions(StartupOptions.initial())
>         .batchSize(2048)
>         .deserializer(
>                 new DebeziumDeserializationSchema<T>() {
>
>                     @Override
>                     public TypeInformation<T> getProducedType() {
>                         return Types.POJO(clazz);
>                     }
>
>                     @Override
>                     public void deserialize(SourceRecord record, Collector<T> collector) {
>                         logger.info("Reading source record {}", record);
>                         ...
>                     }
>                 })
>         .build();
>
>
> In the Flink task manager logs I see the following:
>
> 2024-08-17 17:30:29,134 INFO
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
> [] - Source reader 0 discovers table schema for stream split stream-split
> success
> 2024-08-17 17:30:29,134 INFO
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
> [] - Source reader 0 received the stream split : StreamSplit{splitId=
> 'stream-split', offset={resumeToken=null, timestamp=7404077049079398401},
> endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=
> false}.
> 2024-08-17 17:30:29,153 INFO
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] -
> Adding split(s) to reader: [StreamSplit{splitId='stream-split',
> offset={resumeToken=null, timestamp=7404077049079398401},
> endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=
> false}]
> 2024-08-17 17:30:29,161 INFO
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
> Starting split fetcher 0
> 2024-08-17 17:30:29,182 INFO
> org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] -
> Preparing change stream for database <db> with namespace regex filter
> ^(<collection name>)$
> 2024-08-17 17:30:29,280 INFO
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask
> [] - Open the change stream at the timestamp:
> Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}
>
>
> From the logs it seems that we are able to connect to the CDC stream, and it
> should start by loading the existing records in the collection, since the
> startup option is set to initial.
>
> However, I don't see any records being read, or even any error, in my Flink
> UI/logs.
>
> Any idea what may be going wrong?
>
> Thanks
> Sachin
>