wgtmac commented on code in PR #48468:
URL: https://github.com/apache/arrow/pull/48468#discussion_r2697790008
##########
cpp/src/parquet/file_writer.cc:
##########
@@ -68,6 +68,12 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
  return contents_->total_compressed_bytes_written();
}
+int64_t RowGroupWriter::EstimatedTotalCompressedBytes() const {
+  return contents_->total_compressed_bytes() +
+         contents_->total_compressed_bytes_written() +
+         contents_->EstimatedBufferedValueBytes();
Review Comment:
In many common cases the compression ratio is close to 3:1, so in the past I have
used something like `total_compressed_bytes + total_compressed_bytes_written +
EstimatedBufferedValueBytes / (codec_type != NONE ? 3 : 1)` as an empirical
estimate.
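For illustration only, a minimal sketch of that empirical rule (the `CodecType` enum and the function name below are hypothetical, not part of this PR):

```cpp
#include <cstdint>

// Hypothetical stand-in for the writer's compression setting.
enum class CodecType { NONE, SNAPPY, GZIP, ZSTD };

// Sketch of the heuristic: bytes that are already compressed (flushed or still
// buffered) are counted as-is, while buffered *uncompressed* value bytes are
// divided by an assumed 3:1 compression ratio whenever a codec is enabled.
int64_t EstimateTotalCompressedBytes(int64_t total_compressed_bytes,
                                     int64_t total_compressed_bytes_written,
                                     int64_t estimated_buffered_value_bytes,
                                     CodecType codec_type) {
  const int64_t assumed_ratio = (codec_type != CodecType::NONE) ? 3 : 1;
  return total_compressed_bytes + total_compressed_bytes_written +
         estimated_buffered_value_bytes / assumed_ratio;
}
```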
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -378,19 +378,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
template <>
struct test_traits<::arrow::StringType> {
static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
- static std::string const value;
+ static const std::string value;
Review Comment:
These three lines are unrelated changes introduced by an inconsistent local
clang-format version.
##########
cpp/src/parquet/file_writer.h:
##########
@@ -207,6 +213,10 @@ class PARQUET_EXPORT ParquetFileWriter {
void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
+ /// \brief Estimate compressed bytes per row from closed row groups.
+ /// \return Estimated bytes or std::nullopt when no written row group.
+ std::optional<double> EstimateCompressedBytesPerRow() const;
Review Comment:
Unrelated to this PR: it might also be useful to expose an estimate of the
current file size, which would make it easier for downstream users to implement
a rolling file writer.
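As a rough sketch of that idea (the `EstimatedFileSize()` accessor and all names below are hypothetical, not provided by this PR), a rolling writer only needs a size estimate to decide when to start a new file:

```cpp
#include <cstdint>

// Hypothetical rolling writer driven by an estimated current file size.
class RollingParquetWriter {
 public:
  explicit RollingParquetWriter(int64_t max_file_bytes)
      : max_file_bytes_(max_file_bytes) {}

  // Called before appending the next batch: once the estimated size of the
  // current file reaches the configured budget, close it and open a new one.
  void MaybeRoll(int64_t estimated_current_file_bytes) {
    if (estimated_current_file_bytes >= max_file_bytes_) {
      CloseCurrentFile();  // flush the footer and close the active writer
      OpenNextFile();      // create the next file in the sequence
    }
  }

 private:
  void CloseCurrentFile() { /* close the active ParquetFileWriter */ }
  void OpenNextFile() { /* open the next output file */ }

  int64_t max_file_bytes_;
};
```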
##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -480,17 +481,24 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t batch_size = std::min(max_row_group_length - row_group_writer_->num_rows(),
+                                    batch.num_rows() - offset);
+      if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+        int64_t buffered_bytes = row_group_writer_->EstimatedTotalCompressedBytes();
+        batch_size = std::min(
+            batch_size, static_cast<int64_t>((max_row_group_bytes - buffered_bytes) /
+                                             avg_row_size.value()));
+      }
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
+        // Current row group is full, write remaining rows in a new group.
Review Comment:
We cannot accept an infinite loop, so perhaps we should set the minimum batch
size to 1 in this case?
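One possible reading of that suggestion, sketched below with illustrative names (not the code in this PR): once a fresh, empty row group has been opened, clamp the computed batch size to at least one row so the loop over the batch always makes progress.

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>

// Combine the row-count limit with the byte-based limit and, when the row
// group is still empty (so flushing again would not help), force a minimum
// batch size of 1 to guarantee forward progress.
int64_t ComputeBatchSize(int64_t rows_left_in_row_group, int64_t rows_left_in_batch,
                         std::optional<double> avg_row_bytes, int64_t bytes_left,
                         bool row_group_is_empty) {
  int64_t batch_size = std::min(rows_left_in_row_group, rows_left_in_batch);
  if (avg_row_bytes.has_value() && *avg_row_bytes > 0) {
    batch_size = std::min(batch_size,
                          static_cast<int64_t>(bytes_left / *avg_row_bytes));
  }
  if (row_group_is_empty) {
    // An empty row group must accept at least one row, otherwise the caller
    // could keep flushing empty groups forever.
    batch_size = std::max<int64_t>(batch_size, 1);
  }
  return batch_size;
}
```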
##########
cpp/src/parquet/file_writer.h:
##########
@@ -151,6 +156,7 @@ class PARQUET_EXPORT ParquetFileWriter {
virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
virtual int64_t num_rows() const = 0;
+ virtual int64_t written_compressed_bytes() const = 0;
Review Comment:
```suggestion
virtual int64_t written_compressed_bytes() const = 0;
virtual int64_t num_rows() const = 0;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]