wgtmac commented on code in PR #48468:
URL: https://github.com/apache/arrow/pull/48468#discussion_r2685066325


##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -395,15 +395,24 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
-    if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 
0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    if (!table.schema()->Equals(*schema_, false)) {

Review Comment:
   ```suggestion
       if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) {
   ```
   
   I know the original line is like this, but let's improve readability.



##########
cpp/src/parquet/properties.h:
##########
@@ -414,10 +418,20 @@ class PARQUET_EXPORT WriterProperties {
     /// Specify the max number of rows to put in a single row group.
     /// Default 1Mi rows.
     Builder* max_row_group_length(int64_t max_row_group_length) {
+      ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";

Review Comment:
   How about moving its definition to properties.cc so we don't need to include logging.h in this file?
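   
   For illustration, a minimal sketch of that split (assuming `ARROW_CHECK_GT` comes from `arrow/util/logging.h`; this only shows the shape, not the final code):
   
   ```cpp
   // properties.h: keep only the declaration so this header no longer needs logging.h.
   Builder* max_row_group_length(int64_t max_row_group_length);
   
   // properties.cc: out-of-line definition, where including the logging header is fine.
   #include "arrow/util/logging.h"
   
   WriterProperties::Builder* WriterProperties::Builder::max_row_group_length(
       int64_t max_row_group_length) {
     ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";
     max_row_group_length_ = max_row_group_length;
     return this;
   }
   ```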



##########
cpp/src/parquet/properties.h:
##########
@@ -414,10 +418,20 @@ class PARQUET_EXPORT WriterProperties {
     /// Specify the max number of rows to put in a single row group.
     /// Default 1Mi rows.
     Builder* max_row_group_length(int64_t max_row_group_length) {
+      ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";
       max_row_group_length_ = max_row_group_length;
       return this;
     }
 
+    /// Specify the max number of bytes to put in a single row group.
+    /// The size is estimated based on encoded and compressed data.
+    /// Default 128MB.
+    Builder* max_row_group_bytes(int64_t max_row_group_bytes) {
+      ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive";

Review Comment:
   ditto



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -480,17 +485,28 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    // Max number of rows allowed in a row group.
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    // Max number of bytes allowed in a row group.
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t group_rows = row_group_writer_->num_rows();
+      int64_t batch_size =
+          std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
+      if (group_rows > 0) {

Review Comment:
   The good thing is that `EstimateCompressedBytesPerRow()` can be called in different write functions consistently.



##########
cpp/src/parquet/file_writer.cc:
##########
@@ -68,6 +68,12 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
   return contents_->total_compressed_bytes_written();
 }
 
+int64_t RowGroupWriter::total_buffered_bytes() const {

Review Comment:
   ```suggestion
   int64_t RowGroupWriter::EstimatedTotalCompressedBytes() const {
   ```
   
   As this is not a trivial getter, we need to use camel case with a capitalized initial as well. Add `Estimated` to indicate this is not a precise number and remove `buffered` to avoid confusion.



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -395,15 +395,24 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
-    if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 
0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    if (!table.schema()->Equals(*schema_, false)) {
       return Status::Invalid("table schema does not match this writer's. 
table:'",
                              table.schema()->ToString(), "' this:'", 
schema_->ToString(),
                              "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    // max_row_group_bytes is applied only after the row group has accumulated data.
+    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {

Review Comment:
   I still think this estimation does not help because in most cases `WriteTable` will not be used in the buffered mode. See my suggestion in the comment below.



##########
cpp/src/parquet/file_writer.h:
##########
@@ -58,6 +58,8 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t total_compressed_bytes() const = 0;
     /// \brief total compressed bytes written by the page writer
     virtual int64_t total_compressed_bytes_written() const = 0;
+    /// \brief estimated size of the values that are not written to a page yet

Review Comment:
   ```suggestion
       /// \brief estimated bytes of values that are buffered by the page writer
       /// but not written to a page yet
   ```



##########
cpp/src/parquet/file_writer.h:
##########
@@ -99,6 +101,9 @@ class PARQUET_EXPORT RowGroupWriter {
   int64_t total_compressed_bytes() const;
   /// \brief total compressed bytes written by the page writer
   int64_t total_compressed_bytes_written() const;
+  /// \brief including compressed bytes in page writer and uncompressed data
+  /// value buffer.
+  int64_t total_buffered_bytes() const;

Review Comment:
   ```suggestion
     /// \brief Estimate total compressed bytes including written and buffered bytes.
     int64_t EstimatedTotalCompressedBytes() const;
   ```



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -395,15 +395,24 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
-    if (chunk_size <= 0 && table.num_rows() > 0) {

Review Comment:
   It seems that you can keep this check to fail fast, because the `chunk_size` deduced later will never be negative now that you have validated `properties().max_row_group_length()`.
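   
   As a rough sketch (reusing the code from the hunk above, just to show the ordering; not the final version), keeping that fail-fast check could look like:
   
   ```cpp
   // Fail fast on a non-positive chunk_size instead of relying on the later clamp.
   if (chunk_size <= 0 && table.num_rows() > 0) {
     return Status::Invalid("chunk size per row_group must be greater than 0");
   }
   if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) {
     return Status::Invalid("table schema does not match this writer's. table:'",
                            table.schema()->ToString(), "' this:'",
                            schema_->ToString(), "'");
   }
   if (chunk_size > this->properties().max_row_group_length()) {
     chunk_size = this->properties().max_row_group_length();
   }
   ```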



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -480,17 +485,28 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    // Max number of rows allowed in a row group.
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    // Max number of bytes allowed in a row group.
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t group_rows = row_group_writer_->num_rows();
+      int64_t batch_size =
+          std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
+      if (group_rows > 0) {

Review Comment:
   Can we add something like the code below to estimate the per-row size from already written row groups, if there are any?
   
   ```cpp
   std::optional<double> ParquetFileWriter::EstimateCompressedBytesPerRow() const {
     auto estimate_size = [](const FileMetaData& metadata) -> std::optional<double> {
       int64_t total_compressed_size = 0;
       int64_t total_rows = 0;
       for (int i = 0; i < metadata.num_row_groups(); i++) {
         total_compressed_size += metadata.RowGroup(i)->total_compressed_size();
         total_rows += metadata.RowGroup(i)->num_rows();
       }
       if (total_compressed_size == 0 || total_rows == 0) {
         return std::nullopt;
       }
       return static_cast<double>(total_compressed_size) / total_rows;
     };
   
     if (contents_) {
       // Use written row groups to estimate.
       return estimate_size(*contents_->metadata());
     }
   
     if (file_metadata_) {
       // Use closed file metadata to estimate.
       return estimate_size(*file_metadata_);
     }
   
     return std::nullopt;
   }
   ```
   
   Then we can add the following function to `FileWriterImpl` to adaptively estimate the per-row size:
   
   ```cpp
     std::optional<double> FileWriterImpl::EstimateCompressedBytesPerRow() const {
       if (auto value = writer_->EstimateCompressedBytesPerRow()) {
         return value.value();
       }
       if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
         return static_cast<double>(row_group_writer_->total_buffered_bytes()) /
                row_group_writer_->num_rows();
       }
       return std::nullopt;
     }
   ```
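   
   For context, a hypothetical usage sketch of how such an estimate could also cap `batch_size` by bytes inside the write loop (`max_row_group_bytes`, `group_rows`, and `total_buffered_bytes()` come from this PR; the remaining names are illustrative only):
   
   ```cpp
   // Limit the rows added to the current row group so that, given the estimated
   // compressed bytes per row, it stays under max_row_group_bytes.
   int64_t row_limit = max_row_group_length - group_rows;
   if (auto per_row = EstimateCompressedBytesPerRow()) {
     const int64_t remaining_bytes =
         max_row_group_bytes - row_group_writer_->total_buffered_bytes();
     const int64_t rows_by_bytes = std::max<int64_t>(
         static_cast<int64_t>(remaining_bytes / *per_row), int64_t{1});
     row_limit = std::min(row_limit, rows_by_bytes);
   }
   const int64_t batch_size = std::min(row_limit, batch.num_rows() - offset);
   ```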



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -480,17 +485,28 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    // Max number of rows allowed in a row group.
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    // Max number of bytes allowed in a row group.

Review Comment:
   ```suggestion
       const int64_t max_row_group_length = this->properties().max_row_group_length();
   ```
   
   We don't need to add obvious comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
