Copilot commented on code in PR #702:
URL: https://github.com/apache/iceberg-go/pull/702#discussion_r2739970096


##########
table/writer.go:
##########
@@ -33,15 +33,18 @@ import (
 type WriteTask struct {
        Uuid        uuid.UUID
        ID          int
+       PartitionID int // Added: partition ID for file naming
+       FileCount   int // Added: file counter within this task

Review Comment:
   The "Added:" prefix describes the change rather than the field. That context
belongs in the commit history and should be removed before merging. Go
documentation comments should state what each field does, not that it was
recently added. Consider rewording these comments to describe the purpose of
each field instead.
   ```suggestion
        PartitionID int // PartitionID is the partition identifier used in data file naming.
        FileCount   int // FileCount is a sequential counter for files written by this task.
   ```



##########
table/rolling_data_writer.go:
##########
@@ -156,13 +166,16 @@ func (r *RollingDataWriter) flushToDataFile(batch []arrow.RecordBatch, outputDat
        }
 
        task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
-               cnt := int(r.factory.counter.Add(1) - 1)
+               cnt, _ := r.factory.nextCount()

Review Comment:
   This line calls `nextCount()`, which the writerFactory obtains from
`iter.Pull()`. Go's documentation states that it is an error to call the
functions returned by `iter.Pull` from multiple goroutines simultaneously.
Because multiple RollingDataWriter instances can call `flushToDataFile`
concurrently (each in its own goroutine started by the `stream` method), the
unsynchronized call is not safe.
   
   The writerFactory should guard `nextCount()` with a mutex so that counter
increments stay safe across all partition writers.
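
   A minimal sketch of the mutex approach, not the PR's actual code. The names
`counterSeq`, `safeCounter`, `newSafeCounter`, and `distinctValues` are
hypothetical and stand in for `internal.Counter` and the factory's
`nextCount`/`stopCount` fields. It assumes Go 1.23 or later for the `iter`
package.

   ```go
   package main

   import (
   	"fmt"
   	"iter"
   	"sync"
   )

   // counterSeq is a hypothetical stand-in for the PR's internal.Counter:
   // it yields start, start+1, ... until the consumer stops.
   func counterSeq(start int) iter.Seq[int] {
   	return func(yield func(int) bool) {
   		for i := start; ; i++ {
   			if !yield(i) {
   				return
   			}
   		}
   	}
   }

   // safeCounter (hypothetical name) serializes access to the functions
   // returned by iter.Pull, which must not be called simultaneously.
   type safeCounter struct {
   	mu   sync.Mutex
   	next func() (int, bool)
   	stop func()
   }

   func newSafeCounter(start int) *safeCounter {
   	next, stop := iter.Pull(counterSeq(start))
   	return &safeCounter{next: next, stop: stop}
   }

   // Next returns the next counter value. The mutex ensures that only one
   // goroutine is inside next at a time.
   func (c *safeCounter) Next() (int, bool) {
   	c.mu.Lock()
   	defer c.mu.Unlock()
   	return c.next()
   }

   // Stop releases the underlying iterator. It takes the same lock because
   // stop must not overlap with next either.
   func (c *safeCounter) Stop() {
   	c.mu.Lock()
   	defer c.mu.Unlock()
   	c.stop()
   }

   // distinctValues draws one value from each of n goroutines and reports
   // how many distinct values were observed.
   func distinctValues(c *safeCounter, n int) int {
   	var (
   		wg   sync.WaitGroup
   		mu   sync.Mutex
   		seen = make(map[int]struct{})
   	)
   	for i := 0; i < n; i++ {
   		wg.Add(1)
   		go func() {
   			defer wg.Done()
   			if v, ok := c.Next(); ok {
   				mu.Lock()
   				seen[v] = struct{}{}
   				mu.Unlock()
   			}
   		}()
   	}
   	wg.Wait()
   	return len(seen)
   }

   func main() {
   	c := newSafeCounter(0)
   	defer c.Stop()
   	fmt.Println("distinct values:", distinctValues(c, 8))
   }
   ```

   Whether a mutex around the pulled iterator is preferable to keeping a plain
`atomic.Int64` counter on the factory is a design choice for the PR author. The
sketch only shows that serializing `next` and `stop` is sufficient.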



##########
internal/utils.go:
##########
@@ -164,12 +165,15 @@ func RecoverError(err *error) {
 }
 
 func Counter(start int) iter.Seq[int] {
+       var current atomic.Int64
+       current.Store(int64(start))
+
        return func(yield func(int) bool) {
                for {
-                       if !yield(start) {
+                       if !yield(int(current.Load())) {
                                return
                        }
-                       start++
+                       current.Add(1)

Review Comment:
   There is a race condition in this implementation. `Load` and `Add` are
separate operations, so two concurrent goroutines can both load the same value
before either increments it and end up with duplicate counter values.
   
   The value should instead be fetched and incremented in a single atomic step
before it is yielded, for example with `current.Add(1) - 1`. This ensures each
caller gets a unique counter value even in concurrent scenarios.
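
   A minimal sketch of the fetch-and-increment variant, assuming Go 1.23 or
later. `Counter` mirrors the function in the diff; `take` and `main` are
hypothetical helpers added only to exercise it.

   ```go
   package main

   import (
   	"fmt"
   	"iter"
   	"sync"
   	"sync/atomic"
   )

   // Counter yields start, start+1, ... Each value is claimed with a single
   // atomic Add, so concurrent consumers never observe the same value.
   func Counter(start int) iter.Seq[int] {
   	var current atomic.Int64
   	current.Store(int64(start))

   	return func(yield func(int) bool) {
   		for {
   			// Add returns the new value; subtracting 1 recovers the
   			// value this call claimed.
   			if !yield(int(current.Add(1) - 1)) {
   				return
   			}
   		}
   	}
   }

   // take (hypothetical helper) collects the next n values from seq.
   func take(seq iter.Seq[int], n int) []int {
   	if n <= 0 {
   		return nil
   	}
   	out := make([]int, 0, n)
   	for v := range seq {
   		out = append(out, v)
   		if len(out) == n {
   			break
   		}
   	}
   	return out
   }

   func main() {
   	seq := Counter(0)

   	var (
   		wg   sync.WaitGroup
   		mu   sync.Mutex
   		seen = make(map[int]struct{})
   	)
   	// Four goroutines range over the same sequence concurrently.
   	for g := 0; g < 4; g++ {
   		wg.Add(1)
   		go func() {
   			defer wg.Done()
   			vals := take(seq, 100)
   			mu.Lock()
   			for _, v := range vals {
   				seen[v] = struct{}{}
   			}
   			mu.Unlock()
   		}()
   	}
   	wg.Wait()
   	fmt.Println("distinct values:", len(seen))
   }
   ```

   Because `current` is declared outside the returned function, as in the diff,
every `range` over the same sequence continues from the shared counter rather
than restarting at `start`. Values are unique across goroutines, but the order
in which goroutines receive them is not guaranteed.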



##########
table/rolling_data_writer.go:
##########
@@ -60,6 +66,8 @@ func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *Metadat
 // file strategy to manage file sizes.
 type RollingDataWriter struct {
        partitionKey    string
+       partitionID     int          // Added: unique ID for this partition
+       fileCount       atomic.Int64 // Added: counter for files in this partition

Review Comment:
   The "Added:" prefix describes the change rather than the field. That context
belongs in the commit history and should be removed before merging. Go
documentation comments should state what each field does, not that it was
recently added. Consider rewording these comments to describe the purpose of
each field instead.



##########
table/rolling_data_writer.go:
##########
@@ -33,25 +33,31 @@ import (
 // for different partitions, providing shared configuration and coordination
 // across all writers in a partitioned write operation.
 type writerFactory struct {
-       rootLocation   string
-       args           recordWritingArgs
-       meta           *MetadataBuilder
-       taskSchema     *iceberg.Schema
-       targetFileSize int64
-       writers        sync.Map
-       counter        atomic.Int64
-       mu             sync.Mutex
+       rootLocation       string
+       args               recordWritingArgs
+       meta               *MetadataBuilder
+       taskSchema         *iceberg.Schema
+       targetFileSize     int64
+       writers            sync.Map
+       nextCount          func() (int, bool)
+       stopCount          func()
+       partitionIDCounter atomic.Int64 // Added: counter for partition IDs

Review Comment:
   The "Added:" prefix describes the change rather than the field. That context
belongs in the commit history and should be removed before merging. Go
documentation comments should state what the field does, not that it was
recently added. Consider rewording this comment to describe the purpose of the
field instead.
   ```suggestion
        partitionIDCounter atomic.Int64 // partitionIDCounter generates unique IDs for partitions
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

