This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 98f2824796ce25176a2379e52596a52238e1a6de Author: Xin Liao <[email protected]> AuthorDate: Wed Dec 21 09:50:13 2022 +0800 [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101) It is very difficult to investigate the data inconsistency of multiple replicas. When loading data, the number of rows between replicas is checked to avoid some data inconsistency problems. --- be/src/exec/tablet_sink.cpp | 41 +++++++++++++++++++++++++++++++ be/src/exec/tablet_sink.h | 11 +++++++++ be/src/olap/delta_writer.cpp | 10 ++++++++ be/src/olap/delta_writer.h | 7 ++++++ be/src/olap/memtable.cpp | 2 ++ be/src/olap/memtable.h | 2 ++ be/src/olap/rowset/beta_rowset_writer.cpp | 1 + be/src/vec/sink/vtablet_sink.cpp | 4 +++ gensrc/proto/internal_service.proto | 2 ++ 9 files changed, 80 insertions(+) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 7c0051c191..cbeeace20a 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -247,6 +247,10 @@ Status NodeChannel::open_wait() { commit_info.tabletId = tablet.tablet_id(); commit_info.backendId = _node_id; _tablet_commit_infos.emplace_back(std::move(commit_info)); + if (tablet.has_received_rows()) { + _tablets_received_rows.emplace_back(tablet.tablet_id(), + tablet.received_rows()); + } VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() << ", backendId=" << _node_id @@ -457,6 +461,7 @@ Status NodeChannel::close_wait(RuntimeState* state) { std::make_move_iterator(_tablet_commit_infos.end())); _index_channel->set_error_tablet_in_state(state); + _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); return Status::OK(); } @@ -769,6 +774,39 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) { } } +void IndexChannel::set_tablets_received_rows( + const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) { + for (const auto& [tablet_id, rows_num] : tablets_received_rows) { + _tablets_received_rows[tablet_id].emplace_back(node_id, rows_num); + } +} + +Status IndexChannel::check_tablet_received_rows_consistency() { + for (auto& tablet : _tablets_received_rows) { + for (size_t i = 0; i < tablet.second.size(); i++) { + VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id + << ", txn_id: " << std::to_string(_parent->_txn_id) + << ", tablet_id: " << tablet.first + << ", node_id: " << tablet.second[i].first + << ", rows_num: " << tablet.second[i].second; + if (i == 0) { + continue; + } + if (tablet.second[i].second != tablet.second[0].second) { + LOG(WARNING) << "rows num doest't match, load_id: " << _parent->_load_id + << ", txn_id: " << std::to_string(_parent->_txn_id) + << ", tablt_id: " << tablet.first + << ", node_id: " << tablet.second[i].first + << ", rows_num: " << tablet.second[i].second + << ", node_id: " << tablet.second[0].first + << ", rows_num: " << tablet.second[0].second; + return Status::InternalError("rows num written by multi replicas doest't match"); + } + } + } + return Status::OK(); +} + OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs, Status* status) : _pool(pool), @@ -1155,6 +1193,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { Status index_st = index_channel->check_intolerable_failure(); if (!index_st.ok()) { status = index_st; + } else if (Status st = index_channel->check_tablet_received_rows_consistency(); + !st.ok()) { + status = st; } } // end for index channels } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index fb820d6c42..e6973f7b7a 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -321,6 +321,8 @@ protected: bool _is_closed = false; RuntimeState* _state; + // rows number received per tablet, tablet_id -> rows_num + std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows; private: std::unique_ptr<RowBatch> _cur_batch; @@ -368,6 +370,12 @@ public: return mem_consumption; } + void set_tablets_received_rows( + const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id); + + // check whether the rows num written by different replicas is consistent + Status check_tablet_received_rows_consistency(); + private: friend class NodeChannel; friend class VNodeChannel; @@ -398,6 +406,9 @@ private: Status _intolerable_failure_status = Status::OK(); std::unique_ptr<MemTracker> _index_channel_tracker; + // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num> + // used to verify whether the rows num received by different replicas is consistent + std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows; }; template <typename Row> diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 75f402f202..09afa54be9 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -163,6 +163,7 @@ Status DeltaWriter::write(Tuple* tuple) { return _cancel_status; } + _total_received_rows++; _mem_table->insert(tuple); // if memtable is full, push it to the flush executor, @@ -188,6 +189,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row return _cancel_status; } + _total_received_rows += row_idxs.size(); for (const auto& row_idx : row_idxs) { _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0)); } @@ -216,6 +218,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> return _cancel_status; } + _total_received_rows += row_idxs.size(); _mem_table->insert(block, row_idxs); if (_mem_table->need_to_agg()) { @@ -233,6 +236,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> } Status DeltaWriter::_flush_memtable_async() { + _merged_rows += _mem_table->merged_rows(); return _flush_token->submit(std::move(_mem_table)); } @@ -354,6 +358,12 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, _mem_table.reset(); + if (_rowset_writer->num_rows() + _merged_rows != _total_received_rows) { + LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: " + << _rowset_writer->num_rows() << ", merged_rows: " << _merged_rows + << ", total received rows: " << _total_received_rows; + return Status::InternalError("rows number written by delta writer dosen't match"); + } // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); if (_cur_rowset == nullptr) { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 5484c7c7db..dafd77a8f8 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -110,6 +110,8 @@ public: void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); + int64_t total_received_rows() const { return _total_received_rows; } + private: DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id, bool is_vec); @@ -172,6 +174,11 @@ private: RowsetIdUnorderedSet _rowset_ids; // current max version, used to calculate delete bitmap int64_t _cur_max_version; + + // total rows num written by DeltaWriter + int64_t _total_received_rows = 0; + // rows num merged by memtable + int64_t _merged_rows = 0; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index ab1a0f5209..46cc15275e 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -219,6 +219,7 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) { bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint); if (is_exist) { + _merged_rows++; _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key); } else { row_in_block->init_agg_places( @@ -248,6 +249,7 @@ void MemTable::_insert_agg(const Tuple* tuple) { bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint); if (is_exist) { + _merged_rows++; (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key); } else { tuple_buf = _table_mem_pool->allocate(_schema_size); diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index e7f59ff151..27606f3552 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -70,6 +70,7 @@ public: Status close(); int64_t flush_size() const { return _flush_size; } + int64_t merged_rows() const { return _merged_rows; } private: Status _do_flush(int64_t& duration_ns); @@ -202,6 +203,7 @@ private: // This is not the rows in this memtable, because rows may be merged // in unique or aggregate key model. int64_t _rows = 0; + int64_t _merged_rows = 0; void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr; void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row, TableKey row_in_skiplist) = nullptr; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 687e8bac35..d2277706a7 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -660,6 +660,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus } ContiguousRow dst_row = it.get_current_row(); auto s = writer->append_row(dst_row); + _raw_num_rows_written++; if (PREDICT_FALSE(!s.ok())) { LOG(WARNING) << "failed to append row: " << s.to_string(); return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index a9461e1659..7d01bf3105 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -135,6 +135,10 @@ Status VNodeChannel::open_wait() { commit_info.tabletId = tablet.tablet_id(); commit_info.backendId = _node_id; _tablet_commit_infos.emplace_back(std::move(commit_info)); + if (tablet.has_received_rows()) { + _tablets_received_rows.emplace_back(tablet.tablet_id(), + tablet.received_rows()); + } VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() << ", backendId=" << _node_id << ", master node id: " << this->node_id() diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index bf61b22707..0396e8944b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -65,6 +65,8 @@ message PTabletInfo { // Delta Writer will write data to local disk and then check if there are new raw values not in global dict // if appears, then it should add the column name to this vector repeated string invalid_dict_cols = 3; + // total rows num received by DeltaWriter + optional int64 received_rows = 4; } // open a tablet writer --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
