twuebi commented on code in PR #759:
URL: https://github.com/apache/iceberg-go/pull/759#discussion_r2878507893


##########
table/internal/parquet_files.go:
##########
@@ -243,46 +244,105 @@ func (parquetFormat) GetWriteProperties(props 
iceberg.Properties) any {
                parquet.WithCompressionLevel(compressionLevel))
 }
 
-func (p parquetFormat) WriteDataFile(ctx context.Context, fs 
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches 
[]arrow.RecordBatch) (_ iceberg.DataFile, err error) {
+func (p parquetFormat) WriteDataFile(ctx context.Context, fs 
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches 
[]arrow.RecordBatch) (iceberg.DataFile, error) {
+       w, err := p.NewFileWriter(ctx, fs, partitionValues, info, 
batches[0].Schema())
+       if err != nil {
+               return nil, err
+       }
+
+       for _, batch := range batches {
+               if err := w.Write(batch); err != nil {
+                       w.Close()
+
+                       return nil, err
+               }
+       }
+
+       return w.Close()
+}
+
+// ParquetFileWriter is an incremental single-file writer with open/write/close
+// lifecycle. It writes Arrow record batches to a Parquet file and tracks bytes
+// written for rolling file decisions.
+type ParquetFileWriter struct {
+       pqWriter   *pqarrow.FileWriter
+       counter    *internal.CountingWriter
+       fileCloser io.Closer
+       format     parquetFormat
+       info       WriteFileInfo
+       partition  map[int]any
+       colMapping map[string]int
+}
+
+// NewFileWriter creates a ParquetFileWriter that writes batches to a single
+// Parquet file. Call Write to append batches, BytesWritten to check actual
+// compressed file size, and Close to finalize and get the resulting DataFile.
+func (p parquetFormat) NewFileWriter(ctx context.Context, fs iceio.WriteFileIO,
+       partitionValues map[int]any, info WriteFileInfo, arrowSchema 
*arrow.Schema,
+) (FileWriter, error) {
        fw, err := fs.Create(info.FileName)
        if err != nil {
                return nil, err
        }
 
-       defer internal.CheckedClose(fw, &err)
+       colMapping, err := p.PathToIDMapping(info.FileSchema)
+       if err != nil {
+               fw.Close()
 
-       cntWriter := internal.CountingWriter{W: fw}
+               return nil, err
+       }
+
+       counter := &internal.CountingWriter{W: fw}
        mem := compute.GetAllocator(ctx)
        writerProps := 
parquet.NewWriterProperties(info.WriteProps.([]parquet.WriterProperty)...)
        arrProps := 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), 
pqarrow.WithStoreSchema())
 
-       writer, err := pqarrow.NewFileWriter(batches[0].Schema(), &cntWriter, 
writerProps, arrProps)
+       writer, err := pqarrow.NewFileWriter(arrowSchema, counter, writerProps, 
arrProps)
        if err != nil {
+               fw.Close()
+
                return nil, err
        }
 
-       for _, batch := range batches {
-               if err := writer.WriteBuffered(batch); err != nil {
-                       return nil, err
-               }
-       }
+       return &ParquetFileWriter{
+               pqWriter:   writer,
+               counter:    counter,
+               fileCloser: fw,
+               format:     p,
+               info:       info,
+               partition:  partitionValues,
+               colMapping: colMapping,
+       }, nil
+}
 
-       if err := writer.Close(); err != nil {
-               return nil, err
-       }
+// Write appends a record batch to the Parquet file.
+func (w *ParquetFileWriter) Write(batch arrow.RecordBatch) error {
+       return w.pqWriter.WriteBuffered(batch)
+}
 
-       filemeta, err := writer.FileMetadata()
-       if err != nil {
+// BytesWritten returns flushed bytes plus compressed bytes buffered in the
+// current row group — matching the size estimate used by iceberg-java and
+// iceberg-rust to make rolling decisions.
+func (w *ParquetFileWriter) BytesWritten() int64 {
+       return w.counter.Count + w.pqWriter.RowGroupTotalCompressedBytes()

Review Comment:
   ```go
   // RowGroupTotalCompressedBytes returns the total number of bytes after compression
   // that have been written to the current row group so far.
   func (fw *FileWriter) RowGroupTotalCompressedBytes() int64 {
        if fw.rgw != nil {
                return fw.rgw.TotalCompressedBytes()
        }
        return 0
   }
   
   // RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in
   // the current row group.
   func (fw *FileWriter) RowGroupTotalBytesWritten() int64 {
        if fw.rgw != nil {
                return fw.rgw.TotalBytesWritten()
        }
        return 0
   }
   ```
   
   Since we're only using `RowGroupTotalCompressedBytes` here, I don't think 
we're double counting. `w.counter` wraps the underlying file writer (`fw`), so 
it only counts bytes that have already been flushed to the file, while 
`RowGroupTotalCompressedBytes` only counts what has been compressed into the 
current row group. This is in contrast to `RowGroupTotalBytesWritten`, which 
would also include the flushed bytes.
   
   
   I also added a test: `TestBytesWrittenNoDoubleCountAcrossRowGroups`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to