github-actions[bot] commented on code in PR #22504:
URL: https://github.com/apache/doris/pull/22504#discussion_r1333846546
##########
be/src/olap/rowset/beta_rowset_writer_v2.h:
##########
@@ -120,6 +120,11 @@ class BetaRowsetWriterV2 : public RowsetWriter {
return Status::OK();
}
+ // currently, a rowset contains at most one segment, so we just return the
segment's indicator maps
+ std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {
Review Comment:
warning: function 'get_indicator_maps' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const
override {
```
##########
be/src/exec/tablet_info.h:
##########
@@ -90,6 +90,7 @@ class OlapTableSchemaParam {
return _partial_update_input_columns;
}
bool is_strict_mode() const { return _is_strict_mode; }
+ bool is_unique_key_replace_if_not_null() const { return
_is_unique_key_replace_if_not_null; }
Review Comment:
warning: function 'is_unique_key_replace_if_not_null' should be marked
[[nodiscard]] [modernize-use-nodiscard]
```suggestion
[[nodiscard]] bool is_unique_key_replace_if_not_null() const { return
_is_unique_key_replace_if_not_null; }
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
return Status::OK();
}
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
- const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable,
- const size_t& segment_start_pos) {
- // create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
- CHECK(cids_missing.size() == old_value_block.columns());
- auto mutable_old_columns = old_value_block.mutate_columns();
- bool has_row_column = _tablet_schema->store_row_column();
- // record real pos, key is input line num, value is old_block line num
- std::map<uint32_t, uint32_t> read_index;
- size_t read_idx = 0;
- for (auto rs_it : _rssid_to_rid) {
- for (auto seg_it : rs_it.second) {
- auto rowset = _rsid_to_rowset[rs_it.first];
- CHECK(rowset);
- std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
- }
- if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value through row column";
- return st;
- }
- continue;
- }
- for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
- TabletColumn tablet_column =
_tablet_schema->column(cids_missing[cid]);
- auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids, tablet_column,
-
mutable_old_columns[cid]);
- // set read value to output block
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value by rowids";
- return st;
- }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+ const IndicatorMapsVertical&
indicator_maps_vertical) {
+ _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); //
fixme(baohan): ?
+ for (auto [cid, indicator_map] : indicator_maps_vertical) {
+ for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+ if (indicator_map != nullptr && indicator_map[pos] != 0) {
+ (*_indicator_maps)[pos].emplace_back(cid);
}
}
}
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3,
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1 1 |1 |10|10|10
+// 2 2 |2 |? |20|20
+// 3 3 |3 |30|30|?
+// 4 4 |4 |40|40|40
+// 5 5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous
rows will be read into these columns.
+// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from
the previous rows will be read into these columns
+// if the corresponding columns in the input block have cells with indicator
value.
+// Because the column is immutable, filled_including_value_columns will store
the data merged from
+// the original input block and old_point_read_columns. After the insertion,
the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+ vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read, const
std::vector<uint32_t>& cids_point_read,
+ const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
+ const size_t& segment_start_pos, bool
is_unique_key_replace_if_not_null) {
+ vectorized::MutableColumns full_columns = full_block->mutate_columns();
+ vectorized::Block old_full_read_block =
_tablet_schema->create_missing_columns_block();
+ vectorized::MutableColumns old_full_read_columns =
old_full_read_block.mutate_columns();
Review Comment:
warning: variable 'old_full_read_columns' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::MutableColumns old_full_read_columns = 0 =
old_full_read_block.mutate_columns();
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
return Status::OK();
}
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
- const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable,
- const size_t& segment_start_pos) {
- // create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
- CHECK(cids_missing.size() == old_value_block.columns());
- auto mutable_old_columns = old_value_block.mutate_columns();
- bool has_row_column = _tablet_schema->store_row_column();
- // record real pos, key is input line num, value is old_block line num
- std::map<uint32_t, uint32_t> read_index;
- size_t read_idx = 0;
- for (auto rs_it : _rssid_to_rid) {
- for (auto seg_it : rs_it.second) {
- auto rowset = _rsid_to_rowset[rs_it.first];
- CHECK(rowset);
- std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
- }
- if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value through row column";
- return st;
- }
- continue;
- }
- for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
- TabletColumn tablet_column =
_tablet_schema->column(cids_missing[cid]);
- auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids, tablet_column,
-
mutable_old_columns[cid]);
- // set read value to output block
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value by rowids";
- return st;
- }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+ const IndicatorMapsVertical&
indicator_maps_vertical) {
+ _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); //
fixme(baohan): ?
+ for (auto [cid, indicator_map] : indicator_maps_vertical) {
+ for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+ if (indicator_map != nullptr && indicator_map[pos] != 0) {
+ (*_indicator_maps)[pos].emplace_back(cid);
}
}
}
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3,
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1 1 |1 |10|10|10
+// 2 2 |2 |? |20|20
+// 3 3 |3 |30|30|?
+// 4 4 |4 |40|40|40
+// 5 5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous
rows will be read into these columns.
+// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from
the previous rows will be read into these columns
+// if the corresponding columns in the input block have cells with indicator
value.
+// Because the column is immutable, filled_including_value_columns will store
the data merged from
+// the original input block and old_point_read_columns. After the insertion,
the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+ vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read, const
std::vector<uint32_t>& cids_point_read,
+ const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
+ const size_t& segment_start_pos, bool
is_unique_key_replace_if_not_null) {
+ vectorized::MutableColumns full_columns = full_block->mutate_columns();
+ vectorized::Block old_full_read_block =
_tablet_schema->create_missing_columns_block();
+ vectorized::MutableColumns old_full_read_columns =
old_full_read_block.mutate_columns();
+ vectorized::Block old_point_read_block =
_tablet_schema->create_block_by_cids(cids_point_read);
+ vectorized::MutableColumns old_point_read_columns =
old_point_read_block.mutate_columns();
Review Comment:
warning: variable 'old_point_read_columns' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::MutableColumns old_point_read_columns = 0 =
old_point_read_block.mutate_columns();
```
##########
be/src/olap/rowset/segment_v2/segment_writer.h:
##########
@@ -128,9 +130,15 @@ class SegmentWriter {
void clear();
void set_mow_context(std::shared_ptr<MowContext> mow_context);
- Status fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
+ Status fill_missing_columns(vectorized::Block* full_block,
+ const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read,
+ const std::vector<uint32_t>& cids_point_read,
const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable, const size_t&
segment_start_pos);
+ bool has_default_or_nullable, const size_t&
segment_start_pos,
+ bool is_unique_key_replace_if_not_null);
+
+ std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return
_indicator_maps; }
Review Comment:
warning: function 'get_indicator_maps' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const
{ return _indicator_maps; }
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
return Status::OK();
}
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
- const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable,
- const size_t& segment_start_pos) {
- // create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
- CHECK(cids_missing.size() == old_value_block.columns());
- auto mutable_old_columns = old_value_block.mutate_columns();
- bool has_row_column = _tablet_schema->store_row_column();
- // record real pos, key is input line num, value is old_block line num
- std::map<uint32_t, uint32_t> read_index;
- size_t read_idx = 0;
- for (auto rs_it : _rssid_to_rid) {
- for (auto seg_it : rs_it.second) {
- auto rowset = _rsid_to_rowset[rs_it.first];
- CHECK(rowset);
- std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
- }
- if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value through row column";
- return st;
- }
- continue;
- }
- for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
- TabletColumn tablet_column =
_tablet_schema->column(cids_missing[cid]);
- auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids, tablet_column,
-
mutable_old_columns[cid]);
- // set read value to output block
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value by rowids";
- return st;
- }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+ const IndicatorMapsVertical&
indicator_maps_vertical) {
+ _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); //
fixme(baohan): ?
+ for (auto [cid, indicator_map] : indicator_maps_vertical) {
+ for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+ if (indicator_map != nullptr && indicator_map[pos] != 0) {
+ (*_indicator_maps)[pos].emplace_back(cid);
}
}
}
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3,
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1 1 |1 |10|10|10
+// 2 2 |2 |? |20|20
+// 3 3 |3 |30|30|?
+// 4 4 |4 |40|40|40
+// 5 5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous
rows will be read into these columns.
+// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from
the previous rows will be read into these columns
+// if the corresponding columns in the input block have cells with indicator
value.
+// Because the column is immutable, filled_including_value_columns will store
the data merged from
+// the original input block and old_point_read_columns. After the insertion,
the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+ vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read, const
std::vector<uint32_t>& cids_point_read,
+ const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
+ const size_t& segment_start_pos, bool
is_unique_key_replace_if_not_null) {
+ vectorized::MutableColumns full_columns = full_block->mutate_columns();
+ vectorized::Block old_full_read_block =
_tablet_schema->create_missing_columns_block();
+ vectorized::MutableColumns old_full_read_columns =
old_full_read_block.mutate_columns();
+ vectorized::Block old_point_read_block =
_tablet_schema->create_block_by_cids(cids_point_read);
+ vectorized::MutableColumns old_point_read_columns =
old_point_read_block.mutate_columns();
+ vectorized::MutableColumns filled_including_value_columns =
+ old_point_read_block.clone_empty_columns(); // used to hold data
after being filled
+ // !!NOTE: columns in old_point_read_block may have different row nums!
+
+ // rowid in input block -> line num in old_full_read_block
+ std::map<uint32_t, uint32_t> missing_cols_read_index;
+ // partial update cid -> (rowid in input block -> line num in
old_point_read_block)
+ std::map<uint32_t, std::map<uint32_t, uint32_t>>
parital_update_cols_read_index;
+
+ if (is_unique_key_replace_if_not_null) {
+ RETURN_IF_ERROR(_tablet->read_columns_by_plan(
+ _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read,
&cids_point_read,
+ &old_full_read_block, &old_point_read_block,
&missing_cols_read_index,
+ &parital_update_cols_read_index));
+ } else {
+ RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema,
_rsid_to_rowset, read_plan,
+ &cids_full_read,
&old_full_read_block,
+
&missing_cols_read_index));
+ }
+
// build default value columns
- auto default_value_block = old_value_block.clone_empty();
+ auto default_value_block = old_full_read_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
if (has_default_or_nullable) {
- for (auto i = 0; i < cids_missing.size(); ++i) {
- const auto& column = _tablet_schema->column(cids_missing[i]);
+ for (auto i = 0; i < cids_full_read.size(); ++i) {
+ const auto& column = _tablet_schema->column(cids_full_read[i]);
if (column.has_default_value()) {
- auto default_value =
_tablet_schema->column(cids_missing[i]).default_value();
+ auto default_value =
_tablet_schema->column(cids_full_read[i]).default_value();
vectorized::ReadBuffer
rb(const_cast<char*>(default_value.c_str()),
default_value.size());
- old_value_block.get_by_position(i).type->from_string(
+ old_full_read_block.get_by_position(i).type->from_string(
rb, mutable_default_value_columns[i].get());
}
}
}
- // fill all missing value from mutable_old_columns, need to consider
default value and null value
+ // fill all missing value from old_value_columns, need to consider default
value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
if (use_default_or_null_flag[idx]) {
- for (auto i = 0; i < cids_missing.size(); ++i) {
+ for (auto i = 0; i < cids_full_read.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null
value
- const auto& tablet_column =
_tablet_schema->column(cids_missing[i]);
+ const auto& tablet_column =
_tablet_schema->column(cids_full_read[i]);
if (tablet_column.has_default_value()) {
- mutable_full_columns[cids_missing[i]]->insert_from(
+ full_columns[cids_full_read[i]]->insert_from(
*mutable_default_value_columns[i].get(), 0);
} else if (tablet_column.is_nullable()) {
auto nullable_column =
assert_cast<vectorized::ColumnNullable*>(
- mutable_full_columns[cids_missing[i]].get());
+ full_columns[cids_full_read[i]].get());
nullable_column->insert_null_elements(1);
} else {
// If the control flow reaches this branch, the column
neither has default value
// nor is nullable. It means that the row's delete sign is
marked, and the value
// columns are useless and won't be read. So we can just
put arbitrary values in the cells
- mutable_full_columns[cids_missing[i]]->insert_default();
+ full_columns[cids_full_read[i]]->insert_default();
}
}
continue;
}
- auto pos_in_old_block = read_index[idx + segment_start_pos];
- for (auto i = 0; i < cids_missing.size(); ++i) {
- mutable_full_columns[cids_missing[i]]->insert_from(
-
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
- pos_in_old_block);
+
+ //TODO(bobhan1): fix me later(the wrong row pos)
+ // TODO(bobhan1): handle row store column here!!!
+ uint32_t pos_in_old_block = missing_cols_read_index[idx +
segment_start_pos];
+ for (auto i = 0; i < cids_full_read.size(); ++i) {
+ uint32_t cid = cids_full_read[i];
+ if (full_block->get_by_position(cid).name !=
BeConsts::ROW_STORE_COL) {
+ full_columns[cid]->insert_from(
+
*old_full_read_block.get_columns_with_type_and_name()[i].column.get(),
+ pos_in_old_block);
+ }
+ }
+
+ if (is_unique_key_replace_if_not_null) {
+ //TODO(bobhan1): fix me later(the wrong row pos)
+ for (size_t i = 0; i < cids_point_read.size(); i++) {
+ uint32_t cid = cids_point_read[i];
+ if (parital_update_cols_read_index[cid].contains(idx +
segment_start_pos)) {
+ // cells with indicator value should be replaced with old
values in previous rows
+ uint32_t pos_in_old_block =
Review Comment:
warning: variable 'pos_in_old_block' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
uint32_t pos_in_old_block = 0 =
```
##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const
DataTypeSerDeSPtrs& serdes, const
}
}
+void JsonbSerializeUtil::jsonb_to_block(
+ const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs&
serdes_point_read,
+ const char* data, size_t size,
+ const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read,
+ const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>&
+ col_uid_to_idx_cid_point_read,
+ std::unordered_map<uint32_t, bool>& point_read_cids, Block&
block_full_read,
+ Block& block_point_read) {
+ JsonbDocument& doc = *JsonbDocument::createDocument(data, size);
+ size_t full_read_filled_columns = 0;
+ for (auto it = doc->begin(); it != doc->end(); ++it) {
+ auto col_it = col_uid_to_idx_full_read.find(it->getKeyId());
+ if (col_it != col_uid_to_idx_full_read.end()) {
+ MutableColumnPtr dst_column =
+
block_full_read.get_by_position(col_it->second).column->assume_mutable();
+
serdes_full_read[col_it->second]->read_one_cell_from_jsonb(*dst_column,
it->value());
+ ++full_read_filled_columns;
+ } else {
+ auto it2 = col_uid_to_idx_cid_point_read.find(it->getKeyId());
+ if (it2 != col_uid_to_idx_cid_point_read.end()) {
+ uint32_t cid = it2->second.second;
+ uint32_t idx = it2->second.first;
+ auto it3 = point_read_cids.find(cid);
+ if (it3 != point_read_cids.end()) {
+ MutableColumnPtr dst_column =
Review Comment:
warning: variable 'dst_column' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
MutableColumnPtr dst_column = 0 =
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -332,25 +335,69 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
- // find missing column cids
+ // all the cells in missing_cids will be filled from the old rows
std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+ std::vector<uint32_t> determistic_cids; // columns that don't need to be read
from old rows
+ std::vector<uint32_t> point_read_cids;
// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
- size_t input_id = 0;
- for (auto i : including_cids) {
- full_block.replace_by_position(i,
block->get_by_position(input_id++).column);
+
+ // indicators to apply partial updates for including columns,
+ // cells with `indicator` values should be read from the old rows
+ // currently we treat null value as `indicator`
+ IndicatorMapsVertical indicator_maps_vertical; // cid -> rows
+
+ PartialUpdateReadPlan read_plan;
+ if (_tablet_schema->store_row_column()) {
+ read_plan = RowStoreReadPlan {};
+ }
+
+ bool is_unique_key_replace_if_not_null =
_tablet_schema->is_unique_key_replace_if_not_null();
Review Comment:
warning: variable 'is_unique_key_replace_if_not_null' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
bool is_unique_key_replace_if_not_null = false =
_tablet_schema->is_unique_key_replace_if_not_null();
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
return Status::OK();
}
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
- const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable,
- const size_t& segment_start_pos) {
- // create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
- CHECK(cids_missing.size() == old_value_block.columns());
- auto mutable_old_columns = old_value_block.mutate_columns();
- bool has_row_column = _tablet_schema->store_row_column();
- // record real pos, key is input line num, value is old_block line num
- std::map<uint32_t, uint32_t> read_index;
- size_t read_idx = 0;
- for (auto rs_it : _rssid_to_rid) {
- for (auto seg_it : rs_it.second) {
- auto rowset = _rsid_to_rowset[rs_it.first];
- CHECK(rowset);
- std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
- }
- if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value through row column";
- return st;
- }
- continue;
- }
- for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
- TabletColumn tablet_column =
_tablet_schema->column(cids_missing[cid]);
- auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids, tablet_column,
-
mutable_old_columns[cid]);
- // set read value to output block
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value by rowids";
- return st;
- }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+ const IndicatorMapsVertical&
indicator_maps_vertical) {
+ _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); //
fixme(baohan): ?
+ for (auto [cid, indicator_map] : indicator_maps_vertical) {
+ for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+ if (indicator_map != nullptr && indicator_map[pos] != 0) {
+ (*_indicator_maps)[pos].emplace_back(cid);
}
}
}
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3,
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1 1 |1 |10|10|10
+// 2 2 |2 |? |20|20
+// 3 3 |3 |30|30|?
+// 4 4 |4 |40|40|40
+// 5 5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous
rows will be read into these columns.
+// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from
the previous rows will be read into these columns
+// if the corresponding columns in the input block have cells with indicator
value.
+// Because the column is immutable, filled_including_value_columns will store
the data merged from
+// the original input block and old_point_read_columns. After the insertion,
the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+ vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read, const
std::vector<uint32_t>& cids_point_read,
+ const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
+ const size_t& segment_start_pos, bool
is_unique_key_replace_if_not_null) {
+ vectorized::MutableColumns full_columns = full_block->mutate_columns();
+ vectorized::Block old_full_read_block =
_tablet_schema->create_missing_columns_block();
+ vectorized::MutableColumns old_full_read_columns =
old_full_read_block.mutate_columns();
+ vectorized::Block old_point_read_block =
_tablet_schema->create_block_by_cids(cids_point_read);
+ vectorized::MutableColumns old_point_read_columns =
old_point_read_block.mutate_columns();
+ vectorized::MutableColumns filled_including_value_columns =
Review Comment:
warning: variable 'filled_including_value_columns' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::MutableColumns filled_including_value_columns = 0 =
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -332,25 +335,69 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
- // find missing column cids
+ // all the cells in missing_cids will be filled from the old rows
std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+ std::vector<uint32_t> determistic_cids; // columns that don't need to be read
from old rows
+ std::vector<uint32_t> point_read_cids;
// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
- size_t input_id = 0;
- for (auto i : including_cids) {
- full_block.replace_by_position(i,
block->get_by_position(input_id++).column);
+
+ // indicators to apply partial updates for including columns,
+ // cells with `indicator` values should be read from the old rows
+ // currently we treat null value as `indicator`
+ IndicatorMapsVertical indicator_maps_vertical; // cid -> rows
+
+ PartialUpdateReadPlan read_plan;
Review Comment:
warning: variable 'read_plan' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
PartialUpdateReadPlan read_plan = 0;
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -332,25 +335,69 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
- // find missing column cids
+ // all the cells in missing_cids will be filled from the old rows
std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+ std::vector<uint32_t> determistic_cids; // columns that don't need to be read
from old rows
+ std::vector<uint32_t> point_read_cids;
// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
- size_t input_id = 0;
- for (auto i : including_cids) {
- full_block.replace_by_position(i,
block->get_by_position(input_id++).column);
+
+ // indicators to apply partial updates for including columns,
+ // cells with `indicator` values should be read from the old rows
+ // currently we treat null value as `indicator`
+ IndicatorMapsVertical indicator_maps_vertical; // cid -> rows
Review Comment:
warning: variable 'indicator_maps_vertical' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
IndicatorMapsVertical indicator_maps_vertical = 0; // cid -> rows
```
##########
be/src/olap/rowset/beta_rowset_writer.h:
##########
@@ -102,6 +102,11 @@ class BetaRowsetWriter : public RowsetWriter {
int64_t num_rows_filtered() const override { return
_segment_creator.num_rows_filtered(); }
+ // currently, a rowset contains at most one segment, so we just return the
segment's indicator maps
+ std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {
Review Comment:
warning: function 'get_indicator_maps' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const
override {
```
##########
be/src/olap/rowset/rowset_writer.h:
##########
@@ -131,6 +131,8 @@ class RowsetWriter {
virtual int64_t num_rows_filtered() const = 0;
+ virtual std::shared_ptr<IndicatorMaps> get_indicator_maps() const = 0;
Review Comment:
warning: function 'get_indicator_maps' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] virtual std::shared_ptr<IndicatorMaps>
get_indicator_maps() const = 0;
```
##########
be/src/olap/rowset/segment_creator.h:
##########
@@ -166,6 +170,10 @@ class SegmentCreator {
int64_t num_rows_filtered() const { return
_segment_flusher.num_rows_filtered(); }
+ std::shared_ptr<IndicatorMaps> get_indicator_maps() const {
Review Comment:
warning: function 'get_indicator_maps' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const {
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
return Status::OK();
}
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
- const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable,
- const size_t& segment_start_pos) {
- // create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
- CHECK(cids_missing.size() == old_value_block.columns());
- auto mutable_old_columns = old_value_block.mutate_columns();
- bool has_row_column = _tablet_schema->store_row_column();
- // record real pos, key is input line num, value is old_block line num
- std::map<uint32_t, uint32_t> read_index;
- size_t read_idx = 0;
- for (auto rs_it : _rssid_to_rid) {
- for (auto seg_it : rs_it.second) {
- auto rowset = _rsid_to_rowset[rs_it.first];
- CHECK(rowset);
- std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
- }
- if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value through row column";
- return st;
- }
- continue;
- }
- for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
- TabletColumn tablet_column =
_tablet_schema->column(cids_missing[cid]);
- auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids, tablet_column,
-
mutable_old_columns[cid]);
- // set read value to output block
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value by rowids";
- return st;
- }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+ const IndicatorMapsVertical&
indicator_maps_vertical) {
+ _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); //
fixme(baohan): ?
+ for (auto [cid, indicator_map] : indicator_maps_vertical) {
+ for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+ if (indicator_map != nullptr && indicator_map[pos] != 0) {
+ (*_indicator_maps)[pos].emplace_back(cid);
}
}
}
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3,
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means
that the cell is filled
+// with indicator value (currently we use null as indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1 1 |1 |10|10|10
+// 2 2 |2 |? |20|20
+// 3 3 |3 |30|30|?
+// 4 4 |4 |40|40|40
+// 5 5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous
rows will be read into these columns.
+// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from
the previous rows will be read into these columns
+// if the corresponding columns in the input block have a cell with indicator
value.
+// Because the column is immutable, filled_including_value_columns will store
the data merged from
+// the original input block and old_point_read_columns. After the insertion,
the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+ vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read, const
std::vector<uint32_t>& cids_point_read,
+ const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
+ const size_t& segment_start_pos, bool
is_unique_key_replace_if_not_null) {
+ vectorized::MutableColumns full_columns = full_block->mutate_columns();
Review Comment:
warning: variable 'full_columns' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::MutableColumns full_columns = 0 =
full_block->mutate_columns();
```
##########
be/src/olap/tablet.cpp:
##########
@@ -2687,20 +2688,40 @@ Status
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
&column_iterator, &stats));
// get and parse tuple row
vectorized::MutableColumnPtr column_ptr =
vectorized::ColumnString::create();
- RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(),
rowids.size(), column_ptr));
- assert(column_ptr->size() == rowids.size());
+ RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(),
column_ptr));
+ assert(column_ptr->size() == rids.size());
auto string_column =
static_cast<vectorized::ColumnString*>(column_ptr.get());
- vectorized::DataTypeSerDeSPtrs serdes;
- serdes.resize(cids.size());
- std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
- for (int i = 0; i < cids.size(); ++i) {
- const TabletColumn& column = tablet_schema->column(cids[i]);
+ vectorized::DataTypeSerDeSPtrs serdes_full_read;
+ serdes_full_read.resize(cids_full_read->size());
Review Comment:
warning: variable 'serdes_full_read' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::DataTypeSerDeSPtrs serdes_full_read = 0;
```
##########
be/src/olap/tablet_schema.h:
##########
@@ -363,8 +364,12 @@ class TabletSchema {
}
void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode =
is_strict_mode; }
bool is_strict_mode() const { return _is_strict_mode; }
- std::vector<uint32_t> get_missing_cids() const { return _missing_cids; }
- std::vector<uint32_t> get_update_cids() const { return _update_cids; }
+ void set_is_unique_key_replace_if_not_null(bool
is_unique_key_replace_if_not_null) {
+ _is_unique_key_replace_if_not_null = is_unique_key_replace_if_not_null;
+ }
+ bool is_unique_key_replace_if_not_null() const { return
_is_unique_key_replace_if_not_null; }
Review Comment:
warning: function 'is_unique_key_replace_if_not_null' should be marked
[[nodiscard]] [modernize-use-nodiscard]
```suggestion
[[nodiscard]] bool is_unique_key_replace_if_not_null() const { return
_is_unique_key_replace_if_not_null; }
```
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
return Status::OK();
}
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns,
- const std::vector<bool>&
use_default_or_null_flag,
- bool has_default_or_nullable,
- const size_t& segment_start_pos) {
- // create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
- CHECK(cids_missing.size() == old_value_block.columns());
- auto mutable_old_columns = old_value_block.mutate_columns();
- bool has_row_column = _tablet_schema->store_row_column();
- // record real pos, key is input line num, value is old_block line num
- std::map<uint32_t, uint32_t> read_index;
- size_t read_idx = 0;
- for (auto rs_it : _rssid_to_rid) {
- for (auto seg_it : rs_it.second) {
- auto rowset = _rsid_to_rowset[rs_it.first];
- CHECK(rowset);
- std::vector<uint32_t> rids;
- for (auto id_and_pos : seg_it.second) {
- rids.emplace_back(id_and_pos.rid);
- read_index[id_and_pos.pos] = read_idx++;
- }
- if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value through row column";
- return st;
- }
- continue;
- }
- for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
- TabletColumn tablet_column =
_tablet_schema->column(cids_missing[cid]);
- auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids, tablet_column,
-
mutable_old_columns[cid]);
- // set read value to output block
- if (!st.ok()) {
- LOG(WARNING) << "failed to fetch value by rowids";
- return st;
- }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+ const IndicatorMapsVertical&
indicator_maps_vertical) {
+ _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); //
fixme(baohan): ?
+ for (auto [cid, indicator_map] : indicator_maps_vertical) {
+ for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+ if (indicator_map != nullptr && indicator_map[pos] != 0) {
+ (*_indicator_maps)[pos].emplace_back(cid);
}
}
}
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3,
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means
that the cell is filled
+// with indicator value (currently we use null as indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1 1 |1 |10|10|10
+// 2 2 |2 |? |20|20
+// 3 3 |3 |30|30|?
+// 4 4 |4 |40|40|40
+// 5 5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous
rows will be read into these columns.
+// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from
the previous rows will be read into these columns
+// if the corresponding columns in the input block have a cell with indicator
value.
+// Because the column is immutable, filled_including_value_columns will store
the data merged from
+// the original input block and old_point_read_columns. After the insertion,
the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+ vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+ const std::vector<uint32_t>& cids_full_read, const
std::vector<uint32_t>& cids_point_read,
+ const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
+ const size_t& segment_start_pos, bool
is_unique_key_replace_if_not_null) {
+ vectorized::MutableColumns full_columns = full_block->mutate_columns();
+ vectorized::Block old_full_read_block =
_tablet_schema->create_missing_columns_block();
+ vectorized::MutableColumns old_full_read_columns =
old_full_read_block.mutate_columns();
+ vectorized::Block old_point_read_block =
_tablet_schema->create_block_by_cids(cids_point_read);
+ vectorized::MutableColumns old_point_read_columns =
old_point_read_block.mutate_columns();
+ vectorized::MutableColumns filled_including_value_columns =
+ old_point_read_block.clone_empty_columns(); // used to hold data
after being filled
+ // !!NOTE: columns in old_point_read_block may have different row nums!
+
+ // rowid in input block -> line num in old_full_read_block
+ std::map<uint32_t, uint32_t> missing_cols_read_index;
+ // partial update cid -> (rowid in input block -> line num in
old_point_read_block)
+ std::map<uint32_t, std::map<uint32_t, uint32_t>>
parital_update_cols_read_index;
+
+ if (is_unique_key_replace_if_not_null) {
+ RETURN_IF_ERROR(_tablet->read_columns_by_plan(
+ _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read,
&cids_point_read,
+ &old_full_read_block, &old_point_read_block,
&missing_cols_read_index,
+ &parital_update_cols_read_index));
+ } else {
+ RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema,
_rsid_to_rowset, read_plan,
+ &cids_full_read,
&old_full_read_block,
+
&missing_cols_read_index));
+ }
+
// build default value columns
- auto default_value_block = old_value_block.clone_empty();
+ auto default_value_block = old_full_read_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
if (has_default_or_nullable) {
- for (auto i = 0; i < cids_missing.size(); ++i) {
- const auto& column = _tablet_schema->column(cids_missing[i]);
+ for (auto i = 0; i < cids_full_read.size(); ++i) {
+ const auto& column = _tablet_schema->column(cids_full_read[i]);
if (column.has_default_value()) {
- auto default_value =
_tablet_schema->column(cids_missing[i]).default_value();
+ auto default_value =
_tablet_schema->column(cids_full_read[i]).default_value();
vectorized::ReadBuffer
rb(const_cast<char*>(default_value.c_str()),
default_value.size());
- old_value_block.get_by_position(i).type->from_string(
+ old_full_read_block.get_by_position(i).type->from_string(
rb, mutable_default_value_columns[i].get());
}
}
}
- // fill all missing value from mutable_old_columns, need to consider
default value and null value
+ // fill all missing value from old_value_columns, need to consider default
value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
if (use_default_or_null_flag[idx]) {
- for (auto i = 0; i < cids_missing.size(); ++i) {
+ for (auto i = 0; i < cids_full_read.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null
value
- const auto& tablet_column =
_tablet_schema->column(cids_missing[i]);
+ const auto& tablet_column =
_tablet_schema->column(cids_full_read[i]);
if (tablet_column.has_default_value()) {
- mutable_full_columns[cids_missing[i]]->insert_from(
+ full_columns[cids_full_read[i]]->insert_from(
*mutable_default_value_columns[i].get(), 0);
} else if (tablet_column.is_nullable()) {
auto nullable_column =
assert_cast<vectorized::ColumnNullable*>(
- mutable_full_columns[cids_missing[i]].get());
+ full_columns[cids_full_read[i]].get());
nullable_column->insert_null_elements(1);
} else {
// If the control flow reaches this branch, the column
neither has default value
// nor is nullable. It means that the row's delete sign is
marked, and the value
// columns are useless and won't be read. So we can just
put arbitrary values in the cells
- mutable_full_columns[cids_missing[i]]->insert_default();
+ full_columns[cids_full_read[i]]->insert_default();
}
}
continue;
}
- auto pos_in_old_block = read_index[idx + segment_start_pos];
- for (auto i = 0; i < cids_missing.size(); ++i) {
- mutable_full_columns[cids_missing[i]]->insert_from(
-
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
- pos_in_old_block);
+
+ //TODO(bobhan1): fix me later(the wrong row pos)
+ // TODO(bobhan1): handle row store column here!!!
+ uint32_t pos_in_old_block = missing_cols_read_index[idx +
segment_start_pos];
Review Comment:
warning: variable 'pos_in_old_block' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
uint32_t pos_in_old_block = 0 = missing_cols_read_index[idx +
segment_start_pos];
```
##########
be/src/olap/tablet.cpp:
##########
@@ -2687,20 +2688,40 @@ Status
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
&column_iterator, &stats));
// get and parse tuple row
vectorized::MutableColumnPtr column_ptr =
vectorized::ColumnString::create();
- RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(),
rowids.size(), column_ptr));
- assert(column_ptr->size() == rowids.size());
+ RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(),
column_ptr));
+ assert(column_ptr->size() == rids.size());
auto string_column =
static_cast<vectorized::ColumnString*>(column_ptr.get());
- vectorized::DataTypeSerDeSPtrs serdes;
- serdes.resize(cids.size());
- std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
- for (int i = 0; i < cids.size(); ++i) {
- const TabletColumn& column = tablet_schema->column(cids[i]);
+ vectorized::DataTypeSerDeSPtrs serdes_full_read;
+ serdes_full_read.resize(cids_full_read->size());
+
+ std::unordered_map<uint32_t, uint32_t> col_uid_to_idx_full_read;
+
+ for (int i = 0; i < cids_full_read->size(); ++i) {
+ const TabletColumn& column =
tablet_schema->column(cids_full_read->at(i));
vectorized::DataTypePtr type =
vectorized::DataTypeFactory::instance().create_data_type(column);
- col_uid_to_idx[column.unique_id()] = i;
- serdes[i] = type->get_serde();
+ col_uid_to_idx_full_read[column.unique_id()] = i;
+ serdes_full_read[i] = type->get_serde();
+ }
+
+ if (with_point_read) {
+ vectorized::DataTypeSerDeSPtrs serdes_point_read;
+ serdes_point_read.resize(cids_point_read->size());
Review Comment:
warning: variable 'serdes_point_read' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::DataTypeSerDeSPtrs serdes_point_read = 0;
```
##########
be/src/olap/tablet.cpp:
##########
@@ -3084,110 +3118,278 @@ Status Tablet::generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan&
read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
+ std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>>
indicator_maps,
vectorized::Block* output_block) {
// do partial update related works
// 1. read columns by read plan
// 2. generate new block
// 3. write a new segment and modify rowset meta
// 4. mark current keys deleted
CHECK(output_block);
- auto full_mutable_columns = output_block->mutate_columns();
- auto old_block = rowset_schema->create_missing_columns_block();
- auto missing_cids = rowset_schema->get_missing_cids();
- auto update_block = rowset_schema->create_update_columns_block();
- auto update_cids = rowset_schema->get_update_cids();
-
- std::map<uint32_t, uint32_t> read_index_old;
- RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids,
read_plan_ori, rsid_to_rowset,
- old_block, &read_index_old));
-
- std::map<uint32_t, uint32_t> read_index_update;
- RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids,
read_plan_update,
- rsid_to_rowset, update_block,
&read_index_update));
+ if (!indicator_maps) {
+ auto full_mutable_columns = output_block->mutate_columns();
+ auto old_block = rowset_schema->create_missing_columns_block();
+ auto missing_cids = rowset_schema->get_missing_cids();
+ auto update_block = rowset_schema->create_update_columns_block();
+ auto update_cids = rowset_schema->get_update_cids();
+
+ std::map<uint32_t, uint32_t> read_index_old;
+ RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset,
read_plan_ori,
+ &missing_cids, &old_block,
&read_index_old));
+
+ std::map<uint32_t, uint32_t> read_index_update;
+ RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset,
read_plan_update,
+ &update_cids, &update_block,
&read_index_update));
+
+ // build full block
+ CHECK(read_index_old.size() == read_index_update.size());
+ for (auto i = 0; i < missing_cids.size(); ++i) {
+ for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+ full_mutable_columns[missing_cids[i]]->insert_from(
+
*old_block.get_columns_with_type_and_name()[i].column.get(),
+ read_index_old[idx]);
+ }
+ }
+ for (auto i = 0; i < update_cids.size(); ++i) {
+ for (auto idx = 0; idx < read_index_update.size(); ++idx) {
+ full_mutable_columns[update_cids[i]]->insert_from(
+
*update_block.get_columns_with_type_and_name()[i].column.get(),
+ read_index_update[idx]);
+ }
+ }
+ VLOG_DEBUG << "full block when publish: " << output_block->dump_data();
+ } else {
+ std::unordered_set<uint32_t> cids_point_read;
+ for (const auto& [_, cids] : *indicator_maps) {
Review Comment:
warning: variable 'cids_point_read' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
std::unordered_set<uint32_t> cids_point_read = 0;
```
##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const
DataTypeSerDeSPtrs& serdes, const
}
}
+void JsonbSerializeUtil::jsonb_to_block(
+ const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs&
serdes_point_read,
+ const char* data, size_t size,
+ const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read,
+ const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>&
+ col_uid_to_idx_cid_point_read,
+ std::unordered_map<uint32_t, bool>& point_read_cids, Block&
block_full_read,
+ Block& block_point_read) {
+ JsonbDocument& doc = *JsonbDocument::createDocument(data, size);
+ size_t full_read_filled_columns = 0;
+ for (auto it = doc->begin(); it != doc->end(); ++it) {
+ auto col_it = col_uid_to_idx_full_read.find(it->getKeyId());
+ if (col_it != col_uid_to_idx_full_read.end()) {
+ MutableColumnPtr dst_column =
Review Comment:
warning: variable 'dst_column' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
MutableColumnPtr dst_column = 0 =
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]