zhannngchen commented on code in PR #17542:
URL: https://github.com/apache/doris/pull/17542#discussion_r1133534531
##########
be/src/olap/delta_writer.cpp:
##########
@@ -269,10 +273,10 @@ void DeltaWriter::_reset_mem_table() {
_mem_table_tracker.push_back(mem_table_insert_tracker);
_mem_table_tracker.push_back(mem_table_flush_tracker);
}
+ auto mow_context = std::make_shared<MowContext>(_cur_max_version,
_rowset_ids, _delete_bitmap);
Review Comment:
重复创建mow_context? (i.e., is the mow_context being created redundantly here?)
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -297,11 +306,235 @@ void SegmentWriter::_maybe_invalid_row_cache(const
std::string& key) {
}
}
+void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
+ if (block.rows() == 0) {
+ return;
+ }
+ MonotonicStopWatch watch;
+ watch.start();
+ // find row column id
+ int row_column_id = 0;
+ for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
+ if (_tablet_schema->column(i).is_row_store_column()) {
+ row_column_id = i;
+ break;
+ }
+ }
+ vectorized::ColumnString* row_store_column =
+
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
+
.column->assume_mutable_ref()
+ .assume_mutable()
+ .get());
+ row_store_column->clear();
+ vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block,
*row_store_column,
+
_tablet_schema->num_columns());
+ VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ",
row_column_id:" << row_column_id
+ << ", total_byte_size:" << block.allocated_bytes() << ",
serialize_cost(us)"
+ << watch.elapsed_time() / 1000;
+}
+
+// for partial update, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all key_column's converted
slice
+// 2. get pk of input block, and read missing columns
+// 2.1 first find key location{rowset_id, segment_id, row_id}
+// 2.2 build read plan to read by batch
+// 2.3 fill block
+// 3. set columns to data convertor and then write all columns
+Status SegmentWriter::append_block_with_partial_content(const
vectorized::Block* block,
+ size_t row_pos, size_t
num_rows) {
+ CHECK(block->columns() > _tablet_schema->num_key_columns() &&
+ block->columns() < _tablet_schema->num_columns());
+ CHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
+
+ // find missing column cids
+ std::vector<uint32_t> missing_cids;
+ std::vector<uint32_t> including_cids;
+ for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
+ if (_tablet_schema->is_column_missing(i)) {
+ missing_cids.push_back(i);
+ } else {
+ including_cids.push_back(i);
+ }
+ }
+ // create full block and fill with input columns
+ auto full_block = _tablet_schema->create_block();
+ size_t input_id = 0;
+ for (auto i : including_cids) {
+ full_block.replace_by_position(i,
block->get_by_position(input_id++).column);
+ }
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
+
including_cids);
+
+ // write including columns
+ std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+ vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+ for (auto cid : including_cids) {
+        // olap data convertor always starts from id = 0
+ auto converted_result = _olap_data_convertor->convert_column_data(cid);
+ if (converted_result.first != Status::OK()) {
+ return converted_result.first;
+ }
+ if (cid < _num_key_columns) {
+ key_columns.push_back(converted_result.second);
+ } else if (cid == _tablet_schema->sequence_col_idx()) {
+ seq_column = converted_result.second;
+ }
+
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
+
converted_result.second->get_data(),
+ num_rows));
+ }
+
+ bool has_default = false;
+ std::vector<bool> use_default_flag;
+ use_default_flag.reserve(num_rows);
+ for (size_t pos = 0; pos < num_rows; pos++) {
+ std::string key = _full_encode_keys(key_columns, pos);
+ if (_tablet_schema->has_sequence_col()) {
+ _encode_seq_column(seq_column, pos, &key);
+ }
+ RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+ _maybe_invalid_row_cache(key);
+
+ RowLocation loc;
+ // save rowset shared ptr so this rowset wouldn't delete
+ RowsetSharedPtr rowset;
+ auto st = _tablet->lookup_row_key(key, &_mow_context->rowset_ids, &loc,
+ _mow_context->max_version, &rowset);
+ if (st.is<NOT_FOUND>()) {
+ if (!_tablet_schema->allow_key_not_exist_in_partial_update()) {
+ return Status::InternalError("partial update key not exist
before");
+ }
+ has_default = true;
+ use_default_flag.emplace_back(true);
+ continue;
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to lookup row key";
+ return st;
+ }
+ use_default_flag.emplace_back(false);
+ _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+ _tablet->prepare_to_read(loc, pos, &_rssid_to_rid);
+ _mow_context->delete_bitmap->add({loc.rowset_id, loc.segment_id, 0},
loc.row_id);
+ }
+ CHECK(use_default_flag.size() == num_rows);
+
+ // read and fill block
+ auto mutable_full_columns = full_block.mutate_columns();
+ RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns,
use_default_flag, has_default));
+ // row column should be filled here
+ if (_tablet_schema->store_row_column()) {
+ // convert block to row store format
+ _serialize_block_to_row_column(full_block);
+ }
+
+ // convert missing columns and send to column writer
+ auto cids_missing = _tablet_schema->get_missing_cids();
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
+
cids_missing);
+ for (auto cid : cids_missing) {
+ auto converted_result = _olap_data_convertor->convert_column_data(cid);
+ if (converted_result.first != Status::OK()) {
+ return converted_result.first;
+ }
+
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
+
converted_result.second->get_data(),
+ num_rows));
+ }
+
+ _num_rows_written += num_rows;
+ _olap_data_convertor->clear_source_content();
+ return Status::OK();
+}
+
+Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
+ const std::vector<bool>&
use_default_flag,
+ bool has_default) {
+ // create old value columns
+ auto old_value_block = _tablet_schema->create_missing_columns_block();
+ std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
+ CHECK(cids_missing.size() == old_value_block.columns());
+ auto mutable_old_columns = old_value_block.mutate_columns();
+ bool has_row_column = _tablet_schema->store_row_column();
+ // record real pos, key is input line num, value is old_block line num
Review Comment:
This method seems to have a lot of duplicated code with
`Tablet::read_columns_by_plan`?
##########
be/src/olap/tablet.cpp:
##########
@@ -2590,6 +2703,30 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id,
loc.rowset_id = rowset_id;
loc.segment_id = seg->id();
loc.row_id = row_id;
+ } else if (is_partial_update && rowset_writer != nullptr) {
+ // In publish version, record rows to be deleted for
concurrent update
+                        // For example, if version 5 and 6 update a row, but
version 6 only sees
+                        // version 4 when writing, then when publishing the
version, version 5's value will
+                        // be marked as deleted and its update is lost.
+ // So here we should read version 5's columns and
build a new row, which is
+ // consists of version 6's update columns and version
5's origin columns
+ // here we build 2 read plan for ori values and update
values
+ prepare_to_read(loc, pos, &read_plan_ori);
+ prepare_to_read(RowLocation {rowset_id, seg->id(),
row_id}, pos,
+ &read_plan_update);
+ rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
+ ++pos;
Review Comment:
We can't guarantee that the data written to rowset_writer is sorted, just by
using the `pos` variable.
##########
be/src/olap/tablet.cpp:
##########
@@ -2601,6 +2738,9 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id,
// just set 0 as a unified temporary version number, and
update to
// the real version number later.
delete_bitmap->add({loc.rowset_id, loc.segment_id, 0},
loc.row_id);
+ if (is_partial_update && rowset_writer != nullptr) {
+ delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
Review Comment:
A duplicate delete might happen here — would that cause any problem?
##########
be/src/olap/tablet.cpp:
##########
@@ -2515,15 +2611,29 @@ Status Tablet::_load_rowset_segments(const
RowsetSharedPtr& rowset,
}
// caller should hold meta_lock
-Status Tablet::calc_delete_bitmap(RowsetId rowset_id,
+Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
const
std::vector<segment_v2::SegmentSharedPtr>& segments,
const RowsetIdUnorderedSet*
specified_rowset_ids,
DeleteBitmapPtr delete_bitmap, int64_t
end_version,
- bool check_pre_segments) {
+ bool check_pre_segments, RowsetWriter*
rowset_writer) {
std::vector<segment_v2::SegmentSharedPtr> pre_segments;
OlapStopWatch watch;
Version dummy_version(end_version + 1, end_version + 1);
+ auto rowset_id = rowset->rowset_id();
+ auto rowset_schema = rowset->tablet_schema();
+ bool is_partial_update = rowset_schema->is_partial_update();
+ // use for partial update
+ std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>
read_plan_ori;
+ std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>
read_plan_update;
+
+ std::map<RowsetId, RowsetSharedPtr> rsid_to_rowset;
+ rsid_to_rowset[rowset_id] = rowset;
+ vectorized::Block block = rowset_schema->create_block();
+ uint32_t pos = 0;
+ StorageReadOptions opt;
+ auto block_row_max = opt.block_row_max;
+
Review Comment:
To avoid duplicate keys, we should check delete bitmap while reading the
primary key index iterator.
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -297,11 +306,235 @@ void SegmentWriter::_maybe_invalid_row_cache(const
std::string& key) {
}
}
+void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
+ if (block.rows() == 0) {
+ return;
+ }
+ MonotonicStopWatch watch;
+ watch.start();
+ // find row column id
+ int row_column_id = 0;
+ for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
+ if (_tablet_schema->column(i).is_row_store_column()) {
+ row_column_id = i;
+ break;
+ }
+ }
+ vectorized::ColumnString* row_store_column =
+
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
+
.column->assume_mutable_ref()
+ .assume_mutable()
+ .get());
+ row_store_column->clear();
+ vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block,
*row_store_column,
+
_tablet_schema->num_columns());
+ VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ",
row_column_id:" << row_column_id
+ << ", total_byte_size:" << block.allocated_bytes() << ",
serialize_cost(us)"
+ << watch.elapsed_time() / 1000;
+}
+
+// for partial update, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all key_column's converted
slice
+// 2. get pk of input block, and read missing columns
+// 2.1 first find key location{rowset_id, segment_id, row_id}
+// 2.2 build read plan to read by batch
+// 2.3 fill block
+// 3. set columns to data convertor and then write all columns
+Status SegmentWriter::append_block_with_partial_content(const
vectorized::Block* block,
+ size_t row_pos, size_t
num_rows) {
+ CHECK(block->columns() > _tablet_schema->num_key_columns() &&
+ block->columns() < _tablet_schema->num_columns());
+ CHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
+
+ // find missing column cids
+ std::vector<uint32_t> missing_cids;
+ std::vector<uint32_t> including_cids;
+ for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
+ if (_tablet_schema->is_column_missing(i)) {
+ missing_cids.push_back(i);
+ } else {
+ including_cids.push_back(i);
+ }
+ }
+ // create full block and fill with input columns
+ auto full_block = _tablet_schema->create_block();
+ size_t input_id = 0;
+ for (auto i : including_cids) {
+ full_block.replace_by_position(i,
block->get_by_position(input_id++).column);
+ }
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
+
including_cids);
+
+ // write including columns
+ std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+ vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+ for (auto cid : including_cids) {
+        // olap data convertor always starts from id = 0
+ auto converted_result = _olap_data_convertor->convert_column_data(cid);
+ if (converted_result.first != Status::OK()) {
+ return converted_result.first;
+ }
+ if (cid < _num_key_columns) {
+ key_columns.push_back(converted_result.second);
+ } else if (cid == _tablet_schema->sequence_col_idx()) {
+ seq_column = converted_result.second;
+ }
+
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
+
converted_result.second->get_data(),
+ num_rows));
+ }
+
+ bool has_default = false;
+ std::vector<bool> use_default_flag;
+ use_default_flag.reserve(num_rows);
+ for (size_t pos = 0; pos < num_rows; pos++) {
+ std::string key = _full_encode_keys(key_columns, pos);
+ if (_tablet_schema->has_sequence_col()) {
+ _encode_seq_column(seq_column, pos, &key);
+ }
+ RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+ _maybe_invalid_row_cache(key);
+
+ RowLocation loc;
+ // save rowset shared ptr so this rowset wouldn't delete
+ RowsetSharedPtr rowset;
+ auto st = _tablet->lookup_row_key(key, &_mow_context->rowset_ids, &loc,
+ _mow_context->max_version, &rowset);
+ if (st.is<NOT_FOUND>()) {
+ if (!_tablet_schema->allow_key_not_exist_in_partial_update()) {
+ return Status::InternalError("partial update key not exist
before");
+ }
+ has_default = true;
+ use_default_flag.emplace_back(true);
+ continue;
+ }
+ if (!st.ok()) {
Review Comment:
Should also handle `ALREADY_EXISTS` status?
##########
be/src/olap/tablet.h:
##########
@@ -403,20 +405,45 @@ class Tablet : public BaseTablet {
OlapReaderStatistics& stats, vectorized::Block*
block,
bool write_to_cache = false);
+ Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
+ const std::vector<uint32_t>& rowids,
+ const std::string& column_name,
vectorized::MutableColumnPtr& dst);
+
+ Status fetch_value_through_row_column(RowsetSharedPtr input_rowset,
uint32_t segid,
+ const std::vector<uint32_t>& rowids,
+ const std::vector<uint32_t>& cids,
+ vectorized::Block& block);
+
// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
// finally this rowset publish version with 8, we should make up data
// for rowset 6-7. Also, if a compaction happens between commit_txn and
// publish_txn, we should remove compaction input rowsets' delete_bitmap
// and build newly generated rowset's delete_bitmap
- Status calc_delete_bitmap(RowsetId rowset_id,
+ Status calc_delete_bitmap(RowsetSharedPtr rowset,
const std::vector<segment_v2::SegmentSharedPtr>&
segments,
const RowsetIdUnorderedSet* specified_rowset_ids,
DeleteBitmapPtr delete_bitmap, int64_t version,
- bool check_pre_segments = false);
+ bool check_pre_segments = false,
+ RowsetWriter* rowset_writer = nullptr);
+ Status read_columns_by_plan(
+ TabletSchemaSPtr tablet_schema, const std::vector<uint32_t>
cids_to_read,
+ const std::map<RowsetId, std::map<uint32_t,
std::vector<RidAndPos>>>& read_plan,
+ const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block,
+ std::map<uint32_t, uint32_t>* read_index);
+ void prepare_to_read(const RowLocation& row_location, size_t pos,
+ std::map<RowsetId, std::map<uint32_t,
std::vector<RidAndPos>>>* read_plan);
+ Status generate_new_block_for_partial_update(
+ TabletSchemaSPtr rowset_schema,
+ const std::map<RowsetId, std::map<uint32_t,
std::vector<RidAndPos>>>& read_plan_ori,
Review Comment:
consider using a type alias for `std::map<RowsetId, std::map<uint32_t,
std::vector<RidAndPos>>>` to make the code clearer, e.g.
typedef std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>
ReadPlan
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]