wgtmac commented on code in PR #36377:
URL: https://github.com/apache/arrow/pull/36377#discussion_r1252544285


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {

Review Comment:
   ```suggestion
   TYPED_TEST(TestBufferedParquetIO, WriteTableSmall) {
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));

Review Comment:
   ```suggestion
     ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {

Review Comment:
   ```suggestion
   TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller

Review Comment:
   ```suggestion
     // Write the whole table in one batch with a small max_row_group_size.
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -4980,6 +4980,40 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(writer->Close());
   }
 
+  void WriteBufferedTable(const std::shared_ptr<Array>& values,
+                          int64_t write_table_batch_size,
+                          int64_t parquet_writer_max_row_group_size,
+                          int64_t write_table_max_row_group_size, int* num_row_groups) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
+    SchemaDescriptor descriptor;
+    ASSERT_NO_THROW(descriptor.Init(schema));
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
+
+    parquet::WriterProperties::Builder props_builder;
+    props_builder.max_row_group_length(parquet_writer_max_row_group_size);
+
+    this->sink_ = CreateOutputStream();
+    auto low_level_writer =
+        ParquetFileWriter::Open(this->sink_, schema, props_builder.build());
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        std::move(low_level_writer), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    for (int i = 0; i * write_table_batch_size < values->length(); i++) {

Review Comment:
   Add a DCHECK for `values->length() % write_table_batch_size == 0`?
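   
   For illustration, a minimal sketch of what that precondition could look like at the top of `WriteBufferedTable` (a GTest assertion rather than a raw `DCHECK`, since this is test code; the message text is mine):
   
   ```cpp
   // The write loop assumes the batch size evenly divides the array length;
   // fail fast if a test passes an incompatible combination.
   ASSERT_EQ(0, values->length() % write_table_batch_size)
       << "write_table_batch_size must evenly divide values->length()";
   ```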



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;

Review Comment:
   How about extracting the magic number into a named constant, e.g. `constexpr int num_batches = 4;`?
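   
   A hedged sketch of how the quoted test body might read with that constant (my adaptation, not from the PR):
   
   ```cpp
   constexpr int num_batches = 4;
   // Write the whole table in num_batches batches.
   int64_t write_table_batch_size = SMALL_SIZE / num_batches;
   ```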



##########
cpp/src/parquet/arrow/writer.h:
##########
@@ -91,10 +91,21 @@ class PARQUET_EXPORT FileWriter {
 
   /// \brief Write a Table to Parquet.
   ///
+  /// If `use_buffering` is false, then any pending row group is closed
+  /// at the beginning and at the end of this call.
+  /// If `use_buffering` is true, this function reuses an existing
+  /// buffered row group until the chunk size is met, and leaves
+  /// the last row group open for further writes.
+  /// It is recommended to set `use_buffering` to true to minimize
+  /// the number of row groups, especially when calling `WriteTable`
+  /// with small tables.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
-  virtual ::arrow::Status WriteTable(
-      const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0;
+  /// \param use_buffering Whether to potentially buffer data.
+  virtual ::arrow::Status WriteTable(const ::arrow::Table& table,

Review Comment:
   Sorry, I missed the discussion. Since the conclusion is to extend `WriteTable`, I will resolve this comment.
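   
   For context, a minimal usage sketch of the extended `WriteTable` described in the hunk above (the `small_tables` loop and a `Status`-returning caller are my assumptions, not from the PR):
   
   ```cpp
   // Append several small tables into one buffered row group instead of
   // producing a tiny row group per WriteTable() call.
   for (const auto& table : small_tables) {
     ARROW_RETURN_NOT_OK(writer->WriteTable(
         *table, /*chunk_size=*/DEFAULT_MAX_ROW_GROUP_LENGTH, /*use_buffering=*/true));
   }
   ARROW_RETURN_NOT_OK(writer->Close());  // closes the last buffered row group
   ```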



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+    this->WriteBufferedTable(values, write_table_batch_size,
+                             parquet_writer_max_row_group_size, write_max_row_group_size,
+                             &num_row_groups);
+    EXPECT_EQ(4, num_row_groups);
+    ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+  }
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
+  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  this->WriteBufferedTable(values, write_table_batch_size,
+                           parquet_writer_max_row_group_size, write_max_row_group_size,
+                           &num_row_groups);
+  EXPECT_EQ(4, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatchesLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

Review Comment:
   ```suggestion
     ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+    this->WriteBufferedTable(values, write_table_batch_size,
+                             parquet_writer_max_row_group_size, write_max_row_group_size,
+                             &num_row_groups);
+    EXPECT_EQ(4, num_row_groups);
+    ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+  }
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
+  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  this->WriteBufferedTable(values, write_table_batch_size,
+                           parquet_writer_max_row_group_size, write_max_row_group_size,
+                           &num_row_groups);
+  EXPECT_EQ(4, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatchesLimit) {

Review Comment:
   ```suggestion
   TYPED_TEST(TestBufferedParquetIO, WriteTableInBatchesForMultiRowGroups) {
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

Review Comment:
   ```suggestion
     ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+    this->WriteBufferedTable(values, write_table_batch_size,
+                             parquet_writer_max_row_group_size, write_max_row_group_size,
+                             &num_row_groups);
+    EXPECT_EQ(4, num_row_groups);
+    ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+  }
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

Review Comment:
   ```suggestion
     ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;

Review Comment:
   Ditto for the magic number 4.



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

Review Comment:
   ```suggestion
     ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {

Review Comment:
   ```suggestion
   TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit) {
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -4980,6 +4980,40 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(writer->Close());
   }
 
+  void WriteBufferedTable(const std::shared_ptr<Array>& values,
+                          int64_t write_table_batch_size,
+                          int64_t parquet_writer_max_row_group_size,

Review Comment:
   ```suggestion
                             int64_t writer_properties_max_row_group_size,
   ```



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -406,57 +402,7 @@ class FileWriterImpl : public FileWriter {
     }
 
     // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
-      RETURN_NOT_OK(NewBufferedRowGroup());
-    }
-
-    auto WriteBatch = [&](int64_t offset, int64_t size) {
-      std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
-      int column_index_start = 0;
-
-      for (int i = 0; i < batch.num_columns(); i++) {
-        ChunkedArray chunked_array{batch.column(i)};
-        ARROW_ASSIGN_OR_RAISE(
-            std::unique_ptr<ArrowColumnWriterV2> writer,
-            ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
-                                      row_group_writer_, column_index_start));
-        column_index_start += writer->leaf_count();
-        if (arrow_properties_->use_threads()) {
-          writers.emplace_back(std::move(writer));
-        } else {
-          RETURN_NOT_OK(writer->Write(&column_write_context_));
-        }
-      }
-
-      if (arrow_properties_->use_threads()) {

Review Comment:
   You may also add a similar comment to `WriteTable`:
   
   ```cpp
     /// WARNING: If you are writing multiple files in parallel in the same
     /// executor, deadlock may occur if ArrowWriterProperties::use_threads
     /// is set to true to write columns in parallel. Please disable use_threads
     /// option in this case.
   ```
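   
   As a hedged illustration of the workaround that warning implies (the builder call matches the existing `ArrowWriterProperties` API; how you thread the properties into your writer setup is situational):
   
   ```cpp
   // When several files are written in parallel on one executor, keep
   // column-level parallelism off so writer tasks cannot starve each other.
   std::shared_ptr<parquet::ArrowWriterProperties> arrow_properties =
       parquet::ArrowWriterProperties::Builder().set_use_threads(false)->build();
   ```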



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5030,6 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+    this->WriteBufferedTable(values, write_table_batch_size,
+                             parquet_writer_max_row_group_size, write_max_row_group_size,
+                             &num_row_groups);
+    EXPECT_EQ(4, num_row_groups);
+    ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+  }
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {

Review Comment:
   ```suggestion
   TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit2) {
   ```


