[ https://issues.apache.org/jira/browse/FLINK-39954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090667#comment-18090667 ]

Qiu Yanjun commented on FLINK-39954:
------------------------------------

Hi Flink maintainers, I opened a PR for this issue:
https://github.com/apache/flink/pull/28507
It includes a focused regression test and a fix. Could you please review it
when you have a chance? Thanks.


> SinkV2ITCase over-asserts commit multiplicity under unaligned checkpoints 
> --------------------------------------------------------------------------
>
>                 Key: FLINK-39954
>                 URL: https://issues.apache.org/jira/browse/FLINK-39954
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>            Reporter: Martijn Visser
>            Priority: Major
>              Labels: pull-request-available
>
> {{SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling}} asserts 
> an exact commit multiplicity 
> ({{containsExactlyInAnyOrder(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE))}},
>  i.e. each value committed exactly 2x) using a non-idempotent test committer 
> ({{TrackingCommitter}} appends every {{commit()}} call). This is stronger 
> than the {{Committer}} contract guarantees and makes the test fragile under 
> unaligned checkpoints (UC), producing intermittent "unexpected element" 
> (duplicate) failures.
> This surfaced while working on FLINK-38614.
> I believe it's over-specified because the {{Committer}} contract 
> ({{flink-core}} {{o.a.f.api.connector.sink2.Committer}} JavaDoc) guarantees 
> an exactly-once effect and explicitly permits repeated commits: "A commit 
> must be idempotent... Flink will restart from a previous checkpoint and 
> re-attempt to commit all committables. Thus, some or all committables may 
> have already been committed." It does not guarantee a specific number of 
> {{commit()}} invocations.
> * Under aligned checkpoints the commit/re-commit pattern is deterministic 
> (fixed barrier boundaries -> exactly the two-run replay), so 
> {{duplicate(EXPECTED)}} holds.
> * Under unaligned checkpoints a boundary committable can shift, so it is 
> committed in run 1, re-committed on run 2's restore, and committed again in 
> run 2's normal processing -> 3x. That extra commit is permitted by the
> contract (a real idempotent sink turns it into a no-op), but the
> non-idempotent counter records it as an "unexpected element".
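A toy Java model of the counting argument above may help. It assumes nothing about Flink's real checkpoint mechanics: the class `CommitMultiplicityModel` and its "phases" are hypothetical, with each run committing values 1..3 and, in the unaligned case, value 3 standing in for the shifted boundary committable. It only shows why an exact-multiplicity check on a non-idempotent counter passes for one schedule and fails for the other.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a non-idempotent TrackingCommitter: every commit() call is appended to a log.
// The "phases" are invented for illustration; they are not Flink's actual checkpoint schedule.
final class CommitMultiplicityModel {
    static List<Integer> trackedCommits(List<List<Integer>> phases) {
        List<Integer> log = new ArrayList<>();
        for (List<Integer> phase : phases) {
            log.addAll(phase); // each commit() invocation is recorded, replays included
        }
        return log;
    }

    // Analogue of containsExactlyInAnyOrder(duplicate(expected)): every value exactly twice.
    static boolean exactlyTwice(List<Integer> log, List<Integer> expected) {
        List<Integer> want = new ArrayList<>(expected);
        want.addAll(expected);
        List<Integer> got = new ArrayList<>(log);
        want.sort(null); // natural ordering, so the comparison ignores order
        got.sort(null);
        return want.equals(got);
    }
}
```

With the aligned phases `[[1, 2, 3], [1, 2, 3]]`, `exactlyTwice` holds. Adding a restore phase `[3]` between run 1 and run 2's normal `[1, 2, 3]` leaves value 3 in the log three times, so the same check fails even though nothing was lost.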
> It looks like we can't easily make {{TrackingCommitter}} idempotent, because:
> * The committable is {{Record}} = (value, timestamp, watermark); under this 
> test all records have {{timestamp=null, watermark=MIN}}, and the source emits 
> each value twice legitimately, so those committables are byte-identical. 
> There is no unique transaction/committable id to dedupe on ({{CommitRequest}} 
> exposes only {{getCommittable()}} plus retry signals). Deduping by content 
> would wrongly drop the legitimate second copy.
> * The test deliberately expects duplication ({{duplicate(EXPECTED)}}), so 
> making the committer idempotent also requires changing the expected result to 
> {{EXPECTED}}.
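A minimal sketch of why content-based dedupe is unsafe here, assuming a hypothetical `Rec` type that mirrors only the three fields described above (it is not the test's actual `Record` class): two legitimately emitted copies with the same value, `timestamp = null` and `watermark = Long.MIN_VALUE` compare equal, so a set-based "idempotent" committer keeps only one of them.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical mirror of the test's Record = (value, timestamp, watermark); not the real class.
record Rec(int value, Long timestamp, long watermark) {}

final class ContentDedupCommitter {
    // Naive "idempotent" commit: drops any committable equal to one already seen.
    // Records compare component-wise, so identical (value, null, MIN) copies collapse.
    static List<Rec> commitDedupByContent(List<Rec> committables) {
        return new ArrayList<>(new LinkedHashSet<>(committables));
    }
}
```

Deduplicating two copies of `Rec(7, null, Long.MIN_VALUE)` returns a single element; the dropped one is exactly the legitimate second copy.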
> A faithful fix gives the committable a unique identity (a change to
> {{TestSinkV2}}/{{Record}} plus the source), makes {{TrackingCommitter}}
> idempotent (dedupe by id, or use {{CommitRequest.signalAlreadyCommitted()}}), 
> and asserts the exactly-once effect ({{EXPECTED}}, not 
> {{duplicate(EXPECTED)}}). This is a non-trivial test redesign and should get 
> sink/checkpointing maintainer review since it changes what the canonical 
> SinkV2 test asserts.
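One possible shape of that redesign, sketched in plain Java under stated assumptions: `IdRecord` (a committable carrying a hypothetical unique id), `IdCommitRequest` (a simplified stand-in for Flink's `CommitRequest` exposing only `getCommittable()` and `signalAlreadyCommitted()`) and `IdempotentTrackingCommitter` are illustrative names, not the code in the linked PR. Replays are reported via the already-committed signal and change nothing, so the recorded effect can be compared against the expected multiset directly.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical committable with a unique id; the real TestSinkV2/Record would need a similar field.
record IdRecord(long id, int value) {}

// Simplified stand-in for Flink's CommitRequest; NOT the real sink2 interface.
final class IdCommitRequest {
    private final IdRecord committable;
    private boolean alreadyCommitted;

    IdCommitRequest(IdRecord committable) { this.committable = committable; }
    IdRecord getCommittable() { return committable; }
    void signalAlreadyCommitted() { alreadyCommitted = true; }
    boolean isAlreadyCommitted() { return alreadyCommitted; }
}

// Idempotent variant of a tracking committer: dedupes by id, so replays are no-ops.
final class IdempotentTrackingCommitter {
    private final Set<Long> committedIds = new HashSet<>();
    private final List<Integer> committedValues = new ArrayList<>();

    void commit(Collection<IdCommitRequest> requests) {
        for (IdCommitRequest request : requests) {
            IdRecord committable = request.getCommittable();
            if (committedIds.add(committable.id())) {
                committedValues.add(committable.value()); // first commit: record the effect
            } else {
                request.signalAlreadyCommitted();         // replay: report it, change nothing
            }
        }
    }

    List<Integer> committedValues() { return List.copyOf(committedValues); }

    // Exactly-once check: the multiset of committed values equals the expected multiset.
    static boolean matchesExactlyOnce(List<Integer> committed, List<Integer> expected) {
        List<Integer> a = new ArrayList<>(committed);
        List<Integer> b = new ArrayList<>(expected);
        a.sort(null);
        b.sort(null);
        return a.equals(b);
    }
}
```

Committing ids 1, 2, 3 (values 10, 10, 20), replaying id 3 on a simulated restore, and committing all three again leaves `committedValues()` equal to `[10, 10, 20]`: both copies of 10 survive because their ids differ, and the replays only trigger `signalAlreadyCommitted()`.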



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
