zbs opened a new issue, #45638:
URL: https://github.com/apache/arrow/issues/45638
### Describe the usage question you have. Please include as many useful details as possible.
I created a `ParquetFileWriter` class that initializes a schema, a writer, and an output stream,
and then writes to a Parquet file one batch at a time. However, the process runs out of
memory (OOMs), and for the life of me I cannot figure out why. I've experimented with
different chunk sizes and with calling flush after each write, to no avail, and I
cannot find any documentation indicating that I'm doing anything seriously
wrong. Valgrind also did not find any memory leaks.
Note that this is Arrow 9.0.0, so it's considerably behind, but I'm hoping
the solution doesn't depend on the version too much.
```
#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

#include <memory>
#include <string>
#include <vector>

// check() and VERIFY() are project-specific assertion macros (not shown).
// The original methods were marked `override`; the base class is not shown,
// so the specifiers are omitted here to keep the snippet self-contained.
class ParquetFileWriter {
public:
  ParquetFileWriter(
      const std::string & filename,
      parquet::WriterProperties::Builder & writerPropertiesBuilder );
  void initSchema( const std::shared_ptr< arrow::Schema > & schema );
  void writeBatch( const arrow::RecordBatch & batch );
  void poll();  // definition not shown
private:
  std::shared_ptr< arrow::io::OutputStream > outstream_;
  std::unique_ptr< parquet::arrow::FileWriter > parquetWriter_;
  std::shared_ptr< parquet::WriterProperties > writerProperties_;
};

ParquetFileWriter::ParquetFileWriter(
    const std::string & filename,
    parquet::WriterProperties::Builder & writerPropertiesBuilder )
    : outstream_( arrow::io::FileOutputStream::Open( filename, /*append=*/false )
                      .ValueOrDie() ) {
  writerPropertiesBuilder.compression( parquet::Compression::ZSTD );
  writerProperties_ = writerPropertiesBuilder.build();
}

void
ParquetFileWriter::initSchema( const std::shared_ptr< arrow::Schema > & schema ) {
  auto status = parquet::arrow::FileWriter::Open(
      *schema, arrow::default_memory_pool(), outstream_, writerProperties_,
      &parquetWriter_ );
  VERIFY( status.ok(), "Failed to open Parquet file writer: %s",
          status.ToString().c_str() );
}

void
ParquetFileWriter::writeBatch( const arrow::RecordBatch & batch ) {
  check( parquetWriter_ != nullptr, "Parquet writer is not initialized." );
  // Wrap the batch (zero-copy, the arrays are shared) in a single-batch table.
  std::vector< std::shared_ptr< arrow::RecordBatch > > batches = {
      arrow::RecordBatch::Make( batch.schema(), batch.num_rows(), batch.columns() ) };
  auto table = arrow::Table::FromRecordBatches( batch.schema(), batches );
  check( table.ok(), "Failed to build table: %s",
         table.status().ToString().c_str() );
  // chunk_size = batch.num_rows(), so each call writes at least one new row group.
  const arrow::Status status =
      parquetWriter_->WriteTable( **table, batch.num_rows() );
  check( status.ok(), "Parquet batch write failed: %s",
         status.ToString().c_str() );
  // Flush the output stream after every write.
  const arrow::Status flushStatus = outstream_->Flush();
  check( flushStatus.ok(), "Flush status failed: %s",
         flushStatus.ToString().c_str() );
}
```
In the above code, `writeBatch` might be called thousands of times.
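One possible contributor, offered as an assumption rather than a diagnosis: because `chunk_size` is `batch.num_rows()`, every `WriteTable` call writes at least one new row group, and as far as I know the Parquet writer keeps the metadata of every row group (including per-column statistics) in memory until `Close()` writes the file footer. Thousands of small calls would then mean thousands of row groups whose metadata accumulates for the lifetime of the writer. A common mitigation is to buffer incoming batches and write them in larger groups. The sketch uses plain standard C++ so it can be checked in isolation; `BatchBuffer`, `targetRows`, and the flush callback are hypothetical names, not Arrow APIs. With `Batch = arrow::RecordBatch`, the callback would build one table with `arrow::Table::FromRecordBatches` and pass it to `FileWriter::WriteTable` in a single call.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Arrow): collects batches until at least
// `targetRows` rows are buffered, then hands them to `flush` in one call.
// Batch only needs a num_rows() member. With Batch = arrow::RecordBatch, the
// callback could build one arrow::Table from the buffered batches and call
// FileWriter::WriteTable once, producing fewer, larger row groups.
template <typename Batch, typename FlushFn>
class BatchBuffer {
 public:
  BatchBuffer(std::int64_t targetRows, FlushFn flush)
      : targetRows_(targetRows), flush_(std::move(flush)) {}

  // Buffer one batch; flush everything once the row threshold is reached.
  void add(std::shared_ptr<Batch> batch) {
    bufferedRows_ += batch->num_rows();
    pending_.push_back(std::move(batch));
    if (bufferedRows_ >= targetRows_) {
      flushPending();
    }
  }

  // Flush any remaining batches; call this before closing the writer.
  void finish() {
    if (!pending_.empty()) {
      flushPending();
    }
  }

  std::int64_t flushCount() const { return flushCount_; }

 private:
  void flushPending() {
    flush_(pending_);  // callback receives all buffered batches at once
    pending_.clear();
    bufferedRows_ = 0;
    ++flushCount_;
  }

  std::int64_t targetRows_;
  FlushFn flush_;
  std::vector<std::shared_ptr<Batch>> pending_;
  std::int64_t bufferedRows_ = 0;
  std::int64_t flushCount_ = 0;
};
```

For example, with a 1,000-row threshold, 25 batches of 100 rows lead to three writes (1,000, 1,000, and 500 rows) instead of 25. The trade-off is that up to `targetRows` rows stay alive in memory between writes, so the threshold has to fit the available memory, and `finish()` must run before the Parquet writer's `Close()`.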
### Component(s)
C++
--
This is an automated message from the Apache Git Service.