zeroshade commented on code in PR #912:
URL: https://github.com/apache/iceberg-go/pull/912#discussion_r3101833588
##########
table/table.go:
##########
@@ -37,6 +40,26 @@ import (
"golang.org/x/sync/errgroup"
)
+// ErrCommitFailed is the sentinel error returned by catalogs when a
+// commit fails due to a concurrent modification (e.g. HTTP 409 Conflict
+// from the REST catalog). Catalog implementations should wrap this
+// error so that callers using errors.Is(err, table.ErrCommitFailed)
+// can detect retryable commit conflicts.
+//
+// Currently only catalog/rest wraps this sentinel; Glue, SQL, and Hive
+// catalogs return their conflict errors raw and will not trigger
+// retries until follow-up work wires them through (tracked under
+// issue #830).
+//
+// The retry loop in doCommit re-issues the original updates and
+// requirements unchanged. This recovers only from transient catalog
+// errors (dropped connections, brief 409 during leader election); it
+// does not yet refresh the table metadata between attempts, so a
+// contended commit whose AssertRefSnapshotID requirement has been
+// invalidated by a peer will fail deterministically on every retry.
+// Refresh-and-replay is tracked separately (issue #830).
+var ErrCommitFailed = errors.New("commit failed, refresh and try again")
Review Comment:
Is there a particular reason to have both this and the
`rest.ErrCommitFailed`?
##########
table/table.go:
##########
@@ -303,10 +326,55 @@ func (t Table) AllManifests(ctx context.Context)
iter.Seq2[iceberg.ManifestFile,
}
func (t Table) doCommit(ctx context.Context, updates []Update, reqs
[]Requirement) (*Table, error) {
- newMeta, newLoc, err := t.cat.CommitTable(ctx, t.identifier, reqs,
updates)
+ cfg := readRetryConfig(t.metadata.Properties())
+
+ // Bound total retry time with a derived context so both the wait loop
+ // and the CommitTable call itself respect the deadline uniformly.
+ retryCtx, cancel := context.WithTimeout(ctx,
time.Duration(cfg.totalTimeoutMs)*time.Millisecond)
+ defer cancel()
+
+ var (
+ newMeta Metadata
+ newLoc string
+ err error
+ )
+
+ for attempt := 0; attempt <= cfg.numRetries; attempt++ {
+ if retryCtx.Err() != nil {
+ return nil, context.Cause(retryCtx)
+ }
+
+ newMeta, newLoc, err = t.cat.CommitTable(retryCtx,
t.identifier, reqs, updates)
+ if err == nil {
+ break
+ }
+
+ // Only retry on retryable commit conflicts. Unknown-state
errors
+ // (5xx, gateway timeouts) must NOT be retried because the
commit
+ // may have actually succeeded — retrying could duplicate work.
+ if !errors.Is(err, ErrCommitFailed) {
+ return nil, err
+ }
+
+ if attempt == cfg.numRetries {
+ break
+ }
+
+ wait := backoffDuration(attempt, cfg.minWaitMs, cfg.maxWaitMs)
+ timer := time.NewTimer(wait)
+ select {
+ case <-retryCtx.Done():
+ timer.Stop()
+
+ return nil, context.Cause(retryCtx)
+ case <-timer.C:
+ }
Review Comment:
Rather than creating a new timer with `time.NewTimer` on every iteration,
maybe create a single timer and just call `.Reset` on it?
Also, just for ease of reading, maybe put the wait at the top of the loop,
guarded by `if attempt != 0 {` so that it is skipped on the first attempt,
rather than having `if attempt == cfg.numRetries { break }` after the commit
call. Functionally it's no different, but I think the logic is easier to
follow.
##########
table/table.go:
##########
@@ -303,10 +326,55 @@ func (t Table) AllManifests(ctx context.Context)
iter.Seq2[iceberg.ManifestFile,
}
func (t Table) doCommit(ctx context.Context, updates []Update, reqs
[]Requirement) (*Table, error) {
- newMeta, newLoc, err := t.cat.CommitTable(ctx, t.identifier, reqs,
updates)
+ cfg := readRetryConfig(t.metadata.Properties())
+
+ // Bound total retry time with a derived context so both the wait loop
+ // and the CommitTable call itself respect the deadline uniformly.
+ retryCtx, cancel := context.WithTimeout(ctx,
time.Duration(cfg.totalTimeoutMs)*time.Millisecond)
+ defer cancel()
+
+ var (
+ newMeta Metadata
+ newLoc string
+ err error
+ )
+
+ for attempt := 0; attempt <= cfg.numRetries; attempt++ {
Review Comment:
```suggestion
for attempt := range cfg.numRetries + 1 {
```
##########
table/table.go:
##########
@@ -303,10 +326,55 @@ func (t Table) AllManifests(ctx context.Context)
iter.Seq2[iceberg.ManifestFile,
}
func (t Table) doCommit(ctx context.Context, updates []Update, reqs
[]Requirement) (*Table, error) {
- newMeta, newLoc, err := t.cat.CommitTable(ctx, t.identifier, reqs,
updates)
+ cfg := readRetryConfig(t.metadata.Properties())
+
+ // Bound total retry time with a derived context so both the wait loop
+ // and the CommitTable call itself respect the deadline uniformly.
+ retryCtx, cancel := context.WithTimeout(ctx,
time.Duration(cfg.totalTimeoutMs)*time.Millisecond)
+ defer cancel()
+
+ var (
+ newMeta Metadata
+ newLoc string
+ err error
+ )
+
+ for attempt := 0; attempt <= cfg.numRetries; attempt++ {
+ if retryCtx.Err() != nil {
+ return nil, context.Cause(retryCtx)
+ }
+
+ newMeta, newLoc, err = t.cat.CommitTable(retryCtx,
t.identifier, reqs, updates)
+ if err == nil {
+ break
+ }
+
+ // Only retry on retryable commit conflicts. Unknown-state
errors
+ // (5xx, gateway timeouts) must NOT be retried because the
commit
+ // may have actually succeeded — retrying could duplicate work.
+ if !errors.Is(err, ErrCommitFailed) {
Review Comment:
None of the current implementations are able to return this yet. Is that
intentional?
##########
table/table.go:
##########
@@ -316,6 +384,68 @@ func (t Table) doCommit(ctx context.Context, updates
[]Update, reqs []Requiremen
return New(t.identifier, newMeta, newLoc, t.fsF, t.cat), nil
}
+type retryConfig struct {
+ numRetries int
+ minWaitMs int
+ maxWaitMs int
+ totalTimeoutMs int
+}
+
+func readRetryConfig(props iceberg.Properties) retryConfig {
+ cfg := retryConfig{
+ numRetries: props.GetInt(CommitNumRetriesKey,
CommitNumRetriesDefault),
+ minWaitMs: props.GetInt(CommitMinRetryWaitMsKey,
CommitMinRetryWaitMsDefault),
+ maxWaitMs: props.GetInt(CommitMaxRetryWaitMsKey,
CommitMaxRetryWaitMsDefault),
+ totalTimeoutMs: props.GetInt(CommitTotalRetryTimeoutMsKey,
CommitTotalRetryTimeoutMsDefault),
+ }
+ if cfg.numRetries < 0 {
+ cfg.numRetries = 0
+ }
+
+ return cfg
+}
+
+// backoffDuration computes wait time for the given 0-based retry attempt
+// using exponential backoff (minMs << attempt) clamped to maxMs, with
+// jitter in [minMs, ceiling] to avoid retry stampedes while keeping a
+// non-zero floor between attempts. Java Iceberg uses a deterministic
+// exponential backoff here; we add jitter to reduce stampede risk on
+// concurrent Go writers. Backoff is client-local, so this does not
+// affect cross-client interop.
+func backoffDuration(attempt, minMs, maxMs int) time.Duration {
Review Comment:
This is another case where we could use `uint` instead. Why not do that, to
avoid having to include the negative checks?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]