This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3fe11401ec8 branch-4.0: [opt](memory) optimize row-store memtable flush memory in the row-store scenario (#64056)
3fe11401ec8 is described below

commit 3fe11401ec878bb09531f78731dd8af2c98cd185
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 11 11:15:37 2026 +0800

    branch-4.0: [opt](memory) optimize row-store memtable flush memory in the row-store scenario (#64056)
    
    pick https://github.com/apache/doris/pull/63342
---
 .../rowset/segment_v2/vertical_segment_writer.cpp  | 122 ++++++++++++++-------
 .../rowset/segment_v2/vertical_segment_writer.h    |   4 +-
 be/src/vec/jsonb/serialize.cpp                     |  28 ++++-
 be/src/vec/jsonb/serialize.h                       |   8 ++
 4 files changed, 120 insertions(+), 42 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 73e7d54cf6a..7ca12cbfa9b 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -22,6 +22,7 @@
 #include <gen_cpp/segment_v2.pb.h>
 #include <parallel_hashmap/phmap.h>
 
+#include <algorithm>
 #include <cassert>
 #include <memory>
 #include <ostream>
@@ -379,32 +380,44 @@ void 
VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) con
     }
 }
 
-void VerticalSegmentWriter::_serialize_block_to_row_column(const 
vectorized::Block& block) {
-    if (block.rows() == 0) {
-        return;
+Status VerticalSegmentWriter::_append_row_store_column(const 
vectorized::Block& block,
+                                                       size_t row_pos, size_t 
num_rows,
+                                                       uint32_t cid) {
+    DCHECK(_tablet_schema->column(cid).is_row_store_column());
+    if (num_rows == 0) {
+        return Status::OK();
     }
-    MonotonicStopWatch watch;
-    watch.start();
-    int row_column_id = 0;
-    for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
-        if (_tablet_schema->column(i).is_row_store_column()) {
-            auto* row_store_column = static_cast<vectorized::ColumnString*>(
-                    
block.get_by_position(i).column->assume_mutable_ref().assume_mutable().get());
-            row_store_column->clear();
-            vectorized::DataTypeSerDeSPtrs serdes =
-                    
vectorized::create_data_type_serdes(block.get_data_types());
-            std::unordered_set<int> 
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
-                                                       
_tablet_schema->row_columns_uids().end());
-            vectorized::JsonbSerializeUtil::block_to_jsonb(
-                    *_tablet_schema, block, *row_store_column,
-                    cast_set<int>(_tablet_schema->num_columns()), serdes, 
row_store_cids_set);
-            break;
-        }
+    DCHECK_LE(row_pos + num_rows, block.rows());
+
+    auto serdes = vectorized::create_data_type_serdes(block.get_data_types());
+    std::unordered_set<int32_t> 
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
+                                                   
_tablet_schema->row_columns_uids().end());
+    size_t end_pos = row_pos + num_rows;
+    size_t batch_rows = _opts.num_rows_per_block;
+    static constexpr size_t kRowStoreBatchBytes = 4 * 1024 * 1024;
+    DCHECK_GT(batch_rows, 0);
+    for (size_t pos = row_pos; pos < end_pos;) {
+        size_t max_rows = std::min(batch_rows, end_pos - pos);
+        auto row_column = vectorized::ColumnString::create();
+        auto* row_store_column = row_column.get();
+        size_t rows = vectorized::JsonbSerializeUtil::block_to_jsonb(
+                *_tablet_schema, block, *row_store_column,
+                cast_set<int>(_tablet_schema->num_columns()), serdes, 
row_store_cids_set, pos,
+                max_rows, kRowStoreBatchBytes);
+        DCHECK_GT(rows, 0);
+
+        auto typed_column = block.get_by_position(cid);
+        typed_column.column = std::move(row_column);
+        
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                typed_column, 0, rows, cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(
+                _column_writers[cid]->append(column->get_nullmap(), 
column->get_data(), rows));
+        _olap_data_convertor->clear_source_content(cid);
+        pos += rows;
     }
-
-    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", 
row_column_id:" << row_column_id
-               << ", total_byte_size:" << block.allocated_bytes() << ", 
serialize_cost(us)"
-               << watch.elapsed_time() / 1000;
+    return Status::OK();
 }
 
 Status VerticalSegmentWriter::_probe_key_for_mow(
@@ -472,6 +485,15 @@ Status VerticalSegmentWriter::_probe_key_for_mow(
     return Status::OK();
 }
 
+Status VerticalSegmentWriter::_check_column_writer_disk_capacity(size_t cid) {
+    if (_data_dir != nullptr &&
+        
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
+        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
+                                                        
_data_dir->path_hash());
+    }
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::_finalize_column_writer_and_update_meta(size_t 
cid) {
     RETURN_IF_ERROR(_column_writers[cid]->finish());
     RETURN_IF_ERROR(_column_writers[cid]->write_data());
@@ -658,14 +680,15 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
         RETURN_IF_ERROR(parse_variant_columns_in_block(full_block, 
*_tablet_schema));
     }
 
-    // row column should be filled here
-    // convert block to row store format
-    _serialize_block_to_row_column(full_block);
-
     // convert missing columns and send to column writer
     const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
     for (auto cid : missing_cids) {
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
+        if (_tablet_schema->column(cid).is_row_store_column()) {
+            RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, 
data.num_rows, cid));
+            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+            continue;
+        }
         
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
                 &full_block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid}));
         auto [status, column] = _olap_data_convertor->convert_column_data(cid);
