LuciferYang commented on PR #364:
URL: https://github.com/apache/doris-spark-connector/pull/364#issuecomment-4786123145

Thanks for taking a careful look, good questions. Replies inline:

**1. Without 2PC it can't really be exactly-once: agreed.** This PR doesn't try to make auto-commit loads exactly-once end to end, and as you say it can't: one task can fire several stream loads, and if Spark re-runs the whole task attempt those already-committed batches get replayed under fresh labels (a new attempt means a new processor with an empty record buffer). What it actually fixes is much narrower: the connector's own in-task retry loop (`DORIS_SINK_MAX_RETRIES` / `Retry.exec`). When a single batch's HTTP request fails on the client side but the batch had in fact committed on the BE, the retry used to resend the same data under a *new* label and write the rows twice. Reusing the label lets the BE reject the retry as a duplicate. For exactly-once across executor/task failover you still need 2PC.

**2. Only matters when `retries > 0`, and relies on the retry sending identical data: exactly.** `DorisDataWriter` only buffers rows (`recordBuffer`) when `retries > 0`, and on retry it replays exactly those rows in order, so the reused label plus identical payload is a genuine duplicate. With `retries = 0` there's no in-task retry and the manager always mints a fresh label, so nothing changes there.

**3. On needing an abort before reuse (the PREPARE case in flink #523).** That one is about reusing a 2PC / precommitted transaction's label, where you do have to abort the stale PREPARE first or the same-label load is rejected forever. Here I deliberately don't reuse labels under 2PC at all: `onBatchFailed()` is a no-op when 2PC is on, since 2PC already gets exactly-once from precommit/commit (the retry opens a fresh txn and the orphaned precommit aborts). So there's no PREPARE state hanging around to clear. For auto-commit there's no long-lived transaction, and at the point the label collides there's no txn id to abort anyway.

The three end states are handled directly:

- original **committed** → BE returns `Label Already Exists` with `ExistingJobStatus=FINISHED` → we treat the retry as a no-op (rows are already there);
- original **aborted/cancelled** → the label is free again → the retry's fresh load just succeeds;
- original **still RUNNING/PRECOMMITTED** → we don't mask it; it stays a retriable failure (masking could silently drop rows) and the retry interval gives the BE time to settle.

So abort-before-reuse doesn't really apply here, and would actually be unsafe: in the FINISHED case the only thing to abort would be the load we want to keep. Happy to drop in a comment/test making this explicit if you think it's worth it.

**4. Where we actually hit the duplicate.** It came up while getting the Spark 4.1 failover IT to pass. Under Spark 4.x the retry backoff was collapsing: the async stream-load worker interrupts the task thread on failure, and the interrupt also unparks the old `LockSupport.parkNanos`, so the interval shrank to a few ms (fixed separately in the retry-interval commit). With the interval gone, a batch that had already committed on the BE but whose client ack was lost to the interrupt got retried right away under a new label, and the rows landed twice. Reusing the label is what makes the BE dedup that retry.
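
To make the label-reuse behaviour and the three end states from point 3 concrete, here is a minimal, language-neutral sketch in Python. It is not the connector's code: `LoadResponse`, `is_committed`, `load_batch`, and `FakeBackend` are hypothetical names, the retry interval is left out, and the fake BE only models the "committed, but the ack was lost" case. The `Label Already Exists` and `FINISHED` values mirror the response described above.

```python
import uuid
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class LoadResponse:
    """Hypothetical, trimmed-down view of a stream load response."""
    status: str                                # "Success", "Label Already Exists", ...
    existing_job_status: Optional[str] = None  # only meaningful on a label collision


def is_committed(resp: LoadResponse) -> bool:
    """True if the rows are known to be in the table after this response."""
    if resp.status == "Success":
        return True
    # Collision with a FINISHED load: the original attempt committed, so the
    # retry is a no-op. RUNNING / PRECOMMITTED are deliberately not masked.
    return (resp.status == "Label Already Exists"
            and resp.existing_job_status == "FINISHED")


def load_batch(send: Callable[[str, List[str]], LoadResponse],
               rows: List[str], max_retries: int,
               reuse_label: bool = True) -> None:
    """Send one batch, retrying in-task up to max_retries times."""
    label = uuid.uuid4().hex  # minted once per batch
    for attempt in range(max_retries + 1):
        if attempt > 0 and not reuse_label:
            label = uuid.uuid4().hex  # old behaviour: fresh label per retry
        try:
            if is_committed(send(label, rows)):
                return
        except ConnectionError:
            pass  # outcome unknown on the client side; fall through and retry
    raise RuntimeError("batch not committed after retries")


class FakeBackend:
    """Test double: commits every new label, but drops the first ack."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.finished_labels = set()
        self.ack_lost = False

    def send(self, label: str, rows: List[str]) -> LoadResponse:
        if label in self.finished_labels:
            return LoadResponse("Label Already Exists", "FINISHED")
        self.finished_labels.add(label)
        self.rows.extend(rows)  # the batch commits on the BE side
        if not self.ack_lost:
            self.ack_lost = True
            raise ConnectionError("ack lost after commit")
        return LoadResponse("Success")


# Same label on retry: the BE rejects the duplicate, rows land once.
be_reuse = FakeBackend()
load_batch(be_reuse.send, ["a", "b"], max_retries=1)
print(be_reuse.rows)  # ['a', 'b']

# Fresh label on retry (the old behaviour): rows land twice.
be_fresh = FakeBackend()
load_batch(be_fresh.send, ["a", "b"], max_retries=1, reuse_label=False)
print(be_fresh.rows)  # ['a', 'b', 'a', 'b']
```

The aborted/cancelled case needs no special branch in this sketch: the label is free again, so the retry's load simply returns `Success`.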
