This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 131cc926 fix(table): deadlock in MapExec when workers error (#810)
131cc926 is described below
commit 131cc9269e549a2a53ed1561c6cbff42fe34138d
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Fri Mar 27 21:14:15 2026 +0100
fix(table): deadlock in MapExec when workers error (#810)
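Use errgroup.WithContext so that the first worker error cancels a derived
context. Add a select on ctx.Done() to both the feeder send (ch <- v) and the
worker send (out <- result), so they unblock when the context is cancelled
instead of blocking forever. The feeder's cleanup (close(ch), g.Wait(),
close(out)) moves into a defer so it also runs when the feeder exits early.
MapExec now takes a context.Context as its first parameter; callers pass their
existing context, or context.TODO() where none is available.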
Closes #796
---
table/internal/utils.go | 26 +++++++++++++++++++-------
table/internal/utils_test.go | 24 +++++++++++++++++++++++-
table/snapshot_producers.go | 3 ++-
table/writer.go | 2 +-
4 files changed, 45 insertions(+), 10 deletions(-)
diff --git a/table/internal/utils.go b/table/internal/utils.go
index 95a1336d..adfbe2ed 100644
--- a/table/internal/utils.go
+++ b/table/internal/utils.go
@@ -20,6 +20,7 @@ package internal
import (
"bytes"
"container/heap"
+ "context"
"encoding/binary"
"errors"
"fmt"
@@ -520,12 +521,12 @@ func TruncateUpperBoundBinary(val []byte, trunc int) []byte {
return nil
}
-func MapExec[T, S any](nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) iter.Seq2[S, error] {
+func MapExec[T, S any](ctx context.Context, nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) iter.Seq2[S, error] {
if nWorkers <= 0 {
nWorkers = runtime.GOMAXPROCS(0)
}
- var g errgroup.Group
+ g, ctx := errgroup.WithContext(ctx)
ch := make(chan T, nWorkers)
out := make(chan S, nWorkers)
@@ -536,7 +537,11 @@ func MapExec[T, S any](nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) i
if err != nil {
return err
}
- out <- result
+ select {
+ case out <- result:
+ case <-ctx.Done():
+ return context.Cause(ctx)
+ }
}
return nil
@@ -545,12 +550,19 @@ func MapExec[T, S any](nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) i
var err error
go func() {
- defer close(out)
+ defer func() {
+ close(ch)
+ err = g.Wait()
+ close(out)
+ }()
+
for v := range slice {
- ch <- v
+ select {
+ case ch <- v:
+ case <-ctx.Done():
+ return
+ }
}
- close(ch)
- err = g.Wait()
}()
return func(yield func(S, error) bool) {
diff --git a/table/internal/utils_test.go b/table/internal/utils_test.go
index 24664441..383eb103 100644
--- a/table/internal/utils_test.go
+++ b/table/internal/utils_test.go
@@ -18,6 +18,8 @@
package internal_test
import (
+ "context"
+ "errors"
"slices"
"testing"
"time"
@@ -72,6 +74,26 @@ func TestTruncateUpperBoundBinary(t *testing.T) {
assert.Nil(t, internal.TruncateUpperBoundBinary([]byte{0xff, 0xff, 0x00}, 2))
}
+func TestMapExecAllWorkersError(t *testing.T) {
+ errFail := errors.New("worker failed")
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ for _, err := range internal.MapExec(context.Background(), 2, slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), func(i int) (int, error) {
+ return 0, errFail
+ }) {
+ _ = err
+ }
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(5 * time.Second):
+ t.Fatal("MapExec deadlocked when all workers returned errors")
+ }
+}
+
func TestMapExecFinish(t *testing.T) {
var (
ch = make(chan struct{}, 1)
@@ -82,7 +104,7 @@ func TestMapExecFinish(t *testing.T) {
go func() {
defer close(ch)
- for _, err := range internal.MapExec(3, slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), f) {
+ for _, err := range internal.MapExec(context.Background(), 3, slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), f) {
assert.NoError(t, err)
}
}()
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 00d71f68..8de01dfa 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -18,6 +18,7 @@
package table
import (
+ "context"
"errors"
"fmt"
"io"
@@ -239,7 +240,7 @@ func (of *overwriteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
nWorkers := config.EnvConfig.MaxWorkers
finalResult := make([]iceberg.ManifestEntry, 0, len(previousManifests))
- for entries, err := range tblutils.MapExec(nWorkers, slices.Values(previousManifests), getEntries) {
+ for entries, err := range tblutils.MapExec(context.TODO(), nWorkers, slices.Values(previousManifests), getEntries) {
if err != nil {
return nil, err
}
diff --git a/table/writer.go b/table/writer.go
index 92b437fa..b6610f8d 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -210,7 +210,7 @@ func (w *concurrentDataFileWriter) writeFiles(ctx context.Context, rootLocation
}
}
- return internal.MapExec(config.EnvConfig.MaxWorkers, tasks, func(t WriteTask) (iceberg.DataFile, error) {
+ return internal.MapExec(ctx, config.EnvConfig.MaxWorkers, tasks, func(t WriteTask) (iceberg.DataFile, error) {
return fw.writeFile(ctx, partitionValues, t)
})
}