darren2013 commented on issue #506:
URL: https://github.com/apache/arrow-go/issues/506#issuecomment-3300798571

   > So a couple things:
   > 
   > Looking at the memory you posted
   > 
   > > 3141.12MB 20.20% 20.20% 3141.12MB 20.20% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData (inline)
   > > 2053.06MB 13.20% 33.40% 2053.06MB 13.20% github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish
   > > 2022.75MB 13.01% 46.41% 2022.75MB 13.01% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics (inline)
   > > 1683.18MB 10.82% 57.23% 6348.41MB 40.82% github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close
   > > 1630.68MB 10.49% 67.71% 5779.84MB 37.16% github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk
   > > 1389.17MB 8.93% 76.65% 14417.33MB 92.70% github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).NextColumn
   > 
   > Nearly all of this memory is coming from the Column metadata, with 909 
columns that makes sense. In addition, when you call `Write(record)` it is 
going to create a new row group every time unless you use `WriteBuffered`, and 
every time it creates a row group it's going to build and construct a bunch of 
metadata for all 909 columns. Because Parquet metadata goes at the end of the 
file it's going to keep it in memory until you finish and write it out by 
closing the file. The RowGroupWriter has `TotalCompressedBytes` and 
`TotalBytesWritten` methods which you could use to track when to break up row 
groups potentially.
   > 
   > > I tried closing the file writer every 500000 records and creating a new
one, and the memory is released significantly. But this creates many small
files and, more importantly, I am not sure when the memory will fill up; it
might be at 300000 or 1000000 records. Ideally, the memory would be released
when each batch finishes writing.
   > 
   > If every record is ~1.7KB, then 500000 is ~830MB before compression and 
encoding. It also might make sense to disable dictionary encoding depending on 
the cardinality of your columns (use `WithDictionaryDefault(false)` when 
building the properties). Depending on your workflows, that might be a 
perfectly reasonable size for your parquet files, not a "small" file. The 
memory for the records will be released after the batches are written and 
flushed to the file (once the GC reclaims it), but you're still going to build 
up a large amount of metadata when you're writing that many row groups and 
columns.
   > 
   > Can I see the `forceRowGroupFlush` function? Also what batch size are you 
using?
   Thank you for your reply!
   
   The forceRowGroupFlush function is as follows. The batch size is configured as 1500.
   `
   func (p *ParquetProcessor) forceRowGroupFlush() error {
        if p.arrowWriter == nil {
                return fmt.Errorf("arrow writer is nil")
        }
   
        log.Printf("ParquetProcessor[%d] starting deep memory release, current row group records: %d", p.instanceID, p.currentRowGroupSize)
   
        // 🔥 Option 1: try to force release of C++ memory by closing and reopening the writer.
        // This is the most effective approach, but file integrity has to be considered.
   
        // Save the current file path.
        currentFilePath := p.filePath
   
        if err := p.arrowWriter.Close(); err != nil {
                log.Printf("ParquetProcessor[%d] Warning: error closing arrow writer: %v", p.instanceID, err)
        }
        p.arrowWriter = nil
   
        // Close the file handle.
        if p.currentFile != nil {
                if err := p.currentFile.Close(); err != nil {
                        log.Printf("ParquetProcessor[%d] Warning: error closing file: %v", p.instanceID, err)
                }
                p.currentFile = nil
        }
   
        // Release the RecordBuilder.
        if p.recordBuilder != nil {
                p.recordBuilder.Release()
                p.recordBuilder = nil
        }
   
        // 🔥 Option 2: create a new segment file to avoid memory accumulation.
        // Note: the Parquet format does not support appending, so we create a new segment file.
        p.fileSequence++
        timestamp := time.Now().Format("150405")
        newFileName := fmt.Sprintf("logs_%s_inst%d_seg%d.parquet", timestamp, p.instanceID, p.fileSequence)
        newFilePath := filepath.Join(filepath.Dir(currentFilePath), newFileName)
   
        file, err := os.Create(newFilePath)
        if err != nil {
                return fmt.Errorf("failed to create new segment file: %w", err)
        }
        p.currentFile = file
        p.filePath = newFilePath
        p.totalRecords = 0 // reset the record count for the new file
   
        log.Printf("ParquetProcessor[%d] created new segment file: %s -> %s", p.instanceID, currentFilePath, newFilePath)
   
        // Recreate the Arrow writer.
        arrowWriterProps := pqarrow.DefaultWriterProps()
        arrowWriter, err := pqarrow.NewFileWriter(p.arrowSchema, p.currentFile, p.writerProps, arrowWriterProps)
        if err != nil {
                // If creation fails, close the file and return the error.
                p.currentFile.Close()
                return fmt.Errorf("failed to recreate arrow writer: %w", err)
        }
        p.arrowWriter = arrowWriter
   
        // Recreate the RecordBuilder.
        p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)
   
        log.Printf("ParquetProcessor[%d] deep memory release complete, writer recreated", p.instanceID)
        return nil
   }
   `
   
   I changed the write call:
   `
        - if err := p.arrowWriter.Write(recordBatch); err != nil {
        + if err := p.arrowWriter.WriteBuffered(recordBatch); err != nil {
                return fmt.Errorf("failed to write record batch to parquet: %w", err)
        }
   `
   and the max row group size setting:
   `
   writerProps := parquet.NewWriterProperties(
                parquet.WithCompression(getCompressionType(p.config.Compression)),
                parquet.WithVersion(parquet.V2_LATEST),
                // Use a smaller row group size to reduce memory usage.
                parquet.WithMaxRowGroupLength(1000),
        )
   `
   The memory is still increasing; the pprof output is below. Whether I use 
