This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new c66aacd4 fix data race in yieldDataFiles (#650)
c66aacd4 is described below
commit c66aacd4d2e0ad4671c7671d0aac8f465ec7d7e3
Author: Vincent Janelle <[email protected]>
AuthorDate: Mon Dec 22 08:01:34 2025 -0800
fix data race in yieldDataFiles (#650)
The err variable was being written by a goroutine (lines 157-158)
while simultaneously being read by the iterator (lines 168, 173).
Use a channel to defer reading the error until after the goroutine
completes and the output channel is closed.
---
table/partitioned_fanout_writer.go | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/table/partitioned_fanout_writer.go b/table/partitioned_fanout_writer.go
index d0c366af..838513f5 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -151,11 +151,15 @@ func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-c
}
func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] {
- var err error
+ // Use a channel to safely communicate the error from the goroutine
+ // to avoid a data race between writing err in the goroutine and reading it in the iterator.
+ errCh := make(chan error, 1)
go func() {
defer close(outputDataFilesCh)
- err = fanoutWorkers.Wait()
+ err := fanoutWorkers.Wait()
err = errors.Join(err, p.writers.closeAll())
+ errCh <- err
+ close(errCh)
}()
return func(yield func(iceberg.DataFile, error) bool) {
@@ -164,13 +168,15 @@ func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group,
}
}()
+ // Yield data files as they arrive - no error yet since goroutine is still running
for f := range outputDataFilesCh {
- if !yield(f, err) {
+ if !yield(f, nil) {
return
}
}
- if err != nil {
+ // Channel is closed, now safe to read the error
+ if err := <-errCh; err != nil {
yield(nil, err)
}
}