wgtmac commented on code in PR #48468:
URL: https://github.com/apache/arrow/pull/48468#discussion_r2693510467


##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -480,17 +481,24 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t batch_size = std::min(max_row_group_length - row_group_writer_->num_rows(),
+                                    batch.num_rows() - offset);
+      if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+        int64_t buffered_bytes = row_group_writer_->EstimatedTotalCompressedBytes();
+        batch_size = std::min(
+            batch_size, static_cast<int64_t>((max_row_group_bytes - buffered_bytes) /
+                                             avg_row_size.value()));
+      }
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
+        // Current row group is full, write remaining rows in a new group.

Review Comment:
   Will it cause an infinite loop at this line if `batch_size` is always 0?
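   (A self-contained toy model of this concern, with hypothetical numbers and names rather than the PR's actual code: when the estimated row size exceeds the whole byte budget, the byte-based `batch_size` is always 0, so the loop only terminates if a flush or a minimum-progress guard restores forward progress.)
   ```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Toy model of the write loop above (hypothetical values, not the PR's code).
// When avg_row_size exceeds max_row_group_bytes, the byte-based batch_size is
// always 0; forcing at least one row after a flush keeps the loop finite.
int main() {
  const int64_t total_rows = 10;
  const int64_t max_row_group_bytes = 100;
  const int64_t avg_row_size = 150;  // a single row is larger than the budget

  int64_t offset = 0;
  int64_t buffered_bytes = 0;
  while (offset < total_rows) {
    int64_t batch_size =
        std::min(total_rows - offset,
                 (max_row_group_bytes - buffered_bytes) / avg_row_size);
    if (batch_size <= 0) {
      if (buffered_bytes > 0) {
        buffered_bytes = 0;  // "flush" the full row group and start a new one
        continue;            // re-estimate against the empty group
      }
      batch_size = 1;  // an empty group still accepts at least one row
    }
    buffered_bytes += batch_size * avg_row_size;
    offset += batch_size;
  }
  std::cout << "wrote " << offset << " rows without looping forever\n";
  return 0;
}
   ```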



##########
cpp/src/parquet/file_writer.cc:
##########
@@ -468,6 +493,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   const std::shared_ptr<WriterProperties> properties_;
   int num_row_groups_;
   int64_t num_rows_;
+  int64_t compressed_bytes_;

Review Comment:
   Perhaps rename to `written_row_group_compressed_bytes_` to be clearer? Or `written_compressed_bytes_` if the previous one is too long.



##########
cpp/src/parquet/arrow/writer.h:
##########
@@ -139,6 +139,9 @@ class PARQUET_EXPORT FileWriter {
   /// `store_schema` being unusable during read.
   virtual ::arrow::Status AddKeyValueMetadata(
       const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
+  /// \brief Estimate compressed bytes per row from closed row groups or the active row
+  /// group.

Review Comment:
   ```suggestion
     /// \brief Estimate compressed bytes per row from data written so far.
     /// \note std::nullopt will be returned if no rows have been written.
   ```



##########
cpp/src/parquet/file_writer.h:
##########
@@ -151,6 +156,7 @@ class PARQUET_EXPORT ParquetFileWriter {
     virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
 
     virtual int64_t num_rows() const = 0;
+    virtual int64_t compressed_bytes() const = 0;

Review Comment:
   ```suggestion
       virtual int64_t compressed_bytes() const = 0;
       virtual int64_t num_rows() const = 0;
   ```
   
   This order looks more natural :)



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -396,14 +396,19 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 
0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+      return Status::Invalid("rows per row_group must be greater than 0");
+    } else if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) {
       return Status::Invalid("table schema does not match this writer's. 
table:'",
                              table.schema()->ToString(), "' this:'", 
schema_->ToString(),
                              "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+      chunk_size = std::min(
+          chunk_size, static_cast<int64_t>(this->properties().max_row_group_bytes() /
+                                           avg_row_size.value()));

Review Comment:
   We need to clamp the chunk size between 1 and `max_row_group_bytes / avg_row_size`.
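   (A minimal standalone sketch of that clamp; the helper name and parameters are hypothetical, not the PR's code. Note that `std::clamp` requires `lo <= hi`, so the upper bound itself is kept at least 1.)
   ```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper (not the PR's code) showing the clamp the comment asks
// for: keep chunk_size within [1, max_row_group_bytes / avg_row_size].
int64_t ClampChunkSize(int64_t chunk_size, int64_t max_row_group_bytes,
                       double avg_row_size) {
  const auto byte_limited_rows =
      static_cast<int64_t>(max_row_group_bytes / avg_row_size);
  // Guard the upper bound so a very large row estimate cannot make hi < lo.
  return std::clamp<int64_t>(chunk_size, 1,
                             std::max<int64_t>(1, byte_limited_rows));
}
   ```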



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -396,14 +396,19 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 
0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+      return Status::Invalid("rows per row_group must be greater than 0");

Review Comment:
   ```suggestion
         return Status::Invalid("chunk size per row_group must be greater than 
0");
   ```



##########
cpp/src/parquet/file_writer.h:
##########
@@ -207,6 +213,9 @@ class PARQUET_EXPORT ParquetFileWriter {
   void AddKeyValueMetadata(
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
 
+  /// Estimate compressed bytes per row from closed row groups.

Review Comment:
   ```suggestion
     /// \brief Estimate compressed bytes per row from closed row groups.
     /// \return Estimated bytes, or std::nullopt when no row group has been written.
   ```


