This is an automated email from the ASF dual-hosted git repository. lichaoyong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new ca96ea3 [Memory Engine] MemTablet creation and compatibility handling in BE (#3762) ca96ea3 is described below commit ca96ea30560c9e9837c28cfd2cdd8ed24196f787 Author: Binglin Chang <decst...@gmail.com> AuthorDate: Thu Jun 18 09:56:07 2020 +0800 [Memory Engine] MemTablet creation and compatibility handling in BE (#3762) --- be/src/agent/task_worker_pool.cpp | 13 +- be/src/http/action/meta_action.cpp | 5 +- be/src/olap/base_tablet.cpp | 30 +++- be/src/olap/base_tablet.h | 138 +++++++++++++++ be/src/olap/data_dir.cpp | 4 +- be/src/olap/data_dir.h | 5 +- be/src/olap/memory/mem_tablet.cpp | 34 +++- be/src/olap/memory/mem_tablet.h | 19 ++- be/src/olap/storage_engine.cpp | 12 +- be/src/olap/tablet.cpp | 25 +-- be/src/olap/tablet.h | 146 ++-------------- be/src/olap/tablet_manager.cpp | 286 +++++++++++++++++++++++--------- be/src/olap/tablet_manager.h | 30 ++-- be/src/olap/tablet_meta.cpp | 2 +- be/test/olap/memory/mem_tablet_test.cpp | 2 +- gensrc/thrift/MasterService.thrift | 1 + 16 files changed, 476 insertions(+), 276 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 42f178a..d54b090 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -344,8 +344,9 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { } else { ++_s_report_version; // get path hash of the created tablet - TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash); + BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()-> + get_base_tablet(create_tablet_req.tablet_id, + create_tablet_req.tablet_schema.schema_hash); DCHECK(tablet != nullptr); TTabletInfo tablet_info; tablet_info.tablet_id = tablet->table_id(); @@ -399,8 +400,8 @@ void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) { TStatusCode::type status_code = TStatusCode::OK; vector<string> error_msgs; TStatus task_status; - TabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - drop_tablet_req.tablet_id, drop_tablet_req.schema_hash); + BaseTabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()-> + get_base_tablet(drop_tablet_req.tablet_id, drop_tablet_req.schema_hash); if (dropped_tablet != nullptr) { OLAPStatus drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet( drop_tablet_req.tablet_id, drop_tablet_req.schema_hash); @@ -827,8 +828,8 @@ void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) TStatus task_status; for (auto tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { - TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - tablet_meta_info.tablet_id, tablet_meta_info.schema_hash); + BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()-> + get_base_tablet(tablet_meta_info.tablet_id, tablet_meta_info.schema_hash); if (tablet == nullptr) { LOG(WARNING) << "could not find tablet when update partition id" << " tablet_id=" << tablet_meta_info.tablet_id diff --git a/be/src/http/action/meta_action.cpp b/be/src/http/action/meta_action.cpp index 2fad2a9..90c90a0 100644 --- a/be/src/http/action/meta_action.cpp +++ b/be/src/http/action/meta_action.cpp @@ -53,9 +53,8 @@ Status MetaAction::_handle_header(HttpRequest* req, std::string* json_meta) { << ", schema_hash:" << req_schema_hash; return Status::InternalError(strings::Substitute("convert failed, $0", e.what())); } - - TabletSharedPtr tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash); + BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_base_tablet( + tablet_id, schema_hash); if (tablet == nullptr) { LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema hash:" << schema_hash; return Status::InternalError("no tablet exist"); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index d544f36..7346715 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -26,12 +26,31 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : _state(tablet_meta->tablet_state()), _tablet_meta(tablet_meta), _schema(tablet_meta->tablet_schema()), - _data_dir(data_dir) { + _data_dir(data_dir), + _is_bad(false) { _gen_tablet_path(); } BaseTablet::~BaseTablet() {} + +OLAPStatus BaseTablet::init() { + return _init_once.call([this] { return _init_once_action(); }); +} + +// should save tablet meta to remote meta store +// if it's a primary replica +void BaseTablet::save_meta() { + auto res = _tablet_meta->save_meta(_data_dir); + CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res + << ", root=" << _data_dir->path(); + // User could directly update tablet schema by _tablet_meta, + // So we need to refetch schema again + _schema = _tablet_meta->tablet_schema(); + // TODO: update _mem_schema too? +} + + OLAPStatus BaseTablet::set_tablet_state(TabletState state) { if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { LOG(WARNING) << "could not change tablet state from shutdown to " << state; @@ -52,4 +71,13 @@ void BaseTablet::_gen_tablet_path() { } } +OLAPStatus BaseTablet::set_partition_id(int64_t partition_id) { + return _tablet_meta->set_partition_id(partition_id); +} + +TabletInfo BaseTablet::get_tablet_info() const { + return TabletInfo(tablet_id(), schema_hash(), tablet_uid()); +} + + } /* namespace doris */ diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f3b0c2d..f3ebfb0 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -20,13 +20,19 @@ #include <memory> +#include "gen_cpp/AgentService_types.h" +#include "gen_cpp/MasterService_types.h" +#include "olap/data_dir.h" #include "olap/olap_define.h" #include "olap/tablet_meta.h" #include "olap/utils.h" +#include "util/once.h" namespace doris { class DataDir; +class BaseTablet; +using BaseTabletSharedPtr = std::shared_ptr<BaseTablet>; // Base class for all tablet classes, currently only olap/Tablet and // olap/memory/MemTablet. @@ -60,11 +66,54 @@ public: inline void set_creation_time(int64_t creation_time); inline bool equal(int64_t tablet_id, int32_t schema_hash); + OLAPStatus init(); + inline bool init_succeeded(); + + bool is_used(); + + void save_meta(); + + void register_tablet_into_dir(); + void deregister_tablet_from_dir(); + + // properties encapsulated in TabletSchema inline const TabletSchema& tablet_schema() const; + inline size_t tablet_footprint(); // disk space occupied by tablet + inline size_t num_rows(); + inline int version_count() const; + inline Version max_version() const; + + // propreties encapsulated in TabletSchema + inline KeysType keys_type() const; + inline size_t num_columns() const; + inline size_t num_null_columns() const; + inline size_t num_key_columns() const; + inline size_t num_short_key_columns() const; + inline size_t num_rows_per_row_block() const; + inline CompressKind compress_kind() const; + inline double bloom_filter_fpp() const; + inline size_t next_unique_id() const; + inline size_t row_size() const; + inline size_t field_index(const string& field_name) const; + + OLAPStatus set_partition_id(int64_t partition_id); + + TabletInfo get_tablet_info() const; + + // meta lock + inline void obtain_header_rdlock() { _meta_lock.rdlock(); } + inline void obtain_header_wrlock() { _meta_lock.wrlock(); } + inline void release_header_lock() { _meta_lock.unlock(); } + inline RWMutex* get_header_lock_ptr() { return &_meta_lock; } + + virtual void build_tablet_report_info(TTabletInfo* tablet_info) = 0; + + virtual void delete_all_files() = 0; protected: void _gen_tablet_path(); + virtual OLAPStatus _init_once_action() = 0; protected: TabletState _state; @@ -74,6 +123,13 @@ protected: DataDir* _data_dir; std::string _tablet_path; + DorisCallOnce<OLAPStatus> _init_once; + // TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to + // explain how these two locks work together. + mutable RWMutex _meta_lock; + // if this tablet is broken, set to true. default is false + std::atomic<bool> _is_bad; + private: DISALLOW_COPY_AND_ASSIGN(BaseTablet); }; @@ -141,6 +197,88 @@ inline const TabletSchema& BaseTablet::tablet_schema() const { return _schema; } +inline bool BaseTablet::init_succeeded() { + return _init_once.has_called() && _init_once.stored_result() == OLAP_SUCCESS; +} + +inline bool BaseTablet::is_used() { + return !_is_bad && _data_dir->is_used(); +} + +inline void BaseTablet::register_tablet_into_dir() { + _data_dir->register_tablet(this); +} + +inline void BaseTablet::deregister_tablet_from_dir() { + _data_dir->deregister_tablet(this); +} + +// TODO(lingbin): Why other methods that need to get information from _tablet_meta +// are not locked, here needs a comment to explain. +inline size_t BaseTablet::tablet_footprint() { + ReadLock rdlock(&_meta_lock); + return _tablet_meta->tablet_footprint(); +} + +// TODO(lingbin): Why other methods which need to get information from _tablet_meta +// are not locked, here needs a comment to explain. +inline size_t BaseTablet::num_rows() { + ReadLock rdlock(&_meta_lock); + return _tablet_meta->num_rows(); +} + +inline int BaseTablet::version_count() const { + return _tablet_meta->version_count(); +} + +inline Version BaseTablet::max_version() const { + return _tablet_meta->max_version(); +} + +inline KeysType BaseTablet::keys_type() const { + return _schema.keys_type(); +} + +inline size_t BaseTablet::num_columns() const { + return _schema.num_columns(); +} + +inline size_t BaseTablet::num_null_columns() const { + return _schema.num_null_columns(); +} + +inline size_t BaseTablet::num_key_columns() const { + return _schema.num_key_columns(); +} + +inline size_t BaseTablet::num_short_key_columns() const { + return _schema.num_short_key_columns(); +} + +inline size_t BaseTablet::num_rows_per_row_block() const { + return _schema.num_rows_per_row_block(); +} + +inline CompressKind BaseTablet::compress_kind() const { + return _schema.compress_kind(); +} + +inline double BaseTablet::bloom_filter_fpp() const { + return _schema.bloom_filter_fpp(); +} + +inline size_t BaseTablet::next_unique_id() const { + return _schema.next_column_unique_id(); +} + +inline size_t BaseTablet::field_index(const string& field_name) const { + return _schema.field_index(field_name); +} + +inline size_t BaseTablet::row_size() const { + return _schema.row_size(); +} + } /* namespace doris */ #endif /* DORIS_BE_SRC_OLAP_BASE_TABLET_H */ diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index f577f97..8cabfd9 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -334,14 +334,14 @@ OLAPStatus DataDir::get_shard(uint64_t* shard) { return OLAP_SUCCESS; } -void DataDir::register_tablet(Tablet* tablet) { +void DataDir::register_tablet(BaseTablet* tablet) { TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid()); std::lock_guard<std::mutex> l(_mutex); _tablet_set.emplace(std::move(tablet_info)); } -void DataDir::deregister_tablet(Tablet* tablet) { +void DataDir::deregister_tablet(BaseTablet* tablet) { TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid()); std::lock_guard<std::mutex> l(_mutex); diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 58992ed..f8df04a 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -32,6 +32,7 @@ namespace doris { +class BaseTablet; class Tablet; class TabletManager; class TabletMeta; @@ -80,8 +81,8 @@ public: TStorageMedium::type storage_medium() const { return _storage_medium; } - void register_tablet(Tablet* tablet); - void deregister_tablet(Tablet* tablet); + void register_tablet(BaseTablet* tablet); + void deregister_tablet(BaseTablet* tablet); void clear_tablets(std::vector<TabletInfo>* tablet_infos); std::string get_absolute_shard_path(int64_t shard_id); diff --git a/be/src/olap/memory/mem_tablet.cpp b/be/src/olap/memory/mem_tablet.cpp index 03a9d45..0e5f33b 100644 --- a/be/src/olap/memory/mem_tablet.cpp +++ b/be/src/olap/memory/mem_tablet.cpp @@ -36,11 +36,18 @@ std::shared_ptr<MemTablet> MemTablet::create_tablet_from_meta(TabletMetaSharedPt return std::make_shared<MemTablet>(tablet_meta, data_dir); } -Status MemTablet::init() { +OLAPStatus MemTablet::_init_once_action() { _max_version = 0; - return MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet); + Status ret = MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet); + if (ret.ok()) { + return OLAP_SUCCESS; + } else { + // TODO: Status/OLAPStatus compatibility + return OLAP_ERR_INIT_FAILED; + } } + Status MemTablet::scan(std::unique_ptr<ScanSpec>* spec, std::unique_ptr<MemTabletScan>* scan) { uint64_t version = (*spec)->version(); if (version == UINT64_MAX) { @@ -86,5 +93,28 @@ Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) { return Status::OK(); } +void MemTablet::build_tablet_report_info(TTabletInfo* tablet_info) { + ReadLock rdlock(&_meta_lock); + tablet_info->tablet_id = _tablet_meta->tablet_id(); + tablet_info->schema_hash = _tablet_meta->schema_hash(); + tablet_info->row_count = _tablet_meta->num_rows(); + tablet_info->data_size = _tablet_meta->tablet_footprint(); + tablet_info->version = _max_version; + tablet_info->version_hash = 0; + tablet_info->__set_partition_id(_tablet_meta->partition_id()); + tablet_info->__set_storage_medium(_data_dir->storage_medium()); + tablet_info->__set_version_count(_tablet_meta->version_count()); + tablet_info->__set_path_hash(_data_dir->path_hash()); + tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory()); + tablet_info->__set_tablet_type(_tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK ? + TTabletType::TABLET_TYPE_DISK : TTabletType::TABLET_TYPE_MEMORY); +} + +void MemTablet::delete_all_files() { + // TODO: +} + + + } // namespace memory } // namespace doris diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h index dfafa27..03c3933 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/mem_tablet.h @@ -27,6 +27,15 @@ class MemSubTablet; class ScanSpec; class MemTabletScan; class WriteTxn; +class MemTablet; +using MemTabletSharedPtr = std::shared_ptr<MemTablet>; + +inline MemTabletSharedPtr to_mem_tablet(const BaseTabletSharedPtr& base) { + if (base->is_memory()) { + return std::static_pointer_cast<MemTablet>(base); + } + return MemTabletSharedPtr(); +} // Tablet class for memory-optimized storage engine. // @@ -49,9 +58,6 @@ public: virtual ~MemTablet(); - // Initialize - Status init(); - // Scan the tablet, return a MemTabletScan object scan, user can specify projections // using ScanSpec, currently only support full scan with projection, will support // filter/aggregation in the future. @@ -70,6 +76,13 @@ public: // Note: commit is done sequentially, protected by internal write lock Status commit_write_txn(WriteTxn* wtxn, uint64_t version); + virtual void build_tablet_report_info(TTabletInfo* tablet_info); + + virtual void delete_all_files(); + +protected: + virtual OLAPStatus _init_once_action(); + private: friend class MemTabletScan; // memory::Schema is used internally rather than TabletSchema, so we need an extra diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 61d7ab9..165b4c2 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -907,9 +907,9 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) { vector<TabletInfo> tablet_infos; task->get_related_tablets(&tablet_infos); sort(tablet_infos.begin(), tablet_infos.end()); - vector<TabletSharedPtr> related_tablets; + vector<BaseTabletSharedPtr> related_tablets; for (TabletInfo& tablet_info : tablet_infos) { - TabletSharedPtr tablet = _tablet_manager->get_tablet( + BaseTabletSharedPtr tablet = _tablet_manager->get_base_tablet( tablet_info.tablet_id, tablet_info.schema_hash); if (tablet != nullptr) { related_tablets.push_back(tablet); @@ -921,7 +921,7 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) { } // add write lock to all related tablets OLAPStatus prepare_status = task->prepare(); - for (TabletSharedPtr& tablet : related_tablets) { + for (auto& tablet : related_tablets) { tablet->release_header_lock(); } if (prepare_status != OLAP_SUCCESS) { @@ -943,9 +943,9 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) { // related tablets may be changed after execute task, so that get them here again task->get_related_tablets(&tablet_infos); sort(tablet_infos.begin(), tablet_infos.end()); - vector<TabletSharedPtr> related_tablets; + vector<BaseTabletSharedPtr> related_tablets; for (TabletInfo& tablet_info : tablet_infos) { - TabletSharedPtr tablet = _tablet_manager->get_tablet( + auto tablet = _tablet_manager->get_base_tablet( tablet_info.tablet_id, tablet_info.schema_hash); if (tablet != nullptr) { related_tablets.push_back(tablet); @@ -957,7 +957,7 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) { } // add write lock to all related tablets OLAPStatus fin_status = task->finish(); - for (TabletSharedPtr& tablet : related_tablets) { + for (auto& tablet : related_tablets) { tablet->release_header_lock(); } return fin_status; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index e48ca02..429c37a 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -55,7 +55,6 @@ TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : BaseTablet(tablet_meta, data_dir), - _is_bad(false), _last_cumu_compaction_failure_millis(0), _last_base_compaction_failure_millis(0), _last_cumu_compaction_success_millis(0), @@ -102,20 +101,6 @@ OLAPStatus Tablet::_init_once_action() { return res; } -OLAPStatus Tablet::init() { - return _init_once.call([this] { return _init_once_action(); }); -} - -// should save tablet meta to remote meta store -// if it's a primary replica -void Tablet::save_meta() { - auto res = _tablet_meta->save_meta(_data_dir); - CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path(); - // User could directly update tablet schema by _tablet_meta, - // So we need to refetch schema again - _schema = _tablet_meta->tablet_schema(); -} - OLAPStatus Tablet::revise_tablet_meta( const vector<RowsetMetaSharedPtr>& rowsets_to_clone, const vector<Version>& versions_to_delete) { @@ -844,14 +829,6 @@ OLAPStatus Tablet::_contains_version(const Version& version) { return OLAP_SUCCESS; } -OLAPStatus Tablet::set_partition_id(int64_t partition_id) { - return _tablet_meta->set_partition_id(partition_id); -} - -TabletInfo Tablet::get_tablet_info() const { - return TabletInfo(tablet_id(), schema_hash(), tablet_uid()); -} - void Tablet::pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec, std::vector<RowsetSharedPtr>* candidate_rowsets) { int64_t now = UnixSeconds(); @@ -1045,6 +1022,8 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { tablet_info->__set_version_count(_tablet_meta->version_count()); tablet_info->__set_path_hash(_data_dir->path_hash()); tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory()); + tablet_info->__set_tablet_type(_tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK ? + TTabletType::TABLET_TYPE_DISK : TTabletType::TABLET_TYPE_MEMORY); } // should use this method to get a copy of current tablet meta diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 40cac13..e7bd244 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -46,6 +46,13 @@ class TabletMeta; using TabletSharedPtr = std::shared_ptr<Tablet>; +inline TabletSharedPtr to_tablet(const BaseTabletSharedPtr& base) { + if (base->is_memory()) { + return TabletSharedPtr(); + } + return std::static_pointer_cast<Tablet>(base); +} + class Tablet : public BaseTablet { public: static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, @@ -53,15 +60,6 @@ public: Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir); - OLAPStatus init(); - inline bool init_succeeded(); - - bool is_used(); - - void register_tablet_into_dir(); - void deregister_tablet_from_dir(); - - void save_meta(); // Used in clone task, to update local meta when finishing a clone job OLAPStatus revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone, const std::vector<Version>& versions_to_delete); @@ -69,24 +67,6 @@ public: inline const int64_t cumulative_layer_point() const; inline void set_cumulative_layer_point(int64_t new_point); - inline size_t tablet_footprint(); // disk space occupied by tablet - inline size_t num_rows(); - inline int version_count() const; - inline Version max_version() const; - - // propreties encapsulated in TabletSchema - inline KeysType keys_type() const; - inline size_t num_columns() const; - inline size_t num_null_columns() const; - inline size_t num_key_columns() const; - inline size_t num_short_key_columns() const; - inline size_t num_rows_per_row_block() const; - inline CompressKind compress_kind() const; - inline double bloom_filter_fpp() const; - inline size_t next_unique_id() const; - inline size_t row_size() const; - inline size_t field_index(const string& field_name) const; - // operation in rowsets OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true); void modify_rowsets(const vector<RowsetSharedPtr>& to_add, @@ -128,12 +108,6 @@ public: void delete_alter_task(); OLAPStatus set_alter_state(AlterTabletState state); - // meta lock - inline void obtain_header_rdlock() { _meta_lock.rdlock(); } - inline void obtain_header_wrlock() { _meta_lock.wrlock(); } - inline void release_header_lock() { _meta_lock.unlock(); } - inline RWMutex* get_header_lock_ptr() { return &_meta_lock; } - // ingest lock inline void obtain_push_lock() { _ingest_lock.lock(); } inline void release_push_lock() { _ingest_lock.unlock(); } @@ -193,15 +167,9 @@ public: int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } void set_last_base_compaction_success_time(int64_t millis) { _last_base_compaction_success_millis = millis; } - void delete_all_files(); - bool check_path(const std::string& check_path) const; bool check_rowset_id(const RowsetId& rowset_id); - OLAPStatus set_partition_id(int64_t partition_id); - - TabletInfo get_tablet_info() const; - void pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec, std::vector<RowsetSharedPtr>* candidate_rowsets); void pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets); @@ -219,20 +187,24 @@ public: bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta); - void build_tablet_report_info(TTabletInfo* tablet_info); - void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const; // return a json string to show the compaction status of this tablet void get_compaction_status(std::string* json_result); + virtual void build_tablet_report_info(TTabletInfo* tablet_info); + + virtual void delete_all_files(); + +protected: + virtual OLAPStatus _init_once_action(); + private: - OLAPStatus _init_once_action(); void _print_missed_versions(const std::vector<Version>& missed_versions) const; bool _contains_rowset(const RowsetId rowset_id); OLAPStatus _contains_version(const Version& version); void _max_continuous_version_from_begining_unlocked(Version* version, - VersionHash* v_hash) const ; + VersionHash* v_hash) const; RowsetSharedPtr _rowset_with_largest_size(); void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& version_path, @@ -243,7 +215,6 @@ private: RowsetGraph _rs_graph; - DorisCallOnce<OLAPStatus> _init_once; // meta store lock is used for prevent 2 threads do checkpoint concurrently // it will be used in econ-mode in the future RWMutex _meta_store_lock; @@ -252,9 +223,6 @@ private: Mutex _cumulative_lock; RWMutex _migration_lock; - // TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to - // explain how these two locks work together. - mutable RWMutex _meta_lock; // A new load job will produce a new rowset, which will be inserted into both _rs_version_map // and _inc_rs_version_map. Only the most recent rowsets are kept in _inc_rs_version_map to // reduce the amount of data that needs to be copied during the clone task. @@ -268,8 +236,6 @@ private: std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map; std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _inc_rs_version_map; - // if this tablet is broken, set to true. default is false - std::atomic<bool> _is_bad; // timestamp of last cumu compaction failure std::atomic<int64_t> _last_cumu_compaction_failure_millis; // timestamp of last base compaction failure @@ -285,22 +251,6 @@ private: DISALLOW_COPY_AND_ASSIGN(Tablet); }; -inline bool Tablet::init_succeeded() { - return _init_once.has_called() && _init_once.stored_result() == OLAP_SUCCESS; -} - -inline bool Tablet::is_used() { - return !_is_bad && _data_dir->is_used(); -} - -inline void Tablet::register_tablet_into_dir() { - _data_dir->register_tablet(this); -} - -inline void Tablet::deregister_tablet_from_dir() { - _data_dir->deregister_tablet(this); -} - inline const int64_t Tablet::cumulative_layer_point() const { return _cumulative_point; @@ -311,72 +261,6 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) { } -// TODO(lingbin): Why other methods that need to get information from _tablet_meta -// are not locked, here needs a comment to explain. -inline size_t Tablet::tablet_footprint() { - ReadLock rdlock(&_meta_lock); - return _tablet_meta->tablet_footprint(); -} - -// TODO(lingbin): Why other methods which need to get information from _tablet_meta -// are not locked, here needs a comment to explain. -inline size_t Tablet::num_rows() { - ReadLock rdlock(&_meta_lock); - return _tablet_meta->num_rows(); -} - -inline int Tablet::version_count() const { - return _tablet_meta->version_count(); -} - -inline Version Tablet::max_version() const { - return _tablet_meta->max_version(); -} - -inline KeysType Tablet::keys_type() const { - return _schema.keys_type(); -} - -inline size_t Tablet::num_columns() const { - return _schema.num_columns(); -} - -inline size_t Tablet::num_null_columns() const { - return _schema.num_null_columns(); -} - -inline size_t Tablet::num_key_columns() const { - return _schema.num_key_columns(); -} - -inline size_t Tablet::num_short_key_columns() const { - return _schema.num_short_key_columns(); -} - -inline size_t Tablet::num_rows_per_row_block() const { - return _schema.num_rows_per_row_block(); -} - -inline CompressKind Tablet::compress_kind() const { - return _schema.compress_kind(); -} - -inline double Tablet::bloom_filter_fpp() const { - return _schema.bloom_filter_fpp(); -} - -inline size_t Tablet::next_unique_id() const { - return _schema.next_column_unique_id(); -} - -inline size_t Tablet::field_index(const string& field_name) const { - return _schema.field_index(field_name); -} - -inline size_t Tablet::row_size() const { - return _schema.row_size(); -} - } #endif // DORIS_BE_SRC_OLAP_TABLET_H diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 3c3fbe5..aafd7d9 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -41,6 +41,7 @@ #include "olap/rowset/column_data_writer.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" +#include "olap/memory/mem_tablet.h" #include "olap/schema_change.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" @@ -62,7 +63,7 @@ using strings::Substitute; namespace doris { -static bool _cmp_tablet_by_create_time(const TabletSharedPtr& a, const TabletSharedPtr& b) { +static bool _cmp_tablet_by_create_time(const BaseTabletSharedPtr& a, const BaseTabletSharedPtr& b) { return a->creation_time() < b->creation_time(); } @@ -81,15 +82,15 @@ TabletManager::~TabletManager() { } OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, + const BaseTabletSharedPtr& base_tablet, bool update_meta, bool force) { OLAPStatus res = OLAP_SUCCESS; VLOG(3) << "begin to add tablet to TabletManager. " << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash << ", force=" << force; - TabletSharedPtr existed_tablet = nullptr; + BaseTabletSharedPtr existed_tablet = nullptr; tablet_map_t& tablet_map = _get_tablet_map(tablet_id); - for (TabletSharedPtr item : tablet_map[tablet_id].table_arr) { + for (auto& item : tablet_map[tablet_id].table_arr) { if (item->equal(tablet_id, schema_hash)) { existed_tablet = item; break; @@ -98,26 +99,34 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s if (existed_tablet == nullptr) { return _add_tablet_to_map_unlocked(tablet_id, schema_hash, - tablet, update_meta, + base_tablet, update_meta, false /*keep_files*/, false /*drop_old*/); } if (!force) { - if (existed_tablet->tablet_path() == tablet->tablet_path()) { + if (existed_tablet->tablet_path() == base_tablet->tablet_path()) { LOG(WARNING) << "add the same tablet twice! tablet_id=" << tablet_id << ", schema_hash=" << schema_hash - << ", tablet_path=" << tablet->tablet_path(); + << ", tablet_path=" << base_tablet->tablet_path(); return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; } - if (existed_tablet->data_dir() == tablet->data_dir()) { + if (existed_tablet->data_dir() == base_tablet->data_dir()) { LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; } } + if (base_tablet->is_memory() || existed_tablet->is_memory()) { + LOG(WARNING) << "add the same MemTablet twice! tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash + << ", tablet_path=" << base_tablet->tablet_path(); + return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; + } + + TabletSharedPtr tablet = to_tablet(base_tablet); existed_tablet->obtain_header_rdlock(); - const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version(); + const RowsetSharedPtr old_rowset = to_tablet(existed_tablet)->rowset_with_max_version(); const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version(); // If new tablet is empty, it is a newly created schema change tablet. @@ -163,7 +172,7 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s } OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, + const BaseTabletSharedPtr& tablet, bool update_meta, bool keep_files, bool drop_old) { // check if new tablet's meta is in store and add new tablet's meta to meta store @@ -224,7 +233,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, // tablet_id exist but with different schema_hash, return an error(report task will // eventually trigger its deletion). if (_check_tablet_id_exist_unlocked(tablet_id)) { - TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, schema_hash); + BaseTabletSharedPtr tablet = _get_base_tablet_unlocked(tablet_id, schema_hash); if (tablet != nullptr) { LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id; return OLAP_SUCCESS; @@ -241,6 +250,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, // If the CreateTabletReq has base_tablet_id then it is a alter-tablet request if (request.__isset.base_tablet_id && request.base_tablet_id > 0) { is_schema_change = true; + // MemTablet does not support schema change, so it's safe to use TabletSharedPtr base_tablet = _get_tablet_unlocked(request.base_tablet_id, request.base_schema_hash); if (base_tablet == nullptr) { LOG(WARNING) << "fail to create tablet(change schema), base tablet does not exist. " @@ -258,7 +268,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, } // set alter type to schema-change. it is useless - TabletSharedPtr tablet = _internal_create_tablet_unlocked( + auto tablet = _internal_create_tablet_unlocked( AlterTabletType::SCHEMA_CHANGE, request, is_schema_change, base_tablet.get(), stores); if (tablet == nullptr) { LOG(WARNING) << "fail to create tablet. tablet_id=" << request.tablet_id; @@ -271,7 +281,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, return OLAP_SUCCESS; } -TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( +BaseTabletSharedPtr TabletManager::_internal_create_tablet_unlocked( const AlterTabletType alter_type, const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, const std::vector<DataDir*>& data_dirs) { @@ -312,17 +322,22 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( // 1. !is_schema_change: not in schema-change state; // 2. request.base_tablet_id > 0: in schema-change state; if (!is_schema_change || (request.__isset.base_tablet_id && request.base_tablet_id > 0)) { - // Create init version if this is not a restore mode replica and request.version is set - // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; - // if (!in_restore_mode && request.__isset.version) { - // create inital rowset before add it to storage engine could omit many locks - res = _create_inital_rowset_unlocked(request, tablet.get()); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to create initial version for tablet. res=" << res; - break; + if (!tablet->is_memory()) { + // Create init version if this is not a restore mode replica and request.version is set + // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; + // if (!in_restore_mode && request.__isset.version) { + // create inital rowset before add it to storage engine could omit many locks + res = _create_inital_rowset_unlocked(request, to_tablet(tablet).get()); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to create initial version for tablet. res=" << res; + break; + } } } if (is_schema_change) { + if (tablet->is_memory()) { + LOG(FATAL) << "MemTablet schema change not supported"; + } if (request.__isset.base_tablet_id && request.base_tablet_id > 0) { LOG(INFO) << "request for alter-tablet v2, do not add alter task to tablet"; // if this is a new alter tablet, has to set its state to not ready @@ -331,8 +346,10 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( tablet->set_tablet_state(TabletState::TABLET_NOTREADY); } else { // add alter task to new tablet if it is a new tablet during schema change - tablet->add_alter_task(base_tablet->tablet_id(), base_tablet->schema_hash(), - vector<Version>(), alter_type); + to_tablet(tablet)->add_alter_task(base_tablet->tablet_id(), + base_tablet->schema_hash(), + vector<Version>(), + alter_type); } // 有可能出现以下2种特殊情况: // 1. 因为操作系统时间跳变,导致新生成的表的creation_time小于旧表的creation_time时间 @@ -359,7 +376,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( // TODO(lingbin): The following logic seems useless, can be removed? // Because if _add_tablet_unlocked() return OK, we must can get it from map. - TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id, new_schema_hash); + BaseTabletSharedPtr tablet_ptr = _get_base_tablet_unlocked(new_tablet_id, new_schema_hash); if (tablet_ptr == nullptr) { res = OLAP_ERR_TABLE_NOT_FOUND; LOG(WARNING) << "fail to get tablet. res=" << res; @@ -391,7 +408,7 @@ static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t table return path; } -TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( +BaseTabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, const std::vector<DataDir*>& data_dirs) { string pending_id = StrCat(TABLET_ID_PREFIX, request.tablet_id); @@ -427,16 +444,25 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( } else { data_dir->add_pending_ids(pending_id); Status st = FileUtils::create_dir(schema_hash_dir); - if(!st.ok()) { + if (!st.ok()) { LOG(WARNING) << "create dir fail. path=" << schema_hash_dir << " error=" << st.to_string(); continue; } } - TabletSharedPtr new_tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); - DCHECK(new_tablet != nullptr); - return new_tablet; + TTabletType::type ttype = request.__isset.tablet_type ? + request.tablet_type : TTabletType::TABLET_TYPE_DISK; + if (ttype == TTabletType::TABLET_TYPE_DISK) { + TabletSharedPtr new_tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); + DCHECK(new_tablet != nullptr); + return std::static_pointer_cast<BaseTablet>(new_tablet); + } else { + memory::MemTabletSharedPtr new_tablet = memory::MemTablet::create_tablet_from_meta( + tablet_meta, data_dir); + DCHECK(new_tablet != nullptr); + return std::static_pointer_cast<BaseTablet>(new_tablet); + } } return nullptr; } @@ -462,13 +488,18 @@ OLAPStatus TabletManager::_drop_tablet_unlocked( DorisMetrics::instance()->drop_tablet_requests_total.increment(1); // Fetch tablet which need to be droped - TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id, schema_hash); - if (to_drop_tablet == nullptr) { + BaseTabletSharedPtr to_drop_base_tablet = _get_base_tablet_unlocked(tablet_id, schema_hash); + if (to_drop_base_tablet == nullptr) { LOG(WARNING) << "fail to drop tablet because it does not exist. " << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; return OLAP_SUCCESS; } + if (to_drop_base_tablet->is_memory()) { + return _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files); + } + TabletSharedPtr to_drop_tablet = to_tablet(to_drop_base_tablet); + // Try to get schema change info, we can drop tablet directly if it is not // in schema-change state. AlterTabletTaskSharedPtr alter_task = to_drop_tablet->alter_task(); @@ -562,7 +593,7 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path( continue; } else { tablet_map_t& tablet_map = _get_tablet_map(tablet_id); - for (list<TabletSharedPtr>::iterator it = tablet_map[tablet_id].table_arr.begin(); + for (auto it = tablet_map[tablet_id].table_arr.begin(); it != tablet_map[tablet_id].table_arr.end();) { if ((*it)->equal(tablet_id, schema_hash)) { // We should first remove tablet from partition_map to avoid iterator @@ -579,6 +610,13 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path( return res; } +BaseTabletSharedPtr TabletManager::get_base_tablet(TTabletId tablet_id, SchemaHash schema_hash, + bool include_deleted, std::string* err) { + RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id); + ReadLock rlock(&tablet_map_lock); + return _get_base_tablet_unlocked(tablet_id, schema_hash, include_deleted, err); +} + TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema_hash, bool include_deleted, string* err) { RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id); @@ -588,8 +626,24 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, bool include_deleted, string* err) { - TabletSharedPtr tablet; - tablet = _get_tablet_unlocked(tablet_id, schema_hash); + BaseTabletSharedPtr ret = _get_base_tablet_unlocked(tablet_id, schema_hash, include_deleted, + err); + if (ret == nullptr) { + return TabletSharedPtr(); + } + if (ret->is_memory()) { + LOG(FATAL) << "_get_tablet_unlocked get MemTablet"; + return TabletSharedPtr(); + } + return to_tablet(ret); +} + +BaseTabletSharedPtr TabletManager::_get_base_tablet_unlocked(TTabletId tablet_id, + SchemaHash schema_hash, + bool include_deleted, + string* err) { + BaseTabletSharedPtr tablet; + tablet = _get_base_tablet_unlocked(tablet_id, schema_hash); if (tablet == nullptr && include_deleted) { ReadLock rlock(&_shutdown_tablets_lock); for (auto& deleted_tablet : _shutdown_tablets) { @@ -687,7 +741,12 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com ReadLock tablet_map_rdlock(&_tablet_map_lock_array[i]); tablet_map_t& tablet_map = _tablet_map_array[i]; for (tablet_map_t::value_type& table_ins : tablet_map){ - for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) { + for (BaseTabletSharedPtr& base_tablet_ptr : table_ins.second.table_arr) { + if (base_tablet_ptr->is_memory()) { + // TODO: mem_tablet doesn't do compaction yet + continue; + } + TabletSharedPtr tablet_ptr = to_tablet(base_tablet_ptr); AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task(); if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED @@ -771,7 +830,8 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com } OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, - TSchemaHash schema_hash, const string& meta_binary, bool update_meta, bool force, bool restore) { + TSchemaHash schema_hash, const string& meta_binary, + bool update_meta, bool force, bool restore) { RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id); WriteLock wlock(&tablet_map_lock); TabletMetaSharedPtr tablet_meta(new TabletMeta()); @@ -805,37 +865,77 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab tablet_meta->set_tablet_state(TABLET_RUNNING); } - TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); - if (tablet == nullptr) { - LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id - << ", schema_hash:" << schema_hash; - return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; - } + if (tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK) { + TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir); + if (tablet == nullptr) { + LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id + << ", schema_hash:" << schema_hash; + return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; + } + + if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) { + LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id + << " schema_hash=" << schema_hash << ", path=" << data_dir->path(); + { + WriteLock shutdown_tablets_wlock(&_shutdown_tablets_lock); + _shutdown_tablets.push_back(tablet); + } + return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR; + } + // NOTE: We do not check tablet's initial version here, because if BE restarts when + // one tablet is doing schema-change, we may meet empty tablet. + if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { + LOG(WARNING) << "fail to load tablet. it is in running state but without delta. " + << "tablet=" << tablet->full_name() << ", path=" << data_dir->path(); + // tablet state is invalid, drop tablet + return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR; + } - if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) { - LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id - << " schema_hash=" << schema_hash << ", path=" << data_dir->path(); - { - WriteLock shutdown_tablets_wlock(&_shutdown_tablets_lock); - _shutdown_tablets.push_back(tablet); + RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0", + tablet->full_name())); + RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash, + std::static_pointer_cast<BaseTablet>(tablet), + update_meta, force), + Substitute("fail to add tablet. tablet=$0", tablet->full_name())); + + return OLAP_SUCCESS; + } else { + memory::MemTabletSharedPtr tablet = memory::MemTablet::create_tablet_from_meta( + tablet_meta, data_dir); + if (tablet == nullptr) { + LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id + << ", schema_hash:" << schema_hash; + return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; } - return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR; - } - // NOTE: We do not check tablet's initial version here, because if BE restarts when - // one tablet is doing schema-change, we may meet empty tablet. - if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { - LOG(WARNING) << "fail to load tablet. it is in running state but without delta. " - << "tablet=" << tablet->full_name() << ", path=" << data_dir->path(); - // tablet state is invalid, drop tablet - return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR; - } - RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0", - tablet->full_name())); - RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash, tablet, update_meta, force), - Substitute("fail to add tablet. tablet=$0", tablet->full_name())); + if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) { + LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id + << " schema_hash=" << schema_hash << ", path=" << data_dir->path(); + { + WriteLock shutdown_tablets_wlock(&_shutdown_tablets_lock); + _shutdown_tablets.push_back(tablet); + } + return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR; + } + // NOTE: We do not check tablet's initial version here, because if BE restarts when + // one tablet is doing schema-change, we may meet empty tablet. + if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { + LOG(WARNING) << "fail to load tablet. it is in running state but without delta. " + << "tablet=" << tablet->full_name() << ", path=" << data_dir->path(); + // tablet state is invalid, drop tablet + return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR; + } + + RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0", + tablet->full_name())); + RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash, + std::static_pointer_cast<BaseTablet>(tablet), + update_meta, force), + Substitute("fail to add tablet. tablet=$0", tablet->full_name())); + + return OLAP_SUCCESS; + } - return OLAP_SUCCESS; } OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, @@ -905,7 +1005,7 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { OLAPStatus res = OLAP_SUCCESS; - TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id, tablet_info->schema_hash); + auto tablet = get_base_tablet(tablet_info->tablet_id, tablet_info->schema_hash); if (tablet == nullptr) { LOG(WARNING) << "can't find tablet. " << " tablet=" << tablet_info->tablet_id << " schema_hash=" << tablet_info->schema_hash; @@ -938,7 +1038,7 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>* uint64_t tablet_id = item.first; TTablet t_tablet; - for (TabletSharedPtr tablet_ptr : item.second.table_arr) { + for (const auto& tablet_ptr : item.second.table_arr) { TTabletInfo tablet_info; tablet_ptr->build_tablet_report_info(&tablet_info); @@ -964,7 +1064,8 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>* OLAPStatus TabletManager::start_trash_sweep() { { std::vector<int64_t> tablets_to_clean; - std::vector<TabletSharedPtr> all_tablets; // we use this vector to save all tablet ptr for saving lock time. + // we use this vector to save all tablet ptr for saving lock time. + std::vector<TabletSharedPtr> all_tablets; for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) { tablet_map_t& tablet_map = _tablet_map_array[i]; { @@ -974,8 +1075,11 @@ OLAPStatus TabletManager::start_trash_sweep() { if (item.second.table_arr.empty()) { tablets_to_clean.push_back(item.first); } - for (TabletSharedPtr tablet : item.second.table_arr) { - all_tablets.push_back(tablet); + for (BaseTabletSharedPtr& base_tablet : item.second.table_arr) { + // TODO: support MemTablet + if (!base_tablet->is_memory()) { + all_tablets.push_back(to_tablet(base_tablet)); + } } } } @@ -1103,7 +1207,7 @@ bool TabletManager::try_schema_change_lock(TTabletId tablet_id) { void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_map, size_t* tablet_count) { - DCHECK(tablet_count != 0); + DCHECK(tablet_count != nullptr); *tablet_count = 0; for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) { ReadLock rlock(&_tablet_map_lock_array[i]); @@ -1138,7 +1242,7 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { for (int32 i = 0 ; i < _tablet_map_lock_shard_size; i++) { ReadLock tablet_map_rdlock(&_tablet_map_lock_array[i]); for (tablet_map_t::value_type& table_ins : _tablet_map_array[i]){ - for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) { + for (BaseTabletSharedPtr& tablet_ptr : table_ins.second.table_arr) { if (tablet_ptr->tablet_state() != TABLET_RUNNING) { continue; } @@ -1148,7 +1252,10 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { || !tablet_ptr->init_succeeded()) { continue; } - related_tablets.push_back(tablet_ptr); + // TODO: do MemTablet need checkpoint? + if (!tablet_ptr->is_memory()) { + related_tablets.push_back(to_tablet(tablet_ptr)); + } } } } @@ -1170,7 +1277,7 @@ void TabletManager::_build_tablet_stat() { TTabletStat stat; stat.tablet_id = item.first; - for (TabletSharedPtr tablet : item.second.table_arr) { + for (BaseTabletSharedPtr tablet : item.second.table_arr) { // TODO(lingbin): if it is nullptr, why is it not deleted? if (tablet == nullptr) { continue; @@ -1315,7 +1422,7 @@ OLAPStatus TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& r OLAPStatus TabletManager::_drop_tablet_directly_unlocked( TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { - TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id, schema_hash); + BaseTabletSharedPtr dropped_tablet = _get_base_tablet_unlocked(tablet_id, schema_hash); if (dropped_tablet == nullptr) { LOG(WARNING) << "fail to drop tablet because it does not exist. " << " tablet_id=" << tablet_id @@ -1323,16 +1430,16 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked( return OLAP_ERR_TABLE_NOT_FOUND; } tablet_map_t& tablet_map = _get_tablet_map(tablet_id); - list<TabletSharedPtr>& candidate_tablets = tablet_map[tablet_id].table_arr; - list<TabletSharedPtr>::iterator it = candidate_tablets.begin(); + auto& candidate_tablets = tablet_map[tablet_id].table_arr; + auto it = candidate_tablets.begin(); while (it != candidate_tablets.end()) { if (!(*it)->equal(tablet_id, schema_hash)) { ++it; continue; } - TabletSharedPtr tablet = *it; - _remove_tablet_from_partition(*(*it)); + auto tablet = *it; + _remove_tablet_from_partition(*tablet); it = candidate_tablets.erase(it); if (!keep_files) { // drop tablet will update tablet meta, should lock @@ -1359,12 +1466,13 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked( return OLAP_SUCCESS; } -TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash) { +BaseTabletSharedPtr TabletManager::_get_base_tablet_unlocked(TTabletId tablet_id, + SchemaHash schema_hash) { VLOG(3) << "begin to get tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map_t::iterator it = tablet_map.find(tablet_id); if (it != tablet_map.end()) { - for (TabletSharedPtr tablet : it->second.table_arr) { + for (BaseTabletSharedPtr tablet : it->second.table_arr) { CHECK(tablet != nullptr) << "tablet is nullptr. tablet_id=" << tablet_id; if (tablet->equal(tablet_id, schema_hash)) { VLOG(3) << "get tablet success. tablet_id=" << tablet_id @@ -1376,16 +1484,28 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaH VLOG(3) << "fail to get tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; // Return nullptr tablet if fail - TabletSharedPtr tablet; - return tablet; + return nullptr; +} + +TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash) { + BaseTabletSharedPtr ret = _get_base_tablet_unlocked(tablet_id, schema_hash); + if (ret == nullptr) { + return TabletSharedPtr(); + } + if (ret->is_memory()) { + LOG(FATAL) << "fail to get TabletSharedPtr from MemTabletSharedPtr. tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash; + return TabletSharedPtr(); + } + return to_tablet(ret); } -void TabletManager::_add_tablet_to_partition(const Tablet& tablet) { +void TabletManager::_add_tablet_to_partition(const BaseTablet& tablet) { WriteLock wlock(&_partition_tablet_map_lock); _partition_tablet_map[tablet.partition_id()].insert(tablet.get_tablet_info()); } -void TabletManager::_remove_tablet_from_partition(const Tablet& tablet) { +void TabletManager::_remove_tablet_from_partition(const BaseTablet& tablet) { WriteLock wlock(&_partition_tablet_map_lock); _partition_tablet_map[tablet.partition_id()].erase(tablet.get_tablet_info()); if (_partition_tablet_map[tablet.partition_id()].empty()) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 347fb10..745d7d6 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -71,6 +71,9 @@ public: TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type, DataDir* data_dir); + BaseTabletSharedPtr get_base_tablet(TTabletId tablet_id, SchemaHash schema_hash, + bool include_deleted = false, std::string* err = nullptr); + TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash, bool include_deleted = false, std::string* err = nullptr); @@ -140,10 +143,10 @@ private: // OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication // OLAP_ERR_NOT_INITED, if not inited OLAPStatus _add_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, bool update_meta, bool force); + const BaseTabletSharedPtr& tablet, bool update_meta, bool force); OLAPStatus _add_tablet_to_map_unlocked(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, bool update_meta, + const BaseTabletSharedPtr& tablet, bool update_meta, bool keep_files, bool drop_old); bool _check_tablet_id_exist_unlocked(TTabletId tablet_id); @@ -156,16 +159,19 @@ private: OLAPStatus _drop_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, bool keep_files); + BaseTabletSharedPtr _get_base_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash); + BaseTabletSharedPtr _get_base_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, + bool include_deleted, std::string* err); TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash); TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, bool include_deleted, std::string* err); - TabletSharedPtr _internal_create_tablet_unlocked(const AlterTabletType alter_type, - const TCreateTabletReq& request, - const bool is_schema_change, - const Tablet* base_tablet, - const std::vector<DataDir*>& data_dirs); - TabletSharedPtr _create_tablet_meta_and_dir_unlocked(const TCreateTabletReq& request, + BaseTabletSharedPtr _internal_create_tablet_unlocked(const AlterTabletType alter_type, + const TCreateTabletReq& request, + const bool is_schema_change, + const Tablet* base_tablet, + const std::vector<DataDir*>& data_dirs); + BaseTabletSharedPtr _create_tablet_meta_and_dir_unlocked(const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, const std::vector<DataDir*>& data_dirs); @@ -177,9 +183,9 @@ private: void _build_tablet_stat(); - void _add_tablet_to_partition(const Tablet& tablet); + void _add_tablet_to_partition(const BaseTablet& tablet); - void _remove_tablet_from_partition(const Tablet& tablet); + void _remove_tablet_from_partition(const BaseTablet& tablet); inline RWMutex& _get_tablet_map_lock(TTabletId tabletId); @@ -193,7 +199,7 @@ private: // The first element(i.e. tablet_arr[0]) is the base tablet. When we add new tablet // to tablet_arr, we will sort all the elements in create-time ascending order, // which will ensure the first one is base-tablet - std::list<TabletSharedPtr> table_arr; + std::list<BaseTabletSharedPtr> table_arr; }; // tablet_id -> TabletInstances typedef std::unordered_map<int64_t, TableInstances> tablet_map_t; @@ -209,7 +215,7 @@ private: RWMutex _shutdown_tablets_lock; // partition_id => tablet_info std::map<int64_t, std::set<TabletInfo>> _partition_tablet_map; - std::vector<TabletSharedPtr> _shutdown_tablets; + std::vector<BaseTabletSharedPtr> _shutdown_tablets; std::mutex _tablet_stat_mutex; // cache to save tablets' statistics, such as data-size and row-count diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 4b9a45f..a4ef612 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -91,7 +91,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, tablet_meta_pb.set_tablet_state(PB_RUNNING); *(tablet_meta_pb.mutable_tablet_uid()) = tablet_uid.to_proto(); tablet_meta_pb.set_tablet_type(tabletType == TTabletType::TABLET_TYPE_MEMORY ? - TabletTypePB::TABLET_TYPE_DISK : TabletTypePB::TABLET_TYPE_MEMORY); + TabletTypePB::TABLET_TYPE_MEMORY : TabletTypePB::TABLET_TYPE_DISK); TabletSchemaPB* schema = tablet_meta_pb.mutable_schema(); schema->set_num_short_key_columns(tablet_schema.short_key_column_count); schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block); diff --git a/be/test/olap/memory/mem_tablet_test.cpp b/be/test/olap/memory/mem_tablet_test.cpp index 979f462..c0c6f9b 100644 --- a/be/test/olap/memory/mem_tablet_test.cpp +++ b/be/test/olap/memory/mem_tablet_test.cpp @@ -70,7 +70,7 @@ TEST(MemTablet, writescan) { new TabletMeta(1, 1, 1, 1, 1, tschema, static_cast<uint32_t>(sc->cid_size()), col_idx_to_unique_id, TabletUid(1, 1), TTabletType::TABLET_TYPE_MEMORY)); std::shared_ptr<MemTablet> tablet = MemTablet::create_tablet_from_meta(tablet_meta, nullptr); - ASSERT_TRUE(tablet->init().ok()); + ASSERT_EQ(tablet->init(), OLAP_SUCCESS); uint64_t cur_version = 0; vector<TData> alldata(num_insert); diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index ded383b..345b6ce 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -38,6 +38,7 @@ struct TTabletInfo { 12: optional bool used 13: optional Types.TPartitionId partition_id 14: optional bool is_in_memory + 15: optional AgentService.TTabletType tablet_type } struct TFinishTaskRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org