dataroaring commented on code in PR #18874:
URL: https://github.com/apache/doris/pull/18874#discussion_r1184751344


##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1062,6 +1135,41 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return Status::OK();
 }
 
+void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
+    const auto& id = partition->id;
+    auto it = _partition_opened.find(id);
+    if (it == _partition_opened.end()) {
+        {
+            std::unique_lock<std::mutex> l(_partition_opened_mutex);
+            auto it = _partition_opened.find(id);
+            if (it != _partition_opened.end()) {
+                return;
+            }
+            _partition_opened.insert(std::pair(id, false));
+        }
+        for (int j = 0; j < partition->indexes.size(); ++j) {
+            for (const auto& tid : partition->indexes[j].tablets) {
+                auto it = _channels[j]->_channels_by_tablet.find(tid);
+                for (const auto& channel : it->second) {
+                    auto open_partition_closure = 
channel->open_partition(partition->id);
+                    auto st = 
channel->open_partition_wait(open_partition_closure);
+                    if (!st.ok()) {
+                        _channels[j]->mark_as_failed(
+                                channel->node_id(), channel->host(),
+                                fmt::format("{}, open failed, err: {}", 
channel->channel_info(),
+                                            st.to_string()),
+                                -1);
+                    }
+                }
+            }
+        }
+        {
+            std::unique_lock<std::mutex> l(_partition_opened_mutex);

Review Comment:
   The sink is only ever called from a single thread, so the locking around `_partition_opened` looks unnecessary.



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1062,6 +1135,41 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return Status::OK();
 }
 
+void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
+    const auto& id = partition->id;
+    auto it = _partition_opened.find(id);
+    if (it == _partition_opened.end()) {
+        {
+            std::unique_lock<std::mutex> l(_partition_opened_mutex);
+            auto it = _partition_opened.find(id);
+            if (it != _partition_opened.end()) {
+                return;
+            }
+            _partition_opened.insert(std::pair(id, false));
+        }
+        for (int j = 0; j < partition->indexes.size(); ++j) {
+            for (const auto& tid : partition->indexes[j].tablets) {
+                auto it = _channels[j]->_channels_by_tablet.find(tid);
+                for (const auto& channel : it->second) {
+                    auto open_partition_closure = 
channel->open_partition(partition->id);
+                    auto st = 
channel->open_partition_wait(open_partition_closure);
+                    if (!st.ok()) {
+                        _channels[j]->mark_as_failed(
+                                channel->node_id(), channel->host(),
+                                fmt::format("{}, open failed, err: {}", 
channel->channel_info(),
+                                            st.to_string()),
+                                -1);

Review Comment:
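   We should issue all of the `open_partition` requests first and then wait on them together, instead of waiting on each channel one at a time, to reduce the total open time.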



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to