Xiqian Yu created FLINK-39795:
---------------------------------
Summary: MongoDB CDC does not use index when reading shard collection chunks
Key: FLINK-39795
URL: https://issues.apache.org/jira/browse/FLINK-39795
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.6.0
Reporter: Xiqian Yu
Assignee: Xiqian Yu
In MongoUtils, we query the `config.chunks` collection to fetch all chunks of a
sharded collection, sorted by their `min` value. The filter uses an `$or`
expression for backwards compatibility with MongoDB versions earlier than 4.9.0.
{code:java}
public static List<BsonDocument> readChunks(
        MongoClient mongoClient, BsonDocument collectionMetadata) {
    MongoCollection<BsonDocument> chunks =
            collectionFor(mongoClient, TableId.parse("config.chunks"), BsonDocument.class);
    List<BsonDocument> collectionChunks = new ArrayList<>();

    Bson filter =
            or(
                    new BsonDocument(NAMESPACE_FIELD, collectionMetadata.get(ID_FIELD)),
                    // MongoDB 4.9.0 removed ns field of config.chunks collection, using
                    // collection's uuid instead.
                    // See: https://jira.mongodb.org/browse/SERVER-53105
                    new BsonDocument(UUID_FIELD, collectionMetadata.get(UUID_FIELD)));

    chunks.find(filter)
            .projection(include("min", "max", "shard"))
            .sort(ascending("min"))
            .into(collectionChunks);
    return collectionChunks;
}
{code}
The problem is that, with the `$or` filter, MongoDB cannot use an index to
return the chunks already sorted by `min`. The chunks are sorted in memory
instead, and once the sort exceeds the server's memory limit, the query fails
with the following exception:
{code:java}
Read config.chunks collection failed: Encountered non-retryable error during
query :: caused by :: Executor error during find command :: caused by :: Sort
operation used more than the maximum 33554432 bytes of RAM. Add an index, or
specify a smaller limit. {code}
We could check the MongoDB server version explicitly instead, and filter on a
single field.
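
A minimal sketch of that version check, for illustration only. The class
`ChunkFilterField` and its method are hypothetical and not part of Flink CDC,
and the literal field names "ns" and "uuid" are assumed from the code comment
above. The version string is assumed to come from the `version` field of the
server's `buildInfo` command. With a single-field filter, the server should be
able to use a compound index on that field and `min`, if one exists.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: picks the config.chunks filter field from the server version. */
final class ChunkFilterField {
    // Assumed literal names; the original code refers to NAMESPACE_FIELD and UUID_FIELD.
    static final String NAMESPACE_FIELD = "ns";
    static final String UUID_FIELD = "uuid";

    // Matches the leading "major.minor" of strings such as "4.4.18" or "6.0.0-rc1".
    private static final Pattern MAJOR_MINOR = Pattern.compile("^(\\d+)\\.(\\d+)");

    private ChunkFilterField() {}

    /** Returns "uuid" for servers at 4.9 or later, and "ns" for older servers. */
    static String forServerVersion(String serverVersion) {
        Matcher m = MAJOR_MINOR.matcher(serverVersion);
        if (!m.find()) {
            throw new IllegalArgumentException("Unrecognized version: " + serverVersion);
        }
        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));
        // Per SERVER-53105, the ns field was removed from config.chunks in 4.9.0.
        boolean uuidBased = major > 4 || (major == 4 && minor >= 9);
        return uuidBased ? UUID_FIELD : NAMESPACE_FIELD;
    }
}
```

readChunks could then build a single-field filter from the returned name in
place of the `or(...)` expression, leaving the projection and sort unchanged.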