@@ -824,10 +847,22 @@ Status 
VerticalSegmentWriter::_append_block_with_flexible_partial_content(
     // this column is not needed in read path for merge-on-write table
 
     // 7. fill row store column
-    _serialize_block_to_row_column(full_block);
+    for (auto cid = _tablet_schema->num_key_columns(); cid < 
_tablet_schema->num_columns(); cid++) {
+        if (!_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
+        RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid), 
_tablet_schema->column(cid),
+                                              _tablet_schema));
+        RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, 
data.num_rows,
+                                                 cast_set<uint32_t>(cid)));
+        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+    }
 
     // 8. encode and write all non-primary key columns(including sequence 
column if exists)
     for (auto cid = _tablet_schema->num_key_columns(); cid < 
_tablet_schema->num_columns(); cid++) {
+        if (_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
         if (cid != _tablet_schema->sequence_col_idx()) {
             RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
                                                   _tablet_schema->column(cid), 
_tablet_schema));
@@ -1009,11 +1044,21 @@ Status VerticalSegmentWriter::write_batch() {
     }
     // Row column should be filled here when it's a directly write from 
memtable
     // or it's schema change write(since column data type maybe changed, so we 
should reubild)
-    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
-        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
-        for (auto& data : _batched_blocks) {
-            // TODO: maybe we should pass range to this method
-            _serialize_block_to_row_column(*data.block);
+    bool should_write_row_store_column = _opts.write_type == 
DataWriteType::TYPE_DIRECT ||
+                                         _opts.write_type == 
DataWriteType::TYPE_SCHEMA_CHANGE;
+    if (should_write_row_store_column) {
+        for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+            if (!_tablet_schema->column(cid).is_row_store_column()) {
+                continue;
+            }
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), 
_tablet_schema));
+            for (auto& data : _batched_blocks) {
+                RETURN_IF_ERROR(
+                        _append_row_store_column(*data.block, data.row_pos, 
data.num_rows, cid));
+            }
+            RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
+            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
         }
     }
 
@@ -1022,6 +1067,9 @@ Status VerticalSegmentWriter::write_batch() {
     // the key is cluster key column unique id
     std::map<uint32_t, vectorized::IOlapColumnDataAccessor*> cid_to_column;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+        if (should_write_row_store_column && 
_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
         for (auto& data : _batched_blocks) {
             
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
@@ -1049,11 +1097,7 @@ Status VerticalSegmentWriter::write_batch() {
                                                          data.num_rows));
             _olap_data_convertor->clear_source_content();
         }
-        if (_data_dir != nullptr &&
-            
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
-            return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
-                                                            
_data_dir->path_hash());
-        }
+        RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
         RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
     }
 
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index 51dfd217bfc..fbd9125abe5 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -161,7 +161,8 @@ private:
     void _set_min_max_key(const Slice& key);
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
-    void _serialize_block_to_row_column(const vectorized::Block& block);
+    Status _append_row_store_column(const vectorized::Block& block, size_t 
row_pos, size_t num_rows,
+                                    uint32_t cid);
     Status _probe_key_for_mow(std::string key, std::size_t segment_pos, bool 
have_input_seq_column,
                               bool have_delete_sign,
                               const std::vector<RowsetSharedPtr>& 
specified_rowsets,
@@ -198,6 +199,7 @@ private:
             vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, 
bool need_sort);
     Status 
