Hi devs,

I'd like to open discussion on `Async MongoDB Sink with Error
Classification and Shared Connection Pool`, a proposal to modernize the
sink path of flink-connector-mongodb by bringing it in line with the
AsyncSink standard, adding a structured error-handling model, and
introducing a TM-shared MongoClient pool.

Motivation

While reviewing flink-connector-mongodb at HEAD, three recurring issues
showed up in production workloads:

   1. MongoSink still extends SinkV2 directly, so it does not benefit
   from AsyncSinkBase's in-flight rate limiting, automatic batching,
   or the standard connector metrics that Kinesis / DynamoDB /
   Firehose already expose. prepareCommit also blocks on flush, so
   barrier alignment latency is amplified by MongoDB write latency.
   2. MongoWriter.doBulkWrite catches the top-level MongoException and
   retries a fixed number of times with LINEAR backoff. That causes
   (a) permanent errors such as E11000 / E10334 / invalid documents
   being retried uselessly, (b) thundering-herd patterns after
   transient overload, and (c) no dead-letter path for unrecoverable
   records. This overlaps with the recent thread "Optional
   configuration parameters for better performance" and with
   FLINK-39398 (PR #66 by Savonitar, which only covers duplicate
   keys).
   3. Every MongoWriter / MongoSourceReader / MongoLookupFunction
   instance opens its own MongoClient. A job with sink=4 + lookup=4 +
   source=4 against the same cluster opens 3 x 4 x poolSize
   connections per TM, plus the corresponding SDAM and DNS monitor
   threads.

Proposal

Three coordinated changes, shipped as sub-tasks under a single FLIP:

   1. MongoAsyncSink on top of AsyncSinkBase (FLIP-171). Public API:

   MongoAsyncSink.builder()
       .setConnectionOptions(...)
       .setSerializationSchema(...)
       .setErrorClassifier(...)   // see (2)
       .setDeadLetterSink(...)    // see (2)
       .setClientProvider(...)    // see (3)
       // AsyncSinkBase options inherited as-is:
       // maxBatchSize / maxInFlightRequests / maxBufferedRequests
       // maxBatchSizeInBytes / maxTimeInBufferMS
       .build();

   The existing MongoSink / MongoSinkBuilder are kept for one minor
   release as @Deprecated delegates to MongoAsyncSink.
   2. A pluggable MongoErrorClassifier mapping MongoDB server error
   codes to one of:

   RETRY_WITH_BACKOFF | DEAD_LETTER | IGNORE | UPSERT | FAIL_JOB

   Default mapping (configurable):

   6, 7, 89, 91, 189, 13435, 13436 -> RETRY_WITH_BACKOFF (transient)
   50                              -> RETRY_WITH_BACKOFF (timeout)
   11000, 11001 (dup key)          -> DEAD_LETTER | IGNORE | UPSERT
   10334, 2, 14, 121 (invalid doc) -> DEAD_LETTER
   others                          -> FAIL_JOB

   Retries use exponential backoff with jitter (via AsyncSink's
   ResultHandler.retryForEntries). DEAD_LETTER records are routed to a
   user-supplied Sink with full metadata.
   3. A TM-shared MongoClientProvider (ref-counted, keyed by canonical
   URI + credentials digest + TLS + compressors + appName). Sink,
   Source and Lookup all go through:

   MongoClient c = provider.acquire(options);
   ...
   provider.release(c); // underlying close() when refCount == 0

   The default provider is sharedPerTaskManager(). Legacy behavior is
   kept via perInstance() and a sink.client-provider=PER_INSTANCE
   option.
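To make the classifier contract in (2) concrete, here is a minimal sketch of what the interface and the default code table could look like. The names (ClassifierSketch, MongoErrorClassifier, ErrorAction) and the error-code comments follow this proposal's text but are assumptions, not the final API:

```java
// Hypothetical sketch of the pluggable classifier from point (2).
// Names and the default mapping mirror the proposal; not final API.
import java.util.Set;

public class ClassifierSketch {

    public enum ErrorAction { RETRY_WITH_BACKOFF, DEAD_LETTER, IGNORE, UPSERT, FAIL_JOB }

    /** Maps a MongoDB server error code to the action the sink should take. */
    public interface MongoErrorClassifier {
        ErrorAction classify(int serverErrorCode);
    }

    /** Default mapping from the proposal; users could override per code. */
    public static final MongoErrorClassifier DEFAULT = code -> {
        // Transient topology/network errors, plus timeout (50): retry with backoff.
        if (Set.of(6, 7, 89, 91, 189, 13435, 13436, 50).contains(code)) {
            return ErrorAction.RETRY_WITH_BACKOFF;
        }
        // Duplicate key (11000/11001): DEAD_LETTER by default,
        // configurable to IGNORE or UPSERT per the proposal.
        if (code == 11000 || code == 11001) {
            return ErrorAction.DEAD_LETTER;
        }
        // Invalid-document errors are permanent: dead-letter them.
        if (Set.of(10334, 2, 14, 121).contains(code)) {
            return ErrorAction.DEAD_LETTER;
        }
        // Anything unknown fails the job rather than looping.
        return ErrorAction.FAIL_JOB;
    };

    public static void main(String[] args) {
        System.out.println(DEFAULT.classify(189)); // prints RETRY_WITH_BACKOFF
    }
}
```

A per-job override would then only need to wrap DEFAULT and special-case the codes it cares about.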
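The ref-counting in (3) can be sketched independently of the driver. This toy version is generic over any Closeable so it runs without mongodb-driver on the classpath; the class name and acquire/release shape follow the proposal, everything else is illustrative:

```java
// Hypothetical sketch of the ref-counted, TM-shared provider from point (3).
// Generic over Closeable so the mechanism is testable without the Mongo driver.
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SharedClientProvider<K, C extends Closeable> {

    private static final class Entry<C> {
        final C client;
        int refCount;
        Entry(C client) { this.client = client; }
    }

    private final Map<K, Entry<C>> clients = new HashMap<>();
    private final Function<K, C> factory;

    public SharedClientProvider(Function<K, C> factory) {
        this.factory = factory;
    }

    /** Returns the shared client for this key, creating it on first acquire. */
    public synchronized C acquire(K key) {
        Entry<C> e = clients.computeIfAbsent(key, k -> new Entry<>(factory.apply(k)));
        e.refCount++;
        return e.client;
    }

    /** Drops one reference; the underlying client is closed when the count hits 0. */
    public synchronized void release(K key) {
        Entry<C> e = clients.get(key);
        if (e != null && --e.refCount == 0) {
            clients.remove(key);
            try {
                e.client.close();
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        }
    }
}
```

In the real connector the key would be the canonical URI + credentials digest + TLS + compressors + appName tuple and the factory would call MongoClients.create; the per-TM singleton would live behind sharedPerTaskManager().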

Backward compatibility

   - MongoSink / MongoSinkBuilder: @Deprecated, internally delegating.
   - Legacy ConfigOptions (sink.bulk-flush.max-actions,
   sink.bulk-flush.interval) continue to work; mapped to
   maxBatchSize / maxTimeInBufferMS at runtime with a WARN log.
   - Table DDL parameters: legacy keys kept for one minor version.
   - State schema is not binary compatible; a MongoStateMigrator tool
   is provided to replay unacked entries from an old savepoint into
   the new sink.
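The legacy-option mapping above is mechanical; a rough sketch of the rewrite step (helper name and logging are illustrative, only the option keys come from the proposal):

```java
// Hypothetical sketch of mapping deprecated bulk-flush keys onto the
// AsyncSink equivalents at runtime, with a deprecation warning.
import java.util.HashMap;
import java.util.Map;

public class LegacyOptionMapper {

    /** Returns a copy of the options with legacy keys rewritten. */
    public static Map<String, String> map(Map<String, String> userOptions) {
        Map<String, String> out = new HashMap<>(userOptions);
        remap(out, "sink.bulk-flush.max-actions", "maxBatchSize");
        remap(out, "sink.bulk-flush.interval", "maxTimeInBufferMS");
        return out;
    }

    private static void remap(Map<String, String> opts, String legacy, String replacement) {
        String value = opts.remove(legacy);
        if (value != null) {
            // The real connector would WARN via its SLF4J logger instead.
            System.err.println("WARN: '" + legacy + "' is deprecated, use '" + replacement + "'");
            opts.putIfAbsent(replacement, value);
        }
    }
}
```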

Deprecation timeline:
flink-connector-mongodb 2.0 : both exist, legacy is default
flink-connector-mongodb 2.1 : MongoAsyncSink becomes default
flink-connector-mongodb 3.0 : legacy MongoSink removed

Test plan

   - Unit tests: DefaultMongoErrorClassifierTest (all branches),
   SharedMongoClientProviderTest (concurrency / ref-count),
   MongoSinkWriterTest (standard AsyncSink behaviors).
   - Integration tests with testcontainers MongoDB 6.0 / 7.0 covering
   network partition, primary step-down, oversized doc, duplicate
   key, and savepoint migration from the legacy sink.
   - JMH benchmark: 1 KB docs, parallelism 100, 5 min. Target:
   throughput >= 1.2x legacy, p99 <= legacy, TM heap -30% (from the
   shared client).
   - CI matrix expanded to Flink 1.20 LTS / 2.0 / 2.1.

Note on FLINK-36228

An earlier attempt (FLINK-36228) was resolved as Fixed but later
withdrawn by the reporter -- no AsyncSink migration actually landed.
This FLIP is not a duplicate and is scoped wider (error classifier
and shared client pool). I'm happy to link the reasoning from that
ticket in a dedicated "history" section of the cwiki page if that
helps reviewers.


Best,
featzhang
