freemandealer commented on code in PR #18874:
URL: https://github.com/apache/doris/pull/18874#discussion_r1174514457


##########
be/src/common/config.h:
##########
@@ -414,8 +414,12 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 // the timeout of a rpc to open the tablet writer in remote BE.
 // short operation time, can set a short timeout
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
+// the timeout of a rpc to open the partition in remote BE.
+// short operation time, can set a short timeout
+CONF_Int32(partition_open_rpc_timeout_sec, "60");

Review Comment:
   Please add a flag in config.h to enable/disable this feature for 
compatibility reasons when users are upgrading.



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1014,6 +1043,31 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return Status::OK();
 }
 
+void VOlapTableSink::_open_partition(const VOlapTablePartition* partition, 
uint32_t tablet_index) {

Review Comment:
   we don't need `tablet_index` passed to this function.
   Tablets should be deduced using `partition` only.



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -424,6 +425,34 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
+void VNodeChannel::open_partition(int64_t partition_id) {
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    PartitionOpenRequest request;
+    request.set_allocated_id(&_parent->_load_id);
+    request.set_index_id(_index_channel->_index_id);
+    for (auto& tablet : _all_tablets) {
+        if (partition_id == tablet.partition_id) {
+            auto ptablet = request.add_tablets();
+            ptablet->set_partition_id(tablet.partition_id);
+            ptablet->set_tablet_id(tablet.tablet_id);
+        }
+    }
+
+    RefCountClosure<PartitionOpenResult>* partition_open_closure =
+            new RefCountClosure<PartitionOpenResult>();
+    partition_open_closure->ref();
+
+    // This ref is for RPC's reference
+    partition_open_closure->ref();
+    
partition_open_closure->cntl.set_timeout_ms(config::partition_open_rpc_timeout_sec
 * 1000);
+    if (config::partition_open_ignore_eovercrowded) {
+        partition_open_closure->cntl.ignore_eovercrowded();
+    }
+    // Lazy open delter writer
+    _stub->partition_open(&partition_open_closure->cntl, &request, 
&partition_open_closure->result,
+                          partition_open_closure);

Review Comment:
   We should check the RPC status and retry on failure. If it still fails 
after 10 retries, cancel the load job.



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -424,6 +425,34 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
+void VNodeChannel::open_partition(int64_t partition_id) {
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    PartitionOpenRequest request;
+    request.set_allocated_id(&_parent->_load_id);
+    request.set_index_id(_index_channel->_index_id);
+    for (auto& tablet : _all_tablets) {
+        if (partition_id == tablet.partition_id) {
+            auto ptablet = request.add_tablets();
+            ptablet->set_partition_id(tablet.partition_id);
+            ptablet->set_tablet_id(tablet.tablet_id);
+        }
+    }
+
+    RefCountClosure<PartitionOpenResult>* partition_open_closure =
+            new RefCountClosure<PartitionOpenResult>();
+    partition_open_closure->ref();
+
+    // This ref is for RPC's reference
+    partition_open_closure->ref();
+    
partition_open_closure->cntl.set_timeout_ms(config::partition_open_rpc_timeout_sec
 * 1000);
+    if (config::partition_open_ignore_eovercrowded) {
+        partition_open_closure->cntl.ignore_eovercrowded();
+    }
+    // Lazy open delter writer

Review Comment:
   Remove this comment, because we don't want to expose Delta Writer details 
to the Sink.



##########
be/src/runtime/load_channel.cpp:
##########
@@ -70,6 +70,32 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
     return Status::OK();
 }
 
+Status LoadChannel::open_partition(const PartitionOpenRequest& params) {
+    int64_t index_id = params.index_id();
+    std::shared_ptr<TabletsChannel> channel;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _tablets_channels.find(index_id);
+        if (it != _tablets_channels.end()) {
+            channel = it->second;
+        } else {
+            // create a new tablets channel
+            TabletsChannelKey key(params.id(), index_id);
+            channel.reset(new TabletsChannel(key, _load_id, 
_is_high_priority));
+            {
+                std::lock_guard<SpinLock> l(_tablets_channels_lock);
+                _tablets_channels.insert({index_id, channel});
+            }
+        }
+    }
+
+    RETURN_IF_ERROR(channel->open_all_writers(params));
+
+    _opened = true;
+    _last_updated_time.store(time(nullptr));

Review Comment:
   I don't think it is necessary to update `_last_updated_time` here.



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1183,6 +1237,8 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
         }
         // each row
         _generate_row_distribution_payload(channel_to_payload, partition, 
tablet_index, i, 1);
+        // open partition
+        _open_partition(partition, tablet_index);

Review Comment:
   Why do we need `tablet_index` to open a partition?



##########
be/src/runtime/tablets_channel.h:
##########
@@ -65,6 +65,9 @@ class TabletsChannel {
 
     Status open(const PTabletWriterOpenRequest& request);
 
+    // open all writer
+    Status open_all_writers(const PartitionOpenRequest& request);

Review Comment:
   The name is misleading; maybe rename it to `open_all_writers_for_partition`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to