[ https://issues.apache.org/jira/browse/FLINK-39954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090641#comment-18090641 ]
Qiu Yanjun commented on FLINK-39954:
------------------------------------
Hi, I would like to work on this issue. Could you please assign it to me?
Thanks.
> SinkV2ITCase over-asserts commit multiplicity under unaligned checkpoints
> --------------------------------------------------------------------------
>
> Key: FLINK-39954
> URL: https://issues.apache.org/jira/browse/FLINK-39954
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Reporter: Martijn Visser
> Priority: Major
>
> {{SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling}} asserts
> an exact commit multiplicity
> ({{containsExactlyInAnyOrder(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE))}},
> i.e. each value committed exactly 2x) using a non-idempotent test committer
> ({{TrackingCommitter}} appends every {{commit()}} call). This is stronger
> than what the {{Committer}} contract guarantees and makes the test fragile under
> unaligned checkpoints (UC), producing intermittent "unexpected element"
> (duplicate) failures.
> This surfaced while working on FLINK-38614.
> I believe it's over-specified because the {{Committer}} contract
> ({{flink-core}} {{o.a.f.api.connector.sink2.Committer}} JavaDoc) guarantees
> an exactly-once effect and explicitly permits repeated commits: "A commit
> must be idempotent... Flink will restart from a previous checkpoint and
> re-attempt to commit all committables. Thus, some or all committables may
> have already been committed." It does not guarantee a specific number of
> {{commit()}} invocations.
> * Under aligned checkpoints the commit/re-commit pattern is deterministic
> (fixed barrier boundaries -> exactly the two-run replay), so
> {{duplicate(EXPECTED)}} holds.
> * Under unaligned checkpoints a boundary committable can shift, so it is
> committed in run 1, re-committed on run 2's restore, and committed again in
> run 2's normal processing -> 3x. That extra is contract-permitted (a real
> idempotent sink dedupes it to a no-op) but the non-idempotent counter records
> it as an "unexpected element".
> It looks like we can't easily make {{TrackingCommitter}} idempotent because:
> * The committable is {{Record}} = (value, timestamp, watermark); under this
> test all records have {{timestamp=null, watermark=MIN}}, and the source emits
> each value twice legitimately, so those committables are byte-identical.
> There is no unique transaction/committable id to dedupe on ({{CommitRequest}}
> exposes only {{getCommittable()}} plus retry signals). Deduping by content
> would wrongly drop the legitimate second copy.
> * The test deliberately expects duplication ({{duplicate(EXPECTED)}}), so
> making the committer idempotent also requires changing the expected result to
> {{EXPECTED}}.
> A faithful fix gives the committable a unique identity (a
> {{TestSinkV2}}/{{Record}} plus source change), makes {{TrackingCommitter}}
> idempotent (dedupe by id, or use {{CommitRequest.signalAlreadyCommitted()}}),
> and asserts the exactly-once effect ({{EXPECTED}}, not
> {{duplicate(EXPECTED)}}). This is a non-trivial test redesign and should get
> sink/checkpointing maintainer review since it changes what the canonical
> SinkV2 test asserts.
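The multiplicity problem described in the issue can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Flink code. {{CommitLog}} stands in for {{TrackingCommitter}}, {{duplicate}} mirrors the test's {{duplicate(...)}} helper, and {{sameMultiset}} approximates AssertJ's {{containsExactlyInAnyOrder}} as multiset equality. The two commit schedules are assumptions chosen to match the bullets above: in the aligned one every value reaches the log exactly twice, and in the unaligned one a boundary committable is committed a third time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TrackingCommitter: it records every commit() call,
// so a repeated (contract-permitted) commit shows up as an extra element.
class CommitLog {
    final List<String> committed = new ArrayList<>();

    void commit(List<String> committables) {
        committed.addAll(committables);
    }
}

class MultiplicityDemo {
    // Mirrors duplicate(EXPECTED): every expected value exactly twice.
    static List<String> duplicate(List<String> expected) {
        List<String> out = new ArrayList<>(expected);
        out.addAll(expected);
        return out;
    }

    // Multiset equality, i.e. the semantics of containsExactlyInAnyOrder.
    static boolean sameMultiset(List<String> actual, List<String> expected) {
        return counts(actual).equals(counts(expected));
    }

    private static Map<String, Integer> counts(List<String> values) {
        Map<String, Integer> counts = new HashMap<>();
        for (String v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        return counts;
    }

    // Assumed aligned schedule: each value reaches the log exactly twice.
    static List<String> alignedSchedule() {
        CommitLog log = new CommitLog();
        log.commit(List.of("a", "b")); // run 1
        log.commit(List.of("a", "b")); // run 2 replay
        return log.committed;
    }

    // Assumed unaligned schedule: "b" is committed in run 1, re-committed on
    // run 2's restore, and committed again in run 2 (3x in total).
    static List<String> unalignedSchedule() {
        CommitLog log = new CommitLog();
        log.commit(List.of("a", "b")); // run 1
        log.commit(List.of("b"));      // re-commit on run 2's restore
        log.commit(List.of("a", "b")); // run 2 normal processing
        return log.committed;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("a", "b");
        System.out.println(sameMultiset(alignedSchedule(), duplicate(expected)));   // true
        System.out.println(sameMultiset(unalignedSchedule(), duplicate(expected))); // false
    }
}
```

With a real idempotent sink the third commit of "b" would be a no-op. Only the counting committer, not the contract, tells the two schedules apart.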
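The issue argues that deduping by content would wrongly drop the legitimate second copy. A small plain-Java sketch can check this. {{Rec}} is a hypothetical simplification of the test's {{Record}} (value, timestamp, watermark). As in the scenario described, both emitted copies have a null timestamp, and their watermark is the minimum value (modelled here, as an assumption, as {{Long.MIN_VALUE}}), so they compare equal. {{ContentDedupingCommitter}} is also hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical simplification of the test's Record: (value, timestamp, watermark).
final class Rec {
    final int value;
    final Long timestamp;  // null in the scenario described
    final long watermark;  // assumed to be Long.MIN_VALUE here

    Rec(int value, Long timestamp, long watermark) {
        this.value = value;
        this.timestamp = timestamp;
        this.watermark = watermark;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Rec)) {
            return false;
        }
        Rec r = (Rec) o;
        return value == r.value
                && Objects.equals(timestamp, r.timestamp)
                && watermark == r.watermark;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp, watermark);
    }
}

// Hypothetical committer that dedupes by content: an equal committable is
// treated as "already committed" and skipped.
class ContentDedupingCommitter {
    final Set<Rec> seen = new HashSet<>();
    final List<Rec> effects = new ArrayList<>();

    void commit(List<Rec> committables) {
        for (Rec r : committables) {
            if (seen.add(r)) { // add() returns false for an equal element
                effects.add(r);
            }
        }
    }

    // The source legitimately emits value 1 twice; both copies are identical.
    static int effectsForTwoLegitimateCopies() {
        ContentDedupingCommitter c = new ContentDedupingCommitter();
        c.commit(List.of(new Rec(1, null, Long.MIN_VALUE), new Rec(1, null, Long.MIN_VALUE)));
        return c.effects.size();
    }

    public static void main(String[] args) {
        // Prints 1, although two legitimate copies should have been committed.
        System.out.println(effectsForTwoLegitimateCopies());
    }
}
```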
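The fix proposed in the issue has three parts: a unique committable identity, an idempotent {{TrackingCommitter}}, and an assertion on {{EXPECTED}} rather than {{duplicate(EXPECTED)}}. A rough plain-Java sketch of that shape follows. {{IdRecord}}, its {{id}} field, and {{IdempotentTrackingCommitter}} are hypothetical names. The "already committed" branch is where a Flink committer could call {{CommitRequest.signalAlreadyCommitted()}}, but this sketch only counts those cases. The replayed schedule is an assumption: value "a" is legitimately emitted twice (ids 0 and 1), and id 2 is re-committed on restore.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical committable with a unique id assigned at the source, so two
// legitimate copies of the same value stay distinguishable.
final class IdRecord {
    final long id;
    final String value;

    IdRecord(long id, String value) {
        this.id = id;
        this.value = value;
    }
}

// Hypothetical idempotent test committer: dedupes by id, so any number of
// re-commits after a restore has no additional effect.
class IdempotentTrackingCommitter {
    private final Set<Long> committedIds = new HashSet<>();
    final List<String> effects = new ArrayList<>();
    int alreadyCommitted = 0; // where Flink code might signalAlreadyCommitted()

    void commit(List<IdRecord> committables) {
        for (IdRecord r : committables) {
            if (committedIds.add(r.id)) {
                effects.add(r.value);
            } else {
                alreadyCommitted++;
            }
        }
    }

    // Replays an assumed unaligned schedule in which id 2 is committed 3x.
    static IdempotentTrackingCommitter replayShiftedBoundary() {
        List<IdRecord> all = List.of(
                new IdRecord(0, "a"), new IdRecord(1, "a"), new IdRecord(2, "b"));
        IdempotentTrackingCommitter c = new IdempotentTrackingCommitter();
        c.commit(all);                 // run 1
        c.commit(List.of(all.get(2))); // re-commit on run 2's restore
        c.commit(all);                 // run 2 commits everything again (assumed)
        return c;
    }

    public static void main(String[] args) {
        IdempotentTrackingCommitter c = replayShiftedBoundary();
        // Exactly-once effect: compare against EXPECTED, not duplicate(EXPECTED).
        System.out.println(c.effects.equals(List.of("a", "a", "b"))); // true
        System.out.println(c.alreadyCommitted);                       // 4
    }
}
```

Keying on the id, rather than on content, is what lets both copies of "a" survive while the extra commits of id 2 become no-ops.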
--
This message was sent by Atlassian Jira
(v8.20.10#820010)