darren2013 commented on issue #506:
URL: https://github.com/apache/arrow-go/issues/506#issuecomment-3300798571
> So a couple things:
>
> Looking at the memory you posted
>
> > 3141.12MB 20.20% 20.20% 3141.12MB 20.20% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData (inline)
> > 2053.06MB 13.20% 33.40% 2053.06MB 13.20% github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish
> > 2022.75MB 13.01% 46.41% 2022.75MB 13.01% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics (inline)
> > 1683.18MB 10.82% 57.23% 6348.41MB 40.82% github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close
> > 1630.68MB 10.49% 67.71% 5779.84MB 37.16% github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk
> > 1389.17MB 8.93% 76.65% 14417.33MB 92.70% github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).NextColumn
>
> Nearly all of this memory is coming from the column metadata; with 909 columns that makes sense. In addition, when you call `Write(record)` it is going to create a new row group every time unless you use `WriteBuffered`, and every time it creates a row group it's going to build a bunch of metadata for all 909 columns. Because Parquet metadata goes at the end of the file, it's going to keep it in memory until you finish and write it out by closing the file. The RowGroupWriter has `TotalCompressedBytes` and `TotalBytesWritten` methods which you could potentially use to track when to break up row groups.
>
> > I tried closing the file writer every 500000 records and recreating a new one, and the memory is released significantly. But this way creates many small files, and more importantly, I am not sure when memory will run out; it may be at 300000 or at 1000000 records. Ideally the memory would be released when the batch writing finishes.
>
> If every record is ~1.7KB, then 500000 records is ~830MB before compression and encoding. It also might make sense to disable dictionary encoding depending on the cardinality of your columns (use `WithDictionaryDefault(false)` when building the properties). Depending on your workflows, that might be a perfectly reasonable size for your parquet files, not a "small" file. The memory for the records will be released after the batches are written and flushed to the file (once the GC reclaims it), but you're still going to build up a large amount of metadata when you're writing that many row groups and columns.
>
> Can I see the `forceRowGroupFlush` function? Also what batch size are you using?
Thank you for your reply!
The `forceRowGroupFlush` function is as follows. The batch_size is configured to 1500.
```go
func (p *ParquetProcessor) forceRowGroupFlush() error {
	if p.arrowWriter == nil {
		return fmt.Errorf("arrow writer is nil")
	}
	log.Printf("ParquetProcessor[%d] starting deep memory release, current row group records: %d", p.instanceID, p.currentRowGroupSize)

	// 🔥 Approach 1: try to force release of the writer's memory via Close then reopen.
	// This is the most effective method, but file integrity must be considered.

	// Save the current file path.
	currentFilePath := p.filePath
	if err := p.arrowWriter.Close(); err != nil {
		log.Printf("ParquetProcessor[%d] Warning: error closing arrow writer: %v", p.instanceID, err)
	}
	p.arrowWriter = nil

	// Close the file handle.
	if p.currentFile != nil {
		if err := p.currentFile.Close(); err != nil {
			log.Printf("ParquetProcessor[%d] Warning: error closing file: %v", p.instanceID, err)
		}
		p.currentFile = nil
	}

	// Release the RecordBuilder.
	if p.recordBuilder != nil {
		p.recordBuilder.Release()
		p.recordBuilder = nil
	}

	// 🔥 Approach 2: create a new segment file to avoid memory accumulation.
	// Note: the Parquet format does not support appending, so we create a new segment file.
	p.fileSequence++
	timestamp := time.Now().Format("150405")
	newFileName := fmt.Sprintf("logs_%s_inst%d_seg%d.parquet", timestamp, p.instanceID, p.fileSequence)
	newFilePath := filepath.Join(filepath.Dir(currentFilePath), newFileName)
	file, err := os.Create(newFilePath)
	if err != nil {
		return fmt.Errorf("failed to create new segment file: %w", err)
	}
	p.currentFile = file
	p.filePath = newFilePath
	p.totalRecords = 0 // reset the record count for the new file
	log.Printf("ParquetProcessor[%d] created new segment file: %s -> %s", p.instanceID, currentFilePath, newFilePath)

	// Recreate the Arrow writer.
	arrowWriterProps := pqarrow.DefaultWriterProps()
	arrowWriter, err := pqarrow.NewFileWriter(p.arrowSchema, p.currentFile, p.writerProps, arrowWriterProps)
	if err != nil {
		// If creation fails, close the file and report the error.
		p.currentFile.Close()
		return fmt.Errorf("failed to recreate arrow writer: %w", err)
	}
	p.arrowWriter = arrowWriter

	// Recreate the RecordBuilder.
	p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)
	log.Printf("ParquetProcessor[%d] deep memory release complete, writer recreated", p.instanceID)
	return nil
}
```
I changed the write call:
```diff
- if err := p.arrowWriter.Write(recordBatch); err != nil {
+ if err := p.arrowWriter.WriteBuffered(recordBatch); err != nil {
      return fmt.Errorf("failed to write record batch to parquet: %w", err)
  }
```
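Based on your suggestion to track row group size, I am also considering breaking up row groups by bytes rather than relying on `WithMaxRowGroupLength` alone. A rough, untested sketch (I am assuming from the docs that `pqarrow.FileWriter` exposes `RowGroupTotalCompressedBytes` and `NewBufferedRowGroup`; the threshold is just a placeholder):

```go
// Hypothetical per-row-group size threshold; not a library constant.
const rowGroupByteLimit = 128 << 20 // 128 MiB

func (p *ParquetProcessor) writeBatch(rec arrow.Record) error {
	// WriteBuffered appends to the current row group instead of starting
	// a new one on every call, as Write does.
	if err := p.arrowWriter.WriteBuffered(rec); err != nil {
		return fmt.Errorf("failed to write record batch to parquet: %w", err)
	}
	// Once the buffered row group is large enough, explicitly start a new
	// one, so the number of row groups (and their metadata) tracks data
	// volume rather than the number of Write calls.
	if p.arrowWriter.RowGroupTotalCompressedBytes() >= rowGroupByteLimit {
		p.arrowWriter.NewBufferedRowGroup()
	}
	return nil
}
```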
and I set the max row group length:
```go
writerProps := parquet.NewWriterProperties(
	parquet.WithCompression(getCompressionType(p.config.Compression)),
	parquet.WithVersion(parquet.V2_LATEST),
	// Use a smaller row group length to reduce memory usage.
	parquet.WithMaxRowGroupLength(1000),
)
```
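I will also try disabling dictionary encoding as you suggested. A sketch of the properties I plan to test (`getCompressionType` is my own helper, and the values are placeholders; I understand a very small `WithMaxRowGroupLength` means many row groups and therefore more accumulated metadata):

```go
writerProps := parquet.NewWriterProperties(
	parquet.WithCompression(getCompressionType(p.config.Compression)),
	parquet.WithVersion(parquet.V2_LATEST),
	// Disable dictionary encoding by default: with high-cardinality columns
	// the dictionary pages mostly add memory overhead.
	parquet.WithDictionaryDefault(false),
	// Placeholder value: a larger row group length than 1000 means fewer
	// row groups, and so less per-column footer metadata to hold in memory.
	parquet.WithMaxRowGroupLength(64*1024),
)
```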
The memory is still increasing; the pprof output is below. Whether I use the `Write(record)` or the `WriteBuffered` method, the column metadata is kept in memory until the file writer is closed, is that right? If so, how do I manage the memory correctly? If I close the file frequently, it will result in many small files.
```
darren@darrendus-Mac-mini ~ % go tool pprof -text http://10.20.183.250:6060/debug/pprof/heap | head -30
Fetching profile over HTTP from http://10.20.183.250:6060/debug/pprof/heap
Saved profile in /Users/darren/pprof/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.022.pb.gz
File: log_fusion
Build ID: bee848f2187c216cf4161642c602db04073fd81a
Type: inuse_space
Time: 2025-09-17 08:34:36 CST
Showing nodes accounting for 2397.48MB, 96.82% of 2476.11MB total
Dropped 193 nodes (cum <= 12.38MB)
      flat  flat%   sum%        cum   cum%
  510.60MB 20.62% 20.62%   510.60MB 20.62%  github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData (inline)
  337.51MB 13.63% 34.25%   337.51MB 13.63%  github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish
  329.54MB 13.31% 47.56%   329.54MB 13.31%  github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics (inline)
  291.03MB 11.75% 59.31%   985.97MB 39.82%  github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close
  251.58MB 10.16% 69.47%  1190.89MB 48.10%  github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).initColumns
  250.19MB 10.10% 79.58%   921.29MB 37.21%  github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk
  116.51MB  4.71% 84.28%   160.51MB  6.48%  github.com/apache/arrow-go/v18/parquet/metadata.NewColumnChunkMetaDataBuilderWithContents (inline)
   63.71MB  2.57% 86.86%    63.71MB  2.57%  reflect.mapassign_faststr0
   57.62MB  2.33% 89.18%    57.62MB  2.33%  bufio.(*Scanner).Text (inline)
   44.50MB  1.80% 90.98%    44.50MB  1.80%  github.com/apache/arrow-go/v18/parquet/schema.ColumnPathFromNode
   44.45MB  1.80% 92.78%    44.45MB  1.80%  github.com/apache/arrow-go/v18/arrow/memory.(*GoAllocator).Allocate (partial-inline)
   36.51MB  1.47% 94.25%    36.51MB  1.47%  encoding/json.(*decodeState).literalStore
   23.18MB  0.94% 95.19%    23.18MB  0.94%  github.com/apache/arrow-go/v18/parquet/metadata.NewRowGroupMetaDataBuilder (inline)
   21.56MB  0.87% 96.06%    30.94MB  1.25%  github.com/apache/arrow-go/v18/parquet/pqarrow.writeDenseArrow
   12.50MB   0.5% 96.56%   114.71MB  4.63%  encoding/json.(*decodeState).object
    3.50MB  0.14% 96.70%    44.94MB  1.82%  github.com/apache/arrow-go/v18/parquet/pqarrow.(*arrowColumnWriter).Write
    2.50MB   0.1% 96.80%   116.72MB  4.71%  github.com/darren2013/log_fusion_source/internal/source.FileConsumer
    0.50MB  0.02% 96.82%    13.52MB  0.55%  github.com/apache/arrow-go/v18/parquet/file.NewByteArrayColumnChunkWriter
         0     0% 96.82%   114.71MB  4.63%  encoding/json.(*decodeState).unmarshal
         0     0% 96.82%   114.71MB  4.63%  encoding/json.(*decodeState).value
         0     0% 96.82%   114.71MB  4.63%  encoding/json.Unmarshal
         0     0% 96.82%    19.01MB  0.77%  github.com/apache/arrow-go/v18/arrow/array.(*BinaryBuilder).AppendNull
         0     0% 96.82%    19.51MB  0.79%  github.com/apache/arrow-go/v18/arrow/array.(*BinaryBuilder).Reserve
```
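In the meantime, to bound the footer metadata per file, I am considering rotating segment files by bytes written rather than by a fixed record count. A rough sketch on top of `forceRowGroupFlush` above (`bytesWritten` and the threshold are hypothetical fields of mine, maintained by wrapping the segment `*os.File` in a counting writer; arrow-go does not provide them):

```go
// Hypothetical per-segment size limit; tune to the workload.
const rotateThresholdBytes = 512 << 20 // 512 MiB

func (p *ParquetProcessor) maybeRotate() error {
	// p.bytesWritten is my own counter, updated by a counting io.Writer
	// wrapping the segment file. Once it crosses the threshold, close the
	// writer (flushing the footer and releasing its metadata) and open a
	// new segment via forceRowGroupFlush.
	if p.bytesWritten < rotateThresholdBytes {
		return nil
	}
	p.bytesWritten = 0
	return p.forceRowGroupFlush()
}
```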