_generate_short_key_index(std::vector<vectorized::IOlapColumnDataAccessor*>& 
key_columns,
                                      size_t num_rows, const 
std::vector<size_t>& short_key_pos);
+    Status _check_column_writer_disk_capacity(size_t cid);
     Status _finalize_column_writer_and_update_meta(size_t cid);
 
     bool _is_mow();
diff --git a/be/src/vec/jsonb/serialize.cpp b/be/src/vec/jsonb/serialize.cpp
index c31918c5409..848a0404172 100644
--- a/be/src/vec/jsonb/serialize.cpp
+++ b/be/src/vec/jsonb/serialize.cpp
@@ -20,6 +20,7 @@
 #include <assert.h>
 
 #include <algorithm>
+#include <limits>
 #include <memory>
 #include <unordered_set>
 #include <vector>
@@ -47,13 +48,31 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& 
schema, const Block&
                                         ColumnString& dst, int num_cols,
                                         const DataTypeSerDeSPtrs& serdes,
                                         const std::unordered_set<int32_t>& 
row_store_cids) {
-    auto num_rows = block.rows();
+    block_to_jsonb(schema, block, dst, num_cols, serdes, row_store_cids, 0, 
block.rows());
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                        ColumnString& dst, int num_cols,
+                                        const DataTypeSerDeSPtrs& serdes,
+                                        const std::unordered_set<int32_t>& 
row_store_cids,
+                                        size_t row_pos, size_t num_rows) {
+    static_cast<void>(block_to_jsonb(schema, block, dst, num_cols, serdes, 
row_store_cids, row_pos,
+                                     num_rows, 
std::numeric_limits<size_t>::max()));
+}
+
+size_t JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                          ColumnString& dst, int num_cols,
+                                          const DataTypeSerDeSPtrs& serdes,
+                                          const std::unordered_set<int32_t>& 
row_store_cids,
+                                          size_t row_pos, size_t num_rows, 
size_t max_bytes) {
     Arena arena;
     assert(num_cols <= block.columns());
+    assert(row_pos + num_rows <= block.rows());
     DataTypeSerDe::FormatOptions options;
     auto tz = cctz::utc_time_zone();
     options.timezone = &tz;
-    for (int i = 0; i < num_rows; ++i) {
+    size_t written_rows = 0;
+    for (size_t i = row_pos; i < row_pos + num_rows; ++i) {
         JsonbWriterT<JsonbOutStream> jsonb_writer;
         jsonb_writer.writeStartObject();
         for (int j = 0; j < num_cols; ++j) {
@@ -71,7 +90,12 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& 
schema, const Block&
         }
         jsonb_writer.writeEndObject();
         dst.insert_data(jsonb_writer.getOutput()->getBuffer(), 
jsonb_writer.getOutput()->getSize());
+        ++written_rows;
+        if (dst.byte_size() >= max_bytes) {
+            break;
+        }
     }
+    return written_rows;
 }
 
 // batch rows
diff --git a/be/src/vec/jsonb/serialize.h b/be/src/vec/jsonb/serialize.h
index b8f1a0a25fa..d1fc3163791 100644
--- a/be/src/vec/jsonb/serialize.h
+++ b/be/src/vec/jsonb/serialize.h
@@ -41,6 +41,14 @@ public:
     static void block_to_jsonb(const TabletSchema& schema, const Block& block, 
ColumnString& dst,
                                int num_cols, const DataTypeSerDeSPtrs& serdes,
                                const std::unordered_set<int32_t>& 
row_store_cids);
+    static void block_to_jsonb(const TabletSchema& schema, const Block& block, 
ColumnString& dst,
+                               int num_cols, const DataTypeSerDeSPtrs& serdes,
+                               const std::unordered_set<int32_t>& 
row_store_cids, size_t row_pos,
+                               size_t num_rows);
+    static size_t block_to_jsonb(const TabletSchema& schema, const Block& 
block, ColumnString& dst,
+                                 int num_cols, const DataTypeSerDeSPtrs& 
serdes,
+                                 const std::unordered_set<int32_t>& 
row_store_cids, size_t row_pos,
+                                 size_t num_rows, size_t max_bytes);
     // batch rows
     static Status jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const 
ColumnString& jsonb_column,
                                  const std::unordered_map<uint32_t, uint32_t>& 
col_id_to_idx,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to