This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 131cc926 fix(table): deadlock in MapExec when workers error (#810)
131cc926 is described below

commit 131cc9269e549a2a53ed1561c6cbff42fe34138d
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Fri Mar 27 21:14:15 2026 +0100

    fix(table): deadlock in MapExec when workers error (#810)
    
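    Use errgroup.WithContext so that the first worker error cancels a
    derived context. Select on ctx.Done() in both the feeder (ch <- v) and
    worker (out <- result) sends so they unblock when the context is
    cancelled instead of blocking forever.
    
    MapExec now takes a context.Context as its first parameter; the callers
    in table/writer.go and table/snapshot_producers.go are updated to pass one.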
    
    Closes #796
---
 table/internal/utils.go      | 26 +++++++++++++++++++-------
 table/internal/utils_test.go | 24 +++++++++++++++++++++++-
 table/snapshot_producers.go  |  3 ++-
 table/writer.go              |  2 +-
 4 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/table/internal/utils.go b/table/internal/utils.go
index 95a1336d..adfbe2ed 100644
--- a/table/internal/utils.go
+++ b/table/internal/utils.go
@@ -20,6 +20,7 @@ package internal
 import (
        "bytes"
        "container/heap"
+       "context"
        "encoding/binary"
        "errors"
        "fmt"
@@ -520,12 +521,12 @@ func TruncateUpperBoundBinary(val []byte, trunc int) 
[]byte {
        return nil
 }
 
-func MapExec[T, S any](nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) 
iter.Seq2[S, error] {
+func MapExec[T, S any](ctx context.Context, nWorkers int, slice iter.Seq[T], 
fn func(T) (S, error)) iter.Seq2[S, error] {
        if nWorkers <= 0 {
                nWorkers = runtime.GOMAXPROCS(0)
        }
 
-       var g errgroup.Group
+       g, ctx := errgroup.WithContext(ctx)
        ch := make(chan T, nWorkers)
        out := make(chan S, nWorkers)
 
@@ -536,7 +537,11 @@ func MapExec[T, S any](nWorkers int, slice iter.Seq[T], fn 
func(T) (S, error)) i
                                if err != nil {
                                        return err
                                }
-                               out <- result
+                               select {
+                               case out <- result:
+                               case <-ctx.Done():
+                                       return context.Cause(ctx)
+                               }
                        }
 
                        return nil
@@ -545,12 +550,19 @@ func MapExec[T, S any](nWorkers int, slice iter.Seq[T], 
fn func(T) (S, error)) i
 
        var err error
        go func() {
-               defer close(out)
+               defer func() {
+                       close(ch)
+                       err = g.Wait()
+                       close(out)
+               }()
+
                for v := range slice {
-                       ch <- v
+                       select {
+                       case ch <- v:
+                       case <-ctx.Done():
+                               return
+                       }
                }
-               close(ch)
-               err = g.Wait()
        }()
 
        return func(yield func(S, error) bool) {
diff --git a/table/internal/utils_test.go b/table/internal/utils_test.go
index 24664441..383eb103 100644
--- a/table/internal/utils_test.go
+++ b/table/internal/utils_test.go
@@ -18,6 +18,8 @@
 package internal_test
 
 import (
+       "context"
+       "errors"
        "slices"
        "testing"
        "time"
@@ -72,6 +74,26 @@ func TestTruncateUpperBoundBinary(t *testing.T) {
        assert.Nil(t, internal.TruncateUpperBoundBinary([]byte{0xff, 0xff, 
0x00}, 2))
 }
 
+func TestMapExecAllWorkersError(t *testing.T) {
+       errFail := errors.New("worker failed")
+
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+               for _, err := range internal.MapExec(context.Background(), 2, 
slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), func(i int) (int, error) {
+                       return 0, errFail
+               }) {
+                       _ = err
+               }
+       }()
+
+       select {
+       case <-done:
+       case <-time.After(5 * time.Second):
+               t.Fatal("MapExec deadlocked when all workers returned errors")
+       }
+}
+
 func TestMapExecFinish(t *testing.T) {
        var (
                ch = make(chan struct{}, 1)
@@ -82,7 +104,7 @@ func TestMapExecFinish(t *testing.T) {
 
        go func() {
                defer close(ch)
-               for _, err := range internal.MapExec(3, slices.Values([]int{1, 
2, 3, 4, 5, 6, 7, 8, 9, 10}), f) {
+               for _, err := range internal.MapExec(context.Background(), 3, 
slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), f) {
                        assert.NoError(t, err)
                }
        }()
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 00d71f68..8de01dfa 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -18,6 +18,7 @@
 package table
 
 import (
+       "context"
        "errors"
        "fmt"
        "io"
@@ -239,7 +240,7 @@ func (of *overwriteFiles) deletedEntries() 
([]iceberg.ManifestEntry, error) {
 
        nWorkers := config.EnvConfig.MaxWorkers
        finalResult := make([]iceberg.ManifestEntry, 0, len(previousManifests))
-       for entries, err := range tblutils.MapExec(nWorkers, 
slices.Values(previousManifests), getEntries) {
+       for entries, err := range tblutils.MapExec(context.TODO(), nWorkers, 
slices.Values(previousManifests), getEntries) {
                if err != nil {
                        return nil, err
                }
diff --git a/table/writer.go b/table/writer.go
index 92b437fa..b6610f8d 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -210,7 +210,7 @@ func (w *concurrentDataFileWriter) writeFiles(ctx 
context.Context, rootLocation
                }
        }
 
-       return internal.MapExec(config.EnvConfig.MaxWorkers, tasks, func(t 
WriteTask) (iceberg.DataFile, error) {
+       return internal.MapExec(ctx, config.EnvConfig.MaxWorkers, tasks, func(t 
WriteTask) (iceberg.DataFile, error) {
                return fw.writeFile(ctx, partitionValues, t)
        })
 }

Reply via email to