Hi devs,
I'd like to open discussion on `Async MongoDB Sink with Error
Classification and Shared Connection Pool`, a proposal to modernize the
sink path of flink-connector-mongodb by bringing it in line with the
AsyncSink standard, adding a structured error-handling model, and
introducing a TM-shared MongoClient pool.
Motivation
While reviewing flink-connector-mongodb at HEAD, three recurring issues
showed up in production workloads:
1. MongoSink still extends SinkV2 directly, so it does not benefit
from AsyncSinkBase's in-flight rate limiting, automatic batching,
or the standard connector metrics that Kinesis / DynamoDB /
Firehose already expose. prepareCommit also blocks on flush, so
barrier alignment latency is amplified by MongoDB write latency.
2. MongoWriter.doBulkWrite catches the top-level MongoException and
retries a fixed number of times with LINEAR backoff. That causes
(a) permanent errors such as E11000 / E10334 / invalid-doc to be
retried uselessly, (b) thundering-herd patterns after transient
overload, and (c) no dead-letter path for unrecoverable records.
This overlaps with the recent thread "Optional configuration
parameters for better performance" and with FLINK-39398 (PR #66
by Savonitar, which only covers duplicate keys).
3. Every MongoWriter / MongoSourceReader / MongoLookupFunction
instance opens its own MongoClient. A job with sink=4 + lookup=4
+ source=4 against the same cluster opens 3 x 4 x poolSize
connections per TM, plus the corresponding SDAM and DNS monitor
threads.
Proposal
Three coordinated changes, shipped as sub-tasks under a single FLIP:
1. MongoAsyncSink on top of AsyncSinkBase (FLIP-171). Public API:
MongoAsyncSink.builder()
.setConnectionOptions(...)
.setSerializationSchema(...)
.setErrorClassifier(...) // see (2)
.setDeadLetterSink(...) // see (2)
.setClientProvider(...) // see (3)
// AsyncSinkBase options inherited as-is:
// maxBatchSize / maxInFlightRequests / maxBufferedRequests
// maxBatchSizeInBytes / maxTimeInBufferMS
.build();
The existing MongoSink / MongoSinkBuilder are kept for one minor
release as @Deprecated delegates to MongoAsyncSink.
2. A pluggable MongoErrorClassifier mapping MongoDB server error
codes to one of:
RETRY_WITH_BACKOFF | DEAD_LETTER | IGNORE | UPSERT | FAIL_JOB
Default mapping (configurable):
6,7,89,91,189,13435,13436 -> RETRY_WITH_BACKOFF (transient)
50 -> RETRY_WITH_BACKOFF (timeout)
11000,11001 (dup key) -> DEAD_LETTER | IGNORE | UPSERT
10334,2,14,121 (invalid doc) -> DEAD_LETTER
others -> FAIL_JOB
Retries use exponential backoff with jitter (via AsyncSink's
ResultHandler.retryForEntries). DEAD_LETTER is routed to a
user-supplied Sink with full metadata.
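To make the classification contract concrete, here is a minimal sketch of
what the pluggable interface and default mapping could look like. All names
(MongoErrorClassifier, ErrorAction, DefaultMongoErrorClassifier) are
placeholders for discussion, not final API; the duplicate-key default shown
here is DEAD_LETTER, with IGNORE / UPSERT as configurable alternatives:

```java
import java.util.Map;

/** Proposed action space for a failed write; names are illustrative only. */
enum ErrorAction { RETRY_WITH_BACKOFF, DEAD_LETTER, IGNORE, UPSERT, FAIL_JOB }

/** Pluggable classifier: maps a MongoDB server error code to an action. */
interface MongoErrorClassifier {
    ErrorAction classify(int serverErrorCode);
}

/** Default mapping from the proposal; users could override individual codes. */
class DefaultMongoErrorClassifier implements MongoErrorClassifier {
    private static final Map<Integer, ErrorAction> DEFAULTS = Map.ofEntries(
        // Transient topology / shutdown errors -> retry with backoff.
        Map.entry(6, ErrorAction.RETRY_WITH_BACKOFF),     // HostUnreachable
        Map.entry(7, ErrorAction.RETRY_WITH_BACKOFF),     // HostNotFound
        Map.entry(89, ErrorAction.RETRY_WITH_BACKOFF),    // NetworkTimeout
        Map.entry(91, ErrorAction.RETRY_WITH_BACKOFF),    // ShutdownInProgress
        Map.entry(189, ErrorAction.RETRY_WITH_BACKOFF),   // PrimarySteppedDown
        Map.entry(13435, ErrorAction.RETRY_WITH_BACKOFF),
        Map.entry(13436, ErrorAction.RETRY_WITH_BACKOFF),
        Map.entry(50, ErrorAction.RETRY_WITH_BACKOFF),    // MaxTimeMSExpired
        // Duplicate keys: DEAD_LETTER by default, IGNORE / UPSERT configurable.
        Map.entry(11000, ErrorAction.DEAD_LETTER),
        Map.entry(11001, ErrorAction.DEAD_LETTER),
        // Permanent document-level errors -> dead letter, never retried.
        Map.entry(10334, ErrorAction.DEAD_LETTER),        // BSONObjectTooLarge
        Map.entry(2, ErrorAction.DEAD_LETTER),            // BadValue
        Map.entry(14, ErrorAction.DEAD_LETTER),           // TypeMismatch
        Map.entry(121, ErrorAction.DEAD_LETTER));         // DocValidationFailure

    @Override
    public ErrorAction classify(int code) {
        // Anything we cannot positively classify fails the job, so silent
        // data loss is impossible by default.
        return DEFAULTS.getOrDefault(code, ErrorAction.FAIL_JOB);
    }
}
```

The fail-closed default (unknown code -> FAIL_JOB) is deliberate; relaxing it
would be an explicit user decision via setErrorClassifier(...).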
3. A TM-shared MongoClientProvider (ref-counted, keyed by canonical
URI + credentials digest + TLS + compressors + appName). Sink,
Source and Lookup all go through:
MongoClient c = provider.acquire(options);
...
provider.release(c); // underlying close() when refCount == 0
Default provider: sharedPerTaskManager(). Legacy behavior is kept
via perInstance() and a sink.client-provider=PER_INSTANCE option.
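The ref-counting itself is driver-agnostic. Below is a minimal sketch of the
intended acquire/release semantics, with a generic resource type standing in
for MongoClient and a factory standing in for the URI/credentials-keyed
client construction; class and method names are illustrative, not final API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Sketch of a TM-shared, ref-counted provider. The real version would key on
 * canonical URI + credentials digest + TLS + compressors + appName, and C
 * would be MongoClient; here a generic factory stands in for the driver.
 */
class SharedClientProvider<K, C> {
    private static final class Entry<C> {
        final C client;
        int refCount;
        Entry(C client) { this.client = client; }
    }

    private final Map<K, Entry<C>> clients = new HashMap<>();
    private final Function<K, C> factory;

    SharedClientProvider(Function<K, C> factory) { this.factory = factory; }

    /** Returns the shared client for this key, creating it on first use. */
    synchronized C acquire(K key) {
        Entry<C> e =
            clients.computeIfAbsent(key, k -> new Entry<>(factory.apply(k)));
        e.refCount++;
        return e.client;
    }

    /** Drops one reference; closes the client when refCount hits zero. */
    synchronized void release(K key, Consumer<C> closer) {
        Entry<C> e = clients.get(key);
        if (e != null && --e.refCount == 0) {
            clients.remove(key);
            closer.accept(e.client); // e.g. MongoClient.close()
        }
    }

    /** Visible for tests: current reference count for a key. */
    synchronized int refCount(K key) {
        Entry<C> e = clients.get(key);
        return e == null ? 0 : e.refCount;
    }
}
```

With this shape, the sink=4 + lookup=4 + source=4 job from the motivation
section would hold a single client (one pool, one SDAM monitor set) per TM
instead of twelve.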
Backward compatibility
- MongoSink / MongoSinkBuilder: @Deprecated, internally delegating.
- Legacy ConfigOptions (sink.bulk-flush.max-actions,
sink.bulk-flush.interval) continue to work; mapped to
maxBatchSize / maxTimeInBufferMS at runtime with a WARN log.
- Table DDL parameters: legacy keys kept for one minor version.
- State schema is not binary compatible; a MongoStateMigrator tool
is provided to replay unacked entries from an old savepoint into
the new sink.
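The legacy-to-new option translation in the compatibility layer could be as
small as the following sketch. The two key pairs are the ones named above;
the class name is hypothetical, and the real implementation would go through
Flink's ConfigOptions and log via SLF4J rather than System.err:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the legacy -> AsyncSink option translation (names per proposal). */
class LegacyOptionMapper {
    private static final Map<String, String> RENAMES = Map.of(
        "sink.bulk-flush.max-actions", "maxBatchSize",
        "sink.bulk-flush.interval", "maxTimeInBufferMS");

    /** Returns a copy of the user config with legacy keys renamed. */
    static Map<String, String> translate(Map<String, String> userConfig) {
        Map<String, String> out = new HashMap<>();
        userConfig.forEach((key, value) -> {
            String renamed = RENAMES.get(key);
            if (renamed != null) {
                // Real implementation: LOG.warn(...) via the connector logger.
                System.err.println(
                    "Deprecated option '" + key + "' mapped to '" + renamed + "'");
                out.put(renamed, value);
            } else {
                out.put(key, value); // non-legacy keys pass through unchanged
            }
        });
        return out;
    }
}
```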
Deprecation timeline:
flink-connector-mongodb 2.0 : both exist, legacy is default
flink-connector-mongodb 2.1 : MongoAsyncSink becomes default
flink-connector-mongodb 3.0 : legacy MongoSink removed
Test plan
- Unit tests: DefaultMongoErrorClassifierTest (all branches),
SharedMongoClientProviderTest (concurrency / ref-count),
MongoSinkWriterTest (standard AsyncSink behaviors).
- Integration tests with testcontainers MongoDB 6.0 / 7.0 covering
network partition, primary step-down, oversized doc, duplicate
key, and savepoint migration from the legacy sink.
- JMH benchmark: 1 KB docs, parallelism 100, 5 min. Target:
throughput >= 1.2x legacy, p99 <= legacy, TM heap -30% (from the
shared client).
- CI matrix expanded to Flink 1.20 LTS / 2.0 / 2.1.
Note on FLINK-36228
An earlier attempt (FLINK-36228) was resolved as Fixed but later
withdrawn by the reporter -- no AsyncSink migration actually landed.
This FLIP is not a duplicate and is scoped wider (error classifier
and shared client pool). I'm happy to link the reasoning from that
ticket in a dedicated "history" section of the cwiki page if that
helps reviewers.
Best,
featzhang