xiaokang commented on code in PR #31743:
URL: https://github.com/apache/doris/pull/31743#discussion_r1511970268


##########
be/src/olap/rowset/beta_rowset.cpp:
##########
@@ -470,6 +470,27 @@ Status BetaRowset::add_to_binlog() {
         }
     }
 
+    // link inverted index files
+    for (const auto& column : tablet_schema()->columns()) {
+        if (tablet_schema()->has_inverted_index(column)) {
+            const auto* index_info = 
tablet_schema()->get_inverted_index(column);
+            auto index_id = index_info->index_id();
+            for (int i = 0; i < segments_num; ++i) {
+                auto index_file = 
InvertedIndexDescriptor::inverted_index_file_path(
+                        _rowset_dir, rowset_id(), i, index_id, 
index_info->get_index_suffix());
+                auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+                                          
std::filesystem::path(index_file).filename())
+                                                 .string();
+                VLOG_DEBUG << "link " << index_file << " to " << 
binlog_index_file;
+                if (!local_fs->link_file(index_file, binlog_index_file).ok()) {

Review Comment:
   Add GC for the already-linked files if some links fail, just like link_files_to() does.



##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -635,6 +636,44 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
             if (!res.ok()) {
                 break;
             }
+
+            // link inverted index files
+            RowsetMetaPB rowset_meta_pb;
+            if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+                auto err_msg = fmt::format("fail to parse binlog meta data 
value:{}",
+                                           rowset_binlog_meta.data());
+                res = Status::InternalError(err_msg);
+                LOG(WARNING) << err_msg;
+                return res;
+            }
+
+            const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+            TabletSchema tablet_schema;
+            tablet_schema.init_from_pb(tablet_schema_pb);
+
+            for (const auto& column : tablet_schema.columns()) {
+                if (tablet_schema.has_inverted_index(column)) {
+                    const auto* index_info = 
tablet_schema.get_inverted_index(column);
+                    auto index_id = index_info->index_id();
+                    for (int i = 0; i < num_segments; ++i) {
+                        auto index_file =
+                                
ref_tablet->get_segment_index_filepath(rowset_id, i, index_id);
+                        auto snapshot_segment_index_file_path =
+                                fmt::format("{}/{}_{}_{}.binlog-index", 
schema_full_path, rowset_id,
+                                            i, index_id);
+
+                        VLOG_DEBUG << "link " << index_file << " to "
+                                   << snapshot_segment_index_file_path;
+                        res = io::global_local_filesystem()->link_file(

Review Comment:
   Why are there two link-file operations for CCR?



##########
be/src/olap/tablet.cpp:
##########
@@ -2657,6 +2657,19 @@ std::string 
Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
     return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, 
segment_index);
 }
 
+std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
+                                               std::string_view segment_index,
+                                               std::string_view index_index) 
const {
+    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, 
segment_index,

Review Comment:
   Do not forget to change this after the index file format v2 refactor.



##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -635,6 +636,44 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
             if (!res.ok()) {
                 break;
             }
+
+            // link inverted index files
+            RowsetMetaPB rowset_meta_pb;
+            if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+                auto err_msg = fmt::format("fail to parse binlog meta data 
value:{}",
+                                           rowset_binlog_meta.data());
+                res = Status::InternalError(err_msg);
+                LOG(WARNING) << err_msg;
+                return res;
+            }
+
+            const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+            TabletSchema tablet_schema;
+            tablet_schema.init_from_pb(tablet_schema_pb);
+
+            for (const auto& column : tablet_schema.columns()) {
+                if (tablet_schema.has_inverted_index(column)) {
+                    const auto* index_info = 
tablet_schema.get_inverted_index(column);
+                    auto index_id = index_info->index_id();
+                    for (int i = 0; i < num_segments; ++i) {
+                        auto index_file =
+                                
ref_tablet->get_segment_index_filepath(rowset_id, i, index_id);
+                        auto snapshot_segment_index_file_path =
+                                fmt::format("{}/{}_{}_{}.binlog-index", 
schema_full_path, rowset_id,
+                                            i, index_id);
+
+                        VLOG_DEBUG << "link " << index_file << " to "
+                                   << snapshot_segment_index_file_path;
+                        res = io::global_local_filesystem()->link_file(

Review Comment:
   GC the already-linked files on failure.



##########
be/src/olap/tablet.cpp:
##########
@@ -2657,6 +2657,19 @@ std::string 
Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
     return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, 
segment_index);
 }
 
+std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
+                                               std::string_view segment_index,
+                                               std::string_view index_index) 
const {

Review Comment:
   Do you mean index_id by index_index?



##########
be/src/service/backend_service.cpp:
##########
@@ -286,8 +286,121 @@ void _ingest_binlog(StorageEngine& engine, 
IngestBinlogArg* arg) {
         }
     }
 
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
+    // Step 6: get all segment index files
+    // Step 6.1: get all segment index files size
+    std::vector<std::string> segment_index_file_urls;
+    std::vector<uint64_t> segment_index_file_sizes;
+    std::vector<std::string> segment_index_file_names;
+    auto tablet_schema = rowset_meta->tablet_schema();
+    for (const auto& column : tablet_schema->columns()) {
+        if (tablet_schema->has_inverted_index(column)) {
+            const auto* index_info = tablet_schema->get_inverted_index(column);
+            auto index_id = index_info->index_id();
+            for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+                auto get_segment_index_file_size_url = fmt::format(
+                        
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_"
+                        "index={}",
+                        binlog_api_url, "get_segment_index_file", 
request.remote_tablet_id,
+                        remote_rowset_id, segment_index, index_id);
+                uint64_t segment_index_file_size;
+                auto get_segment_index_file_size_cb =
+                        [&get_segment_index_file_size_url,
+                         &segment_index_file_size](HttpClient* client) {
+                            
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                            client->set_timeout_ms(kMaxTimeoutMs);
+                            RETURN_IF_ERROR(client->head());
+                            return 
client->get_content_length(&segment_index_file_size);
+                        };
+                auto index_file = 
InvertedIndexDescriptor::inverted_index_file_path(
+                        local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index,
+                        index_id, index_info->get_index_suffix());
+                segment_index_file_names.push_back(index_file);
+
+                status = HttpClient::execute_with_retry(max_retry, 1,
+                                                        
get_segment_index_file_size_cb);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get segment file size from "
+                                 << get_segment_index_file_size_url
+                                 << ", status=" << status.to_string();
+                    status.to_thrift(&tstatus);
+                    return;
+                }
+
+                segment_index_file_sizes.push_back(segment_index_file_size);
+                
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+            }
+        }
+    }
+
+    // Step 6.2: check data capacity
+    uint64_t total_index_size =
+            std::accumulate(segment_index_file_sizes.begin(), 
segment_index_file_sizes.end(),
+                            0); // NOLINT(bugprone-fold-init-type)
+    if (!local_tablet->can_add_binlog(total_index_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, 
total_index_size="
+                     << total_index_size << ", tablet=" << 
local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 6.3: get all segment index files
+    LOG(INFO) << "segment_index_file_urls.size=" << 
segment_index_file_urls.size() << ", "
+              << "segment_index_file_names.size=" << 
segment_index_file_names.size() << ", "
+              << "segment_index_file_sizes.size=" << 
segment_index_file_sizes.size();
+    DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
+    DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
+    for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
+        auto segment_index_file_size = segment_index_file_sizes[i];
+        auto get_segment_index_file_url = segment_index_file_urls[i];
+
+        uint64_t estimate_timeout =
+                segment_index_file_size / 
config::download_low_speed_limit_kbps / 1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        auto local_segment_index_path = segment_index_file_names[i];
+        LOG(INFO) << fmt::format("download segment index file from {} to {}",
+                                 get_segment_index_file_url, 
local_segment_index_path);
+        auto get_segment_index_file_cb = [&get_segment_index_file_url, 
&local_segment_index_path,
+                                          segment_index_file_size,
+                                          estimate_timeout](HttpClient* 
client) {
+            RETURN_IF_ERROR(client->init(get_segment_index_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);
+            RETURN_IF_ERROR(client->download(local_segment_index_path));
+
+            std::error_code ec;
+            // Check file length
+            uint64_t local_index_file_size =
+                    std::filesystem::file_size(local_segment_index_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download index file error" << ec.message();
+                return Status::IOError("can't retrive file_size of {}, due to 
{}",
+                                       local_segment_index_path, ec.message());
+            }
+            if (local_index_file_size != segment_index_file_size) {
+                LOG(WARNING) << "download index file length error"
+                             << ", get_segment_index_file_url=" << 
get_segment_index_file_url
+                             << ", index_file_size=" << segment_index_file_size
+                             << ", local_index_file_size=" << 
local_index_file_size;
+                return Status::InternalError("downloaded index file size is 
not equal");
+            }
+            return 
io::global_local_filesystem()->permission(local_segment_index_path,
+                                                             
io::LocalFileSystem::PERMS_OWNER_RW);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_index_file_cb);

Review Comment:
   Is there GC logic to clean up the successfully downloaded files on failure?



##########
be/src/olap/rowset/beta_rowset.cpp:
##########
@@ -470,6 +470,27 @@ Status BetaRowset::add_to_binlog() {
         }
     }
 
+    // link inverted index files
+    for (const auto& column : tablet_schema()->columns()) {
+        if (tablet_schema()->has_inverted_index(column)) {
+            const auto* index_info = 
tablet_schema()->get_inverted_index(column);

Review Comment:
   Use the consistent name index_meta.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to