This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1715bae26fd [opt](parquet-writer) Specify the row group size when writing data to Parquet files. (#35081) (#36042)
1715bae26fd is described below
commit 1715bae26fd8f6c24cf929a15f801bd05ac56898
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Jun 7 17:57:11 2024 +0800
[opt](parquet-writer) Specify the row group size when writing data to Parquet files. (#35081) (#36042)
bp #35081
Co-authored-by: Tiewei Fang <[email protected]>
---
be/src/vec/runtime/vparquet_transformer.cpp | 12 +++++++-----
be/src/vec/runtime/vparquet_transformer.h | 1 +
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index 8069487c009..77068adeebe 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -70,6 +70,8 @@
namespace doris::vectorized {
+const uint64_t min_row_group_size = 128 * 1024 * 1024; // 128MB
+
ParquetOutputStream::ParquetOutputStream(doris::io::FileWriter* file_writer)
: _file_writer(file_writer), _cur_pos(0), _written_len(0) {
set_mode(arrow::io::FileMode::WRITE);
@@ -292,12 +294,12 @@ Status VParquetTransformer::write(const Block& block) {
RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
arrow::default_memory_pool(),
&result, _state->timezone_obj()));
- auto get_table_res = arrow::Table::FromRecordBatches(result->schema(), {result});
- if (!get_table_res.ok()) {
- return Status::InternalError("Error when get arrow table from record batchs");
+ RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
+ _write_size += block.bytes();
+ if (_write_size >= min_row_group_size) {
+ RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
+ _write_size = 0;
}
- auto& table = get_table_res.ValueOrDie();
- RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteTable(*table, block.rows()));
return Status::OK();
}
diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h
index ff1fa73e094..9eae25d8ac4 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -130,6 +130,7 @@ private:
const bool _parquet_disable_dictionary;
const TParquetVersion::type _parquet_version;
const std::string* _iceberg_schema_json;
+ uint64_t _write_size = 0;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]