Copilot commented on code in PR #702:
URL: https://github.com/apache/iceberg-go/pull/702#discussion_r2739970096
##########
table/writer.go:
##########
@@ -33,15 +33,18 @@ import (
type WriteTask struct {
Uuid uuid.UUID
ID int
+ PartitionID int // Added: partition ID for file naming
+ FileCount int // Added: file counter within this task
Review Comment:
The "Added:" prefix in comments is typically used temporarily during code
reviews but should be removed before merging to production. Standard Go
documentation comments should describe what the field does, not that it was
recently added. Consider updating these comments to describe the purpose of the
field instead.
```suggestion
PartitionID int // PartitionID is the partition identifier used in data file naming.
FileCount int // FileCount is a sequential counter for files written by this task.
```
##########
table/rolling_data_writer.go:
##########
@@ -156,13 +166,16 @@ func (r *RollingDataWriter) flushToDataFile(batch []arrow.RecordBatch, outputDat
}
task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
- cnt := int(r.factory.counter.Add(1) - 1)
+ cnt, _ := r.factory.nextCount()
Review Comment:
This line calls `nextCount()`, the `next` function that the writerFactory
obtains from `iter.Pull()`. Go's documentation for `iter.Pull` states that it
is an error to call `next` or `stop` from multiple goroutines simultaneously.
Since multiple RollingDataWriter instances can call `flushToDataFile`
concurrently (each in its own goroutine from the `stream` method), this is a
data race on the shared iterator.
The writerFactory needs to protect access to `nextCount()` with a mutex to
ensure thread-safe counter increments across multiple partition writers.
##########
internal/utils.go:
##########
@@ -164,12 +165,15 @@ func RecoverError(err *error) {
}
func Counter(start int) iter.Seq[int] {
+ var current atomic.Int64
+ current.Store(int64(start))
+
return func(yield func(int) bool) {
for {
- if !yield(start) {
+ if !yield(int(current.Load())) {
return
}
- start++
+ current.Add(1)
Review Comment:
There is a race condition in this implementation. The current code performs
a Load followed by an Add in separate operations, which means two concurrent
goroutines could both load the same value before either increments it,
resulting in duplicate counter values.
The correct implementation should atomically fetch-and-increment the value
before yielding it. This ensures each caller gets a unique, sequential counter
value even in concurrent scenarios.
##########
table/rolling_data_writer.go:
##########
@@ -60,6 +66,8 @@ func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *Metadat
// file strategy to manage file sizes.
type RollingDataWriter struct {
partitionKey string
+ partitionID int // Added: unique ID for this partition
+ fileCount atomic.Int64 // Added: counter for files in this partition
Review Comment:
The "Added:" prefix in comments is typically used temporarily during code
reviews but should be removed before merging to production. Standard Go
documentation comments should describe what the field does, not that it was
recently added. Consider updating these comments to describe the purpose of
each field instead.
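This comment carries no `suggestion` block. A possible rewording is sketched below on a hypothetical, trimmed copy of the struct (`rollingWriterFields` is an illustrative name, not the PR's type); the wording is derived from the original "Added:" comments:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rollingWriterFields is a hypothetical, trimmed copy of RollingDataWriter
// that keeps only the fields under review.
type rollingWriterFields struct {
	partitionKey string

	// partitionID uniquely identifies this partition and is used when
	// naming the data files written for it.
	partitionID int

	// fileCount counts the data files written for this partition.
	fileCount atomic.Int64
}

func main() {
	// Illustrative values only. Use a pointer: atomic.Int64 must not be
	// copied after first use.
	w := &rollingWriterFields{partitionKey: "dt=2024-01-01", partitionID: 3}
	w.fileCount.Add(1)
	fmt.Println(w.partitionKey, w.partitionID, w.fileCount.Load())
}
```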
##########
table/rolling_data_writer.go:
##########
@@ -33,25 +33,31 @@ import (
// for different partitions, providing shared configuration and coordination
// across all writers in a partitioned write operation.
type writerFactory struct {
- rootLocation string
- args recordWritingArgs
- meta *MetadataBuilder
- taskSchema *iceberg.Schema
- targetFileSize int64
- writers sync.Map
- counter atomic.Int64
- mu sync.Mutex
+ rootLocation string
+ args recordWritingArgs
+ meta *MetadataBuilder
+ taskSchema *iceberg.Schema
+ targetFileSize int64
+ writers sync.Map
+ nextCount func() (int, bool)
+ stopCount func()
+ partitionIDCounter atomic.Int64 // Added: counter for partition IDs
Review Comment:
The "Added:" prefix in comments is typically used temporarily during code
reviews but should be removed before merging to production. Standard Go
documentation comments should describe what the field does, not that it was
recently added. Consider updating this comment to describe the purpose of the
field instead.
```suggestion
partitionIDCounter atomic.Int64 // partitionIDCounter generates unique IDs for partitions
```
--
This is an automated message from the Apache Git Service.