Write(record) or WriteBuffered, the column metadata is kept in memory until the 
file writer is closed, is that right? If so, how should I manage the memory 
correctly? If I close the file frequently, I will end up with many small files.
   `
   darren@darrendus-Mac-mini ~ % go tool pprof -text http://10.20.183.250:6060/debug/pprof/heap | head -30
   Fetching profile over HTTP from http://10.20.183.250:6060/debug/pprof/heap
   Saved profile in /Users/darren/pprof/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.022.pb.gz
   File: log_fusion
   Build ID: bee848f2187c216cf4161642c602db04073fd81a
   Type: inuse_space
   Time: 2025-09-17 08:34:36 CST
   Showing nodes accounting for 2397.48MB, 96.82% of 2476.11MB total
   Dropped 193 nodes (cum <= 12.38MB)
         flat  flat%   sum%        cum   cum%
     510.60MB 20.62% 20.62%   510.60MB 20.62%  github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData (inline)
     337.51MB 13.63% 34.25%   337.51MB 13.63%  github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish
     329.54MB 13.31% 47.56%   329.54MB 13.31%  github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics (inline)
     291.03MB 11.75% 59.31%   985.97MB 39.82%  github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close
     251.58MB 10.16% 69.47%  1190.89MB 48.10%  github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).initColumns
     250.19MB 10.10% 79.58%   921.29MB 37.21%  github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk
     116.51MB  4.71% 84.28%   160.51MB  6.48%  github.com/apache/arrow-go/v18/parquet/metadata.NewColumnChunkMetaDataBuilderWithContents (inline)
      63.71MB  2.57% 86.86%    63.71MB  2.57%  reflect.mapassign_faststr0
      57.62MB  2.33% 89.18%    57.62MB  2.33%  bufio.(*Scanner).Text (inline)
      44.50MB  1.80% 90.98%    44.50MB  1.80%  github.com/apache/arrow-go/v18/parquet/schema.ColumnPathFromNode
      44.45MB  1.80% 92.78%    44.45MB  1.80%  github.com/apache/arrow-go/v18/arrow/memory.(*GoAllocator).Allocate (partial-inline)
      36.51MB  1.47% 94.25%    36.51MB  1.47%  encoding/json.(*decodeState).literalStore
      23.18MB  0.94% 95.19%    23.18MB  0.94%  github.com/apache/arrow-go/v18/parquet/metadata.NewRowGroupMetaDataBuilder (inline)
      21.56MB  0.87% 96.06%    30.94MB  1.25%  github.com/apache/arrow-go/v18/parquet/pqarrow.writeDenseArrow
      12.50MB   0.5% 96.56%   114.71MB  4.63%  encoding/json.(*decodeState).object
       3.50MB  0.14% 96.70%    44.94MB  1.82%  github.com/apache/arrow-go/v18/parquet/pqarrow.(*arrowColumnWriter).Write
       2.50MB   0.1% 96.80%   116.72MB  4.71%  github.com/darren2013/log_fusion_source/internal/source.FileConsumer
       0.50MB  0.02% 96.82%    13.52MB  0.55%  github.com/apache/arrow-go/v18/parquet/file.NewByteArrayColumnChunkWriter
            0     0% 96.82%   114.71MB  4.63%  encoding/json.(*decodeState).unmarshal
            0     0% 96.82%   114.71MB  4.63%  encoding/json.(*decodeState).value
            0     0% 96.82%   114.71MB  4.63%  encoding/json.Unmarshal
            0     0% 96.82%    19.01MB  0.77%  github.com/apache/arrow-go/v18/arrow/array.(*BinaryBuilder).AppendNull
            0     0% 96.82%    19.51MB  0.79%  github.com/apache/arrow-go/v18/arrow/array.(*BinaryBuilder).Reserve
   `

