https://docs.google.com/document/d/1p2K1tYiLz7nvaW4akTGdHDPzG_fMjXzhtzVKpSfcNC0/edit?usp=sharing

On Mon, May 11, 2026 at 1:30 PM FeatZhang <[email protected]> wrote:

> Hi devs,
>
> I'd like to open discussion on `Async MongoDB Sink with Error
> Classification and Shared Connection Pool`, a proposal to modernize the
> sink path of flink-connector-mongodb by bringing it in line with the
> AsyncSink standard, adding a structured error-handling model, and
> introducing a TM-shared MongoClient pool.
>
> Motivation
>
> While reviewing flink-connector-mongodb at HEAD, three recurring issues
> showed up in production workloads:
>
>    1. MongoSink still extends SinkV2 directly, so it does not benefit
>       from AsyncSinkBase's in-flight rate limiting, automatic batching,
>       or the standard connector metrics that the Kinesis / DynamoDB /
>       Firehose connectors already expose. prepareCommit also blocks on
>       flush, so barrier-alignment latency is amplified by MongoDB write
>       latency.
>
>    2. MongoWriter.doBulkWrite catches the top-level MongoException and
>       retries a fixed number of times with LINEAR backoff. That causes
>       (a) permanent errors such as E11000 / E10334 / invalid documents
>       to be retried uselessly, (b) thundering-herd patterns after
>       transient overload, and (c) no dead-letter path for unrecoverable
>       records. This overlaps with the recent thread "Optional
>       configuration parameters for better performance" and with
>       FLINK-39398 (PR #66 by Savonitar, which only covers duplicate
>       keys).
>
>    3. Every MongoWriter / MongoSourceReader / MongoLookupFunction
>       instance opens its own MongoClient. A job with sink=4 + lookup=4
>       + source=4 against the same cluster opens 3 x 4 x poolSize
>       connections per TM, plus the corresponding SDAM and DNS monitor
>       threads.
>
> Proposal
>
> Three coordinated changes, shipped as sub-tasks under a single FLIP:
>
>    1. MongoAsyncSink on top of AsyncSinkBase (FLIP-171). Public API:
>
>       MongoAsyncSink.builder()
>           .setConnectionOptions(...)
>           .setSerializationSchema(...)
>           .setErrorClassifier(...)   // see (2)
>           .setDeadLetterSink(...)    // see (2)
>           .setClientProvider(...)    // see (3)
>           // AsyncSinkBase options inherited as-is:
>           // maxBatchSize / maxInFlightRequests / maxBufferedRequests
>           // maxBatchSizeInBytes / maxTimeInBufferMS
>           .build();
>
>       The existing MongoSink / MongoSinkBuilder are kept for one minor
>       release as @Deprecated delegates to MongoAsyncSink.
>
>    2. A pluggable MongoErrorClassifier mapping MongoDB server error
>       codes to one of:
>
>       RETRY_WITH_BACKOFF | DEAD_LETTER | IGNORE | UPSERT | FAIL_JOB
>
>       Default mapping (configurable):
>
>       6, 7, 89, 91, 189, 13435, 13436 -> RETRY_WITH_BACKOFF (transient)
>       50                              -> RETRY_WITH_BACKOFF (timeout)
>       11000, 11001 (duplicate key)    -> DEAD_LETTER | IGNORE | UPSERT
>       10334, 2, 14, 121 (invalid doc) -> DEAD_LETTER
>       all others                      -> FAIL_JOB
>
>       Retries use exponential backoff with jitter (via AsyncSink's
>       ResultHandler.retryForEntries). DEAD_LETTER is routed to a
>       user-supplied Sink with full metadata.
>
>    3. A TM-shared MongoClientProvider (ref-counted, keyed by canonical
>       URI + credentials digest + TLS + compressors + appName). Sink,
>       Source and Lookup all go through:
>
>       MongoClient c = provider.acquire(options);
>       ...
>       provider.release(c); // underlying close() when refCount == 0
>
>       Default provider: sharedPerTaskManager(). Legacy behavior is kept
>       via perInstance() and a sink.client-provider=PER_INSTANCE option.
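To make (2) concrete, here is a minimal sketch of what a default classifier and its jittered backoff could look like. The class and method names (MongoErrorClassifierSketch, classify, backoffMillis) are illustrative, not the proposed public API; the code table is just the default mapping above, and "full jitter" is one reasonable assumption for the backoff shape, not something the proposal pins down.

```java
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of the pluggable classifier from (2); all names are illustrative. */
public class MongoErrorClassifierSketch {

    enum ErrorAction { RETRY_WITH_BACKOFF, DEAD_LETTER, IGNORE, UPSERT, FAIL_JOB }

    // Default mapping from the proposal; a real classifier would let users override entries.
    private static final Set<Integer> TRANSIENT = Set.of(6, 7, 89, 91, 189, 13435, 13436, 50);
    private static final Set<Integer> DUP_KEY = Set.of(11000, 11001);
    private static final Set<Integer> INVALID_DOC = Set.of(10334, 2, 14, 121);

    static ErrorAction classify(int serverErrorCode) {
        if (TRANSIENT.contains(serverErrorCode)) return ErrorAction.RETRY_WITH_BACKOFF;
        if (DUP_KEY.contains(serverErrorCode)) return ErrorAction.DEAD_LETTER; // or IGNORE / UPSERT
        if (INVALID_DOC.contains(serverErrorCode)) return ErrorAction.DEAD_LETTER;
        return ErrorAction.FAIL_JOB;
    }

    /** Exponential backoff with full jitter: uniform delay in [0, min(cap, base * 2^attempt)]. */
    static long backoffMillis(int attempt, long baseMs, long capMs) {
        long ceiling = Math.min(capMs, baseMs << Math.min(attempt, 20));
        return ThreadLocalRandom.current().nextLong(ceiling + 1);
    }

    public static void main(String[] args) {
        System.out.println(classify(11000)); // duplicate key
        System.out.println(classify(91));    // ShutdownInProgress: transient
        System.out.println(backoffMillis(3, 100, 10_000));
    }
}
```

The jitter keeps a fleet of subtasks from retrying in lockstep after a primary step-down, which is exactly the thundering-herd case described in the motivation.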
>
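The ref-counting in (3) can be sketched generically. This stand-in keys on a plain String and takes a factory function instead of calling MongoClients.create(...) with the full (URI + credentials digest + TLS + compressors + appName) key; the class name and shape are hypothetical and only illustrate the acquire/release lifecycle where the underlying close() happens at refCount == 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch of a ref-counted, TM-shared client provider; names are illustrative. */
public class SharedClientProviderSketch<C extends AutoCloseable> {

    private static final class Entry<C> { C client; int refCount; }

    private final Map<String, Entry<C>> clients = new HashMap<>();
    private final Function<String, C> factory;

    public SharedClientProviderSketch(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the shared client for this key, creating it on first acquire. */
    public synchronized C acquire(String key) {
        Entry<C> e = clients.computeIfAbsent(key, k -> new Entry<>());
        if (e.client == null) e.client = factory.apply(key);
        e.refCount++;
        return e.client;
    }

    /** Closes the underlying client only when the last holder releases it. */
    public synchronized void release(String key) throws Exception {
        Entry<C> e = clients.get(key);
        if (e == null) return;
        if (--e.refCount == 0) {
            clients.remove(key);
            e.client.close();
        }
    }
}
```

With all writers, readers, and lookup functions on one TM acquiring through the same provider, the 3 x 4 x poolSize connection blow-up from the motivation collapses to a single pool per distinct cluster configuration.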
> Backward compatibility
>
>    - MongoSink / MongoSinkBuilder: @Deprecated, internally delegating.
>    - Legacy ConfigOptions (sink.bulk-flush.max-actions,
>    sink.bulk-flush.interval) continue to work; mapped to
>    maxBatchSize / maxTimeInBufferMS at runtime with a WARN log.
>    - Table DDL parameters: legacy keys kept for one minor version.
>    - State schema is not binary compatible; a MongoStateMigrator tool
>    is provided to replay unacked entries from an old savepoint into
>    the new sink.
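The warn-and-translate behavior for the legacy ConfigOptions can be sketched over a plain string-keyed view of the configuration. The real code would go through Flink's ConfigOption machinery rather than raw maps; LegacyOptionBridgeSketch and translate are hypothetical names, and only the two key pairs come from the proposal.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of mapping legacy sink options onto AsyncSinkBase options at runtime. */
public class LegacyOptionBridgeSketch {

    private static final Map<String, String> LEGACY_TO_NEW = Map.of(
            "sink.bulk-flush.max-actions", "maxBatchSize",
            "sink.bulk-flush.interval", "maxTimeInBufferMS");

    /** Returns a copy of the configuration with legacy keys rewritten, warning per hit. */
    static Map<String, String> translate(Map<String, String> conf) {
        Map<String, String> out = new HashMap<>();
        conf.forEach((key, value) -> {
            String newKey = LEGACY_TO_NEW.get(key);
            if (newKey != null) {
                System.err.println("WARN: '" + key + "' is deprecated; use '" + newKey + "'");
                out.put(newKey, value);
            } else {
                out.put(key, value);
            }
        });
        return out;
    }
}
```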
>
> Deprecation timeline:
> flink-connector-mongodb 2.0 : both exist, legacy is default
> flink-connector-mongodb 2.1 : MongoAsyncSink becomes default
> flink-connector-mongodb 3.0 : legacy MongoSink removed
>
> Test plan
>
>    - Unit tests: DefaultMongoErrorClassifierTest (all branches),
>    SharedMongoClientProviderTest (concurrency / ref-count),
>    MongoSinkWriterTest (standard AsyncSink behaviors).
>    - Integration tests with testcontainers MongoDB 6.0 / 7.0 covering
>    network partition, primary step-down, oversized doc, duplicate
>    key, and savepoint migration from the legacy sink.
>    - JMH benchmark: 1 KB docs, parallelism 100, 5 min. Target:
>    throughput >= 1.2x legacy, p99 <= legacy, TM heap -30% (from the
>    shared client).
>    - CI matrix expanded to Flink 1.20 LTS / 2.0 / 2.1.
>
> Note on FLINK-36228
>
> An earlier attempt (FLINK-36228) was resolved as Fixed but later
> withdrawn by the reporter -- no AsyncSink migration actually landed.
> This FLIP is not a duplicate and is scoped wider (error classifier
> and shared client pool). I'm happy to link the reasoning from that
> ticket in a dedicated "history" section of the cwiki page if that
> helps reviewers.
>
>
> Best,
> featzhang
>
