xiaokang commented on code in PR #19237:
URL: https://github.com/apache/doris/pull/19237#discussion_r1182204399


##########
gensrc/thrift/Types.thrift:
##########
@@ -606,6 +606,8 @@ struct TBackend {
     1: required string host
     2: required TPort be_port
     3: required TPort http_port
+    4: optional TPort brpc_port
+    5: optional TReplicaId replica_id

Review Comment:
   Do not add replica_id to TBackend; create a new struct TReplicaInfo instead.



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -823,6 +823,16 @@ struct TCheckAuthResult {
     1: required Status.TStatus status
 }
 
+struct TGetTabletReplicasInfoRequest {

Review Comment:
   ReplicaInfos



##########
be/src/common/config.h:
##########
@@ -333,6 +334,10 @@ CONF_mInt64(total_permits_for_compaction_score, "10000");
 // sleep interval in ms after generated compaction tasks
 CONF_mInt32(generate_compaction_tasks_interval_ms, "10");
 
+CONF_mInt32(tablet_replicas_info_update_interval_seconds, "2");
+CONF_mBool(enable_single_replica_compaction, "false");

Review Comment:
   Consider making this a table-level option in a follow-up.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java:
##########
@@ -523,13 +527,13 @@ public boolean equals(Object obj) {
 
         Replica replica = (Replica) obj;
         return (id == replica.id)
-                && (backendId == replica.backendId)
-                && (version == replica.version)
-                && (dataSize == replica.dataSize)
-                && (rowCount == replica.rowCount)
-                && (state.equals(replica.state))
-                && (lastFailedVersion == replica.lastFailedVersion)
-                && (lastSuccessVersion == replica.lastSuccessVersion);
+            && (backendId == replica.backendId)

Review Comment:
   Is this just an indentation difference?



##########
gensrc/proto/internal_service.proto:
##########
@@ -614,6 +614,21 @@ message PFetchColIdsResponse {
     repeated PFetchColIdsResultEntry entries = 2;
 };
 
+message PGetTabletVersionsRequest {
+    required int64 tablet_id = 1;
+};
+
+enum PVersionStatus {
+    Version_OK = 0;
+    Version_NONE = 1;
+};
+
+message PGetTabletVersionsResponse {
+    required PStatus status = 1;
+    required PVersionStatus version_status = 2;

Review Comment:
   If status is enough, version_status is not necessary.



##########
be/src/olap/olap_server.cpp:
##########
@@ -665,6 +888,17 @@ void 
StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
 
 Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                                               CompactionType compaction_type) {
+    bool has_master_tablet = false;
+    {
+        std::unique_lock<std::mutex> lock(_tablet_master_info_mutex);
+        if(_tablet_master_info.count(tablet->tablet_id())) {

Review Comment:
   Is it enough to just check tablet_id but not replica_id to skip compaction 
for a tablet?



##########
be/src/olap/olap_server.cpp:
##########
@@ -519,6 +563,27 @@ void StorageEngine::_compaction_tasks_producer_callback() {
             /// If it is not cleaned up, the reference count of the tablet 
will always be greater than 1,
             /// thus cannot be collected by the garbage collector. 
(TabletManager::start_trash_sweep)
             for (const auto& tablet : tablets_compaction) {
+                if (config::enable_single_replica_compaction) {
+                    bool has_master_tablet = false;
+                    {
+                        std::unique_lock<std::mutex> 
lock(_tablet_master_info_mutex);
+                        if(_tablet_master_info.count(tablet->tablet_id())) {

Review Comment:
   Is it enough to just check tablet_id but not replica_id to skip compaction 
for a tablet?



##########
be/src/olap/olap_server.cpp:
##########
@@ -532,6 +597,164 @@ void StorageEngine::_compaction_tasks_producer_callback() 
{
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
+void StorageEngine::_tablet_replicas_info_update_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start tablet replicas info update process!";
+    
+    int64_t interval = config::tablet_replicas_info_update_interval_seconds;
+    do {
+        if (config::enable_single_replica_compaction) {
+            auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
+                return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
+                    
!t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+            });
+            TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
+            if (master_info == nullptr) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            TNetworkAddress master_addr = master_info->network_address;
+            if (master_addr.hostname == "" || master_addr.port == 0) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            
+            int start = 0;
+            int tablet_size = all_tablets.size();
+            while (start < tablet_size) {
+                int batch_size = std::min(100, tablet_size - start);
+                int end = start + batch_size;
+                TGetTabletReplicasInfoRequest request;
+                TGetTabletReplicasInfoResult result;
+                for (int i = start; i < end; i++) {
+                    
request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
+                }
+                Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+                    master_addr.hostname, master_addr.port,
+                    [&request, &result](FrontendServiceConnection& client) {
+                        client->getTabletReplicasInfo(result, request);
+                    });
+
+                if (!rpc_st.ok()) {
+                    LOG(WARNING)<< "Failed to get tablet replicas info, 
encounter rpc failure, tablet start: "
+                                << start << " end: " << end;
+                    start = end;
+                    continue;
+                }
+
+                std::unordered_map<int64_t, TBackend> tablet_master;
+                for (const auto& it : result.tablet_replicas_info) {
+                    auto tablet_id = it.first;
+                    std::vector<TBackend> backends;
+                    for (const auto& backend : it.second) {
+                        backends.emplace_back(backend);
+                    }
+                    TBackend master;
+                    auto tablet = _tablet_manager->get_tablet(tablet_id);
+                    if (tablet == nullptr) {
+                        VLOG_CRITICAL << "tablet is nullptr";
+                        continue;
+                    }
+                    auto hasMaster = tablet->calc_master_info(backends, 
master);
+                    if (hasMaster) {
+                        tablet_master[tablet_id] = master;
+                    } 
+                }
+                VLOG_CRITICAL << "get tablet replicas info from fe, size is " 
<< end - start 
+                           << " token=" << result.token;
+
+                // update _tablet_replicas_info
+                {
+                    std::unique_lock<std::mutex> 
lock(_tablet_master_info_mutex);
+                    for(const auto& it : tablet_master) {
+                        VLOG_CRITICAL << "tablet " << it.first << " , master 
is " << it.second.host;
+                        _tablet_master_info[it.first] = it.second;
+                    }
+                    _token = result.token;
+                }
+
+                start = end;
+            }
+            interval = config::tablet_replicas_info_update_interval_seconds;
+        } else {
+            interval = 15; // 15s to check enable_single_replica_compaction
+        }
+    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
+}
+
+Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr 
tablet) {
+    
+    bool already_exist = _push_tablet_into_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+    if (already_exist) {
+        return Status::AlreadyExist(
+                "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
+    }
+    
+    already_exist = _push_tablet_into_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+    if (already_exist) {
+        _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+        return Status::AlreadyExist(
+                "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
+    }
+    
+    VLOG_CRITICAL << "submit single replica compaction!";
+
+    Status st = tablet->prepare_single_replica_compaction(tablet);
+    
+    if(st.ok()) {
+        auto st = _single_replica_compaction_thread_pool->submit_func([tablet, 
this](){
+            tablet->execute_single_replica_compaction();
+            tablet->reset_single_replica_compaction();
+            _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+            _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+        });
+        if (!st.ok()) {
+            tablet->reset_single_replica_compaction();
+            _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+            _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+            return Status::InternalError(
+                    "failed to submit single replica compaction task to thread 
pool, "
+                    "tablet_id={} ", tablet->tablet_id());
+        }
+        return Status::OK();
+    } else {
+        tablet->reset_single_replica_compaction();
+        _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+        _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+        if (!st.ok()) {
+            return Status::InternalError(
+                    "failed to prepare single replica compaction task 
tablet_id={} ", tablet->tablet_id());
+        }
+        return st;
+    }
+}
+
+void StorageEngine::get_tablet_versions(const PGetTabletVersionsRequest* 
request, PGetTabletVersionsResponse* response) {
+    TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id());
+    if (tablet == nullptr) {
+        response->set_version_status(PVersionStatus::Version_NONE);
+        response->mutable_status()->set_status_code(0);
+        return;
+    }
+    std::vector<Version> local_versions = tablet->get_all_versions();
+    
+    if(local_versions.empty()) {
+        response->set_version_status(PVersionStatus::Version_NONE);
+        response->mutable_status()->set_status_code(0);
+        return;
+    }
+
+    for(const auto& version : local_versions) {
+        response->add_versions(version.first);

Review Comment:
   it's tricky.



##########
be/src/olap/olap_server.cpp:
##########
@@ -532,6 +597,164 @@ void StorageEngine::_compaction_tasks_producer_callback() 
{
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
+void StorageEngine::_tablet_replicas_info_update_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start tablet replicas info update process!";
+    
+    int64_t interval = config::tablet_replicas_info_update_interval_seconds;
+    do {
+        if (config::enable_single_replica_compaction) {
+            auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
+                return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
+                    
!t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+            });
+            TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
+            if (master_info == nullptr) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            TNetworkAddress master_addr = master_info->network_address;
+            if (master_addr.hostname == "" || master_addr.port == 0) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            
+            int start = 0;
+            int tablet_size = all_tablets.size();
+            while (start < tablet_size) {
+                int batch_size = std::min(100, tablet_size - start);
+                int end = start + batch_size;
+                TGetTabletReplicasInfoRequest request;
+                TGetTabletReplicasInfoResult result;
+                for (int i = start; i < end; i++) {
+                    
request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
+                }
+                Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+                    master_addr.hostname, master_addr.port,
+                    [&request, &result](FrontendServiceConnection& client) {
+                        client->getTabletReplicasInfo(result, request);
+                    });
+
+                if (!rpc_st.ok()) {
+                    LOG(WARNING)<< "Failed to get tablet replicas info, 
encounter rpc failure, tablet start: "
+                                << start << " end: " << end;
+                    start = end;
+                    continue;
+                }
+
+                std::unordered_map<int64_t, TBackend> tablet_master;
+                for (const auto& it : result.tablet_replicas_info) {
+                    auto tablet_id = it.first;
+                    std::vector<TBackend> backends;
+                    for (const auto& backend : it.second) {
+                        backends.emplace_back(backend);
+                    }
+                    TBackend master;
+                    auto tablet = _tablet_manager->get_tablet(tablet_id);
+                    if (tablet == nullptr) {
+                        VLOG_CRITICAL << "tablet is nullptr";
+                        continue;
+                    }
+                    auto hasMaster = tablet->calc_master_info(backends, 
master);

Review Comment:
   This is not easy to understand.



##########
be/src/olap/tablet.cpp:
##########
@@ -1270,6 +1342,71 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_cumulative_compac
     return candidate_rowsets;
 }
 
+std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_single_replica_compaction() {
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    {
+        std::shared_lock rlock(_meta_lock);
+        for (const auto& [version, rs] : _rs_version_map) {
+            if (rs->is_local()) {
+                candidate_rowsets.push_back(rs);
+            }
+        }
+    }
+    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
+    return candidate_rowsets;
+}
+
+Status Tablet::get_peer_versions(std::vector<Version>& peer_versions) {
+    TBackend addr;
+    std::string token;
+    if(!StorageEngine::instance()->get_tbackend(tablet_id(), addr, token)) {
+        LOG(INFO) << tablet_id() <<  " tablet don't have master peer";
+        return Status::Aborted("no master peer");;
+    }
+    
+    PGetTabletVersionsRequest request;
+    request.set_tablet_id(tablet_id());
+    PGetTabletVersionsResponse response;
+    std::shared_ptr<PBackendService_Stub> stub =

Review Comment:
   Is there any helper class for BE RPC?



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -823,6 +823,16 @@ struct TCheckAuthResult {
     1: required Status.TStatus status
 }
 
+struct TGetTabletReplicasInfoRequest {
+    1: required list<i64> tablet_ids
+}
+
+struct TGetTabletReplicasInfoResult {
+    1: optional Status.TStatus status
+    2: optional map<i64, list<Types.TBackend>> tablet_replicas_info
+    3: optional string token

Review Comment:
   Is the token necessary?



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -823,6 +823,16 @@ struct TCheckAuthResult {
     1: required Status.TStatus status
 }
 
+struct TGetTabletReplicasInfoRequest {
+    1: required list<i64> tablet_ids
+}
+
+struct TGetTabletReplicasInfoResult {
+    1: optional Status.TStatus status
+    2: optional map<i64, list<Types.TBackend>> tablet_replicas_info

Review Comment:
   tablet_replica_infos



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -823,6 +823,16 @@ struct TCheckAuthResult {
     1: required Status.TStatus status
 }
 
+struct TGetTabletReplicasInfoRequest {
+    1: required list<i64> tablet_ids
+}
+
+struct TGetTabletReplicasInfoResult {

Review Comment:
   ReplicaInfos



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1549,5 +1553,40 @@ private PrivPredicate getPrivPredicate(TPrivilegeType 
privType) {
                 return null;
         }
     }
+
+    public TGetTabletReplicasInfoResult 
getTabletReplicasInfo(TGetTabletReplicasInfoRequest request) {
+        String clientAddr = getClientAddrAsString();
+        LOG.info("receive get replicas request: {}, backend: {}", request, 
clientAddr);
+        TGetTabletReplicasInfoResult result = new 
TGetTabletReplicasInfoResult();
+        result.setStatus(new TStatus());
+        List<Long> tabletIds = request.getTabletIds();
+        Map<Long, List<TBackend>> tabletReplicasInfo = Maps.newHashMap();
+        for (Long tabletId : tabletIds) {
+            List<TBackend> backends = Lists.newArrayList();
+            List<Replica> replicas = 
Env.getCurrentEnv().getCurrentInvertedIndex()
+                    .getReplicasByTabletId(tabletId);
+            for (Replica replica : replicas) {
+                if (!replica.isNormal()) {
+                    LOG.info("replica {} not normal");
+                    continue;
+                }
+                Backend backend = 
Env.getCurrentEnv().getCurrentSystemInfo().getBackend(replica.getBackendId());
+                if (backend != null && !clientAddr.equals(backend.getIp())) {
+                    TBackend tback = new TBackend();

Review Comment:
   use a new struct TReplicaInfo instead



##########
be/src/olap/olap_server.cpp:
##########
@@ -532,6 +597,164 @@ void StorageEngine::_compaction_tasks_producer_callback() 
{
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
+void StorageEngine::_tablet_replicas_info_update_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start tablet replicas info update process!";
+    
+    int64_t interval = config::tablet_replicas_info_update_interval_seconds;
+    do {
+        if (config::enable_single_replica_compaction) {
+            auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
+                return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
+                    
!t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+            });
+            TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
+            if (master_info == nullptr) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            TNetworkAddress master_addr = master_info->network_address;
+            if (master_addr.hostname == "" || master_addr.port == 0) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            
+            int start = 0;
+            int tablet_size = all_tablets.size();
+            while (start < tablet_size) {
+                int batch_size = std::min(100, tablet_size - start);
+                int end = start + batch_size;
+                TGetTabletReplicasInfoRequest request;
+                TGetTabletReplicasInfoResult result;
+                for (int i = start; i < end; i++) {
+                    
request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
+                }
+                Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+                    master_addr.hostname, master_addr.port,
+                    [&request, &result](FrontendServiceConnection& client) {
+                        client->getTabletReplicasInfo(result, request);
+                    });
+
+                if (!rpc_st.ok()) {
+                    LOG(WARNING)<< "Failed to get tablet replicas info, 
encounter rpc failure, tablet start: "
+                                << start << " end: " << end;
+                    start = end;
+                    continue;
+                }
+
+                std::unordered_map<int64_t, TBackend> tablet_master;
+                for (const auto& it : result.tablet_replicas_info) {
+                    auto tablet_id = it.first;
+                    std::vector<TBackend> backends;
+                    for (const auto& backend : it.second) {
+                        backends.emplace_back(backend);
+                    }
+                    TBackend master;
+                    auto tablet = _tablet_manager->get_tablet(tablet_id);
+                    if (tablet == nullptr) {
+                        VLOG_CRITICAL << "tablet is nullptr";
+                        continue;
+                    }
+                    auto hasMaster = tablet->calc_master_info(backends, 
master);
+                    if (hasMaster) {
+                        tablet_master[tablet_id] = master;
+                    } 
+                }
+                VLOG_CRITICAL << "get tablet replicas info from fe, size is " 
<< end - start 
+                           << " token=" << result.token;
+
+                // update _tablet_replicas_info
+                {
+                    std::unique_lock<std::mutex> 
lock(_tablet_master_info_mutex);
+                    for(const auto& it : tablet_master) {
+                        VLOG_CRITICAL << "tablet " << it.first << " , master 
is " << it.second.host;
+                        _tablet_master_info[it.first] = it.second;

Review Comment:
   What's the difference between tablet_master and _tablet_master_info?



##########
be/src/olap/olap_server.cpp:
##########
@@ -714,11 +948,14 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
 
 Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
                                              CompactionType compaction_type) {
+

Review Comment:
   Delete the unnecessary blank lines.



##########
be/src/olap/olap_server.cpp:
##########
@@ -532,6 +597,164 @@ void StorageEngine::_compaction_tasks_producer_callback() 
{
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
+void StorageEngine::_tablet_replicas_info_update_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start tablet replicas info update process!";
+    
+    int64_t interval = config::tablet_replicas_info_update_interval_seconds;
+    do {
+        if (config::enable_single_replica_compaction) {
+            auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
+                return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
+                    
!t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+            });
+            TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
+            if (master_info == nullptr) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            TNetworkAddress master_addr = master_info->network_address;
+            if (master_addr.hostname == "" || master_addr.port == 0) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            
+            int start = 0;
+            int tablet_size = all_tablets.size();
+            while (start < tablet_size) {
+                int batch_size = std::min(100, tablet_size - start);
+                int end = start + batch_size;
+                TGetTabletReplicasInfoRequest request;
+                TGetTabletReplicasInfoResult result;
+                for (int i = start; i < end; i++) {
+                    
request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
+                }
+                Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+                    master_addr.hostname, master_addr.port,
+                    [&request, &result](FrontendServiceConnection& client) {
+                        client->getTabletReplicasInfo(result, request);
+                    });
+
+                if (!rpc_st.ok()) {
+                    LOG(WARNING)<< "Failed to get tablet replicas info, 
encounter rpc failure, tablet start: "
+                                << start << " end: " << end;
+                    start = end;
+                    continue;
+                }
+
+                std::unordered_map<int64_t, TBackend> tablet_master;
+                for (const auto& it : result.tablet_replicas_info) {
+                    auto tablet_id = it.first;
+                    std::vector<TBackend> backends;
+                    for (const auto& backend : it.second) {
+                        backends.emplace_back(backend);
+                    }
+                    TBackend master;
+                    auto tablet = _tablet_manager->get_tablet(tablet_id);
+                    if (tablet == nullptr) {
+                        VLOG_CRITICAL << "tablet is nullptr";
+                        continue;
+                    }
+                    auto hasMaster = tablet->calc_master_info(backends, 
master);
+                    if (hasMaster) {
+                        tablet_master[tablet_id] = master;
+                    } 
+                }
+                VLOG_CRITICAL << "get tablet replicas info from fe, size is " 
<< end - start 
+                           << " token=" << result.token;
+
+                // update _tablet_replicas_info
+                {
+                    std::unique_lock<std::mutex> 
lock(_tablet_master_info_mutex);
+                    for(const auto& it : tablet_master) {
+                        VLOG_CRITICAL << "tablet " << it.first << " , master 
is " << it.second.host;
+                        _tablet_master_info[it.first] = it.second;
+                    }
+                    _token = result.token;
+                }
+
+                start = end;
+            }
+            interval = config::tablet_replicas_info_update_interval_seconds;
+        } else {
+            interval = 15; // 15s to check enable_single_replica_compaction
+        }
+    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
+}
+
+Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr 
tablet) {
+    
+    bool already_exist = _push_tablet_into_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+    if (already_exist) {
+        return Status::AlreadyExist(
+                "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
+    }
+    
+    already_exist = _push_tablet_into_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+    if (already_exist) {
+        _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+        return Status::AlreadyExist(
+                "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
+    }
+    
+    VLOG_CRITICAL << "submit single replica compaction!";
+
+    Status st = tablet->prepare_single_replica_compaction(tablet);
+    
+    if(st.ok()) {
+        auto st = _single_replica_compaction_thread_pool->submit_func([tablet, 
this](){

Review Comment:
   Do not use the same name for different variables.



##########
be/src/olap/olap_server.cpp:
##########
@@ -532,6 +597,164 @@ void StorageEngine::_compaction_tasks_producer_callback() 
{
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
 }
 
+void StorageEngine::_tablet_replicas_info_update_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start tablet replicas info update process!";
+    
+    int64_t interval = config::tablet_replicas_info_update_interval_seconds;
+    do {
+        if (config::enable_single_replica_compaction) {
+            auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
+                return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
+                    
!t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+            });
+            TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
+            if (master_info == nullptr) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            TNetworkAddress master_addr = master_info->network_address;
+            if (master_addr.hostname == "" || master_addr.port == 0) {
+                LOG(WARNING)<< "Have not get FE Master heartbeat yet";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+                continue;
+            }
+            
+            int start = 0;
+            int tablet_size = all_tablets.size();
+            while (start < tablet_size) {
+                int batch_size = std::min(100, tablet_size - start);
+                int end = start + batch_size;
+                TGetTabletReplicasInfoRequest request;
+                TGetTabletReplicasInfoResult result;
+                for (int i = start; i < end; i++) {
+                    
request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
+                }
+                Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+                    master_addr.hostname, master_addr.port,
+                    [&request, &result](FrontendServiceConnection& client) {
+                        client->getTabletReplicasInfo(result, request);
+                    });
+
+                if (!rpc_st.ok()) {
+                    LOG(WARNING)<< "Failed to get tablet replicas info, 
encounter rpc failure, tablet start: "
+                                << start << " end: " << end;
+                    start = end;
+                    continue;
+                }
+
+                std::unordered_map<int64_t, TBackend> tablet_master;
+                for (const auto& it : result.tablet_replicas_info) {
+                    auto tablet_id = it.first;
+                    std::vector<TBackend> backends;
+                    for (const auto& backend : it.second) {
+                        backends.emplace_back(backend);
+                    }
+                    TBackend master;
+                    auto tablet = _tablet_manager->get_tablet(tablet_id);
+                    if (tablet == nullptr) {
+                        VLOG_CRITICAL << "tablet is nullptr";
+                        continue;
+                    }
+                    auto hasMaster = tablet->calc_master_info(backends, 
master);
+                    if (hasMaster) {
+                        tablet_master[tablet_id] = master;
+                    } 
+                }
+                VLOG_CRITICAL << "get tablet replicas info from fe, size is " 
<< end - start 
+                           << " token=" << result.token;
+
+                // update _tablet_replicas_info
+                {
+                    std::unique_lock<std::mutex> 
lock(_tablet_master_info_mutex);
+                    for(const auto& it : tablet_master) {
+                        VLOG_CRITICAL << "tablet " << it.first << " , master 
is " << it.second.host;
+                        _tablet_master_info[it.first] = it.second;
+                    }
+                    _token = result.token;
+                }
+
+                start = end;
+            }
+            interval = config::tablet_replicas_info_update_interval_seconds;
+        } else {
+            interval = 15; // 15s to check enable_single_replica_compaction
+        }
+    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
+}
+
+Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr 
tablet) {
+    
+    bool already_exist = _push_tablet_into_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+    if (already_exist) {
+        return Status::AlreadyExist(
+                "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
+    }
+    
+    already_exist = _push_tablet_into_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+    if (already_exist) {
+        _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+        return Status::AlreadyExist(
+                "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
+    }
+    
+    VLOG_CRITICAL << "submit single replica compaction!";
+
+    Status st = tablet->prepare_single_replica_compaction(tablet);
+    
+    if(st.ok()) {
+        auto st = _single_replica_compaction_thread_pool->submit_func([tablet, 
this](){
+            tablet->execute_single_replica_compaction();
+            tablet->reset_single_replica_compaction();
+            _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+            _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
+        });
+        if (!st.ok()) {
+            tablet->reset_single_replica_compaction();

Review Comment:
   Duplicate code should be avoided.



##########
be/src/olap/tablet.cpp:
##########
@@ -1270,6 +1342,71 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_cumulative_compac
     return candidate_rowsets;
 }
 
+std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_single_replica_compaction() {
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    {
+        std::shared_lock rlock(_meta_lock);
+        for (const auto& [version, rs] : _rs_version_map) {
+            if (rs->is_local()) {
+                candidate_rowsets.push_back(rs);
+            }
+        }
+    }
+    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
+    return candidate_rowsets;
+}
+
+Status Tablet::get_peer_versions(std::vector<Version>& peer_versions) {
+    TBackend addr;
+    std::string token;
+    if(!StorageEngine::instance()->get_tbackend(tablet_id(), addr, token)) {
+        LOG(INFO) << tablet_id() <<  " tablet don't have master peer";
+        return Status::Aborted("no master peer");;
+    }
+    
+    PGetTabletVersionsRequest request;
+    request.set_tablet_id(tablet_id());
+    PGetTabletVersionsResponse response;
+    std::shared_ptr<PBackendService_Stub> stub =
+            
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
+                                                                            
addr.brpc_port);
+
+    brpc::Controller cntl;
+    stub->get_tablet_versions(&cntl, &request, &response, nullptr);
+    if (cntl.Failed()) {
+        LOG(WARNING) << "open brpc connection to " << addr.host << " failed: " 
<< cntl.ErrorText();
+        return Status::InternalError("failed to open brpc");
+    }
+
+    if (response.version_status() == PVersionStatus::Version_NONE) {
+        VLOG_DEBUG << "can't get peer versions in peer replica";
+        return Status::InternalError("failed to get peer versions");
+    }
+    
+    for (int i = 0; i < response.versions_size() / 2; ++i) {
+        peer_versions.emplace_back(Version(response.versions(i * 2), 
response.versions(i * 2 + 1)));
+    }
+    return Status::OK();
+}
+
+bool Tablet::calc_master_info(std::vector<TBackend> &backends, TBackend 
&master) const {
+    int64_t cur_replica_id = replica_id();

Review Comment:
   What are the semantics of the current replica? And when is it set or updated?



##########
be/src/olap/olap_server.cpp:
##########
@@ -519,6 +563,27 @@ void StorageEngine::_compaction_tasks_producer_callback() {
             /// If it is not cleaned up, the reference count of the tablet 
will always be greater than 1,
             /// thus cannot be collected by the garbage collector. 
(TabletManager::start_trash_sweep)
             for (const auto& tablet : tablets_compaction) {
+                if (config::enable_single_replica_compaction) {

Review Comment:
   Put the duplicated logic in _submit_compaction_task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to