yiguolei commented on code in PR #64866:
URL: https://github.com/apache/doris/pull/64866#discussion_r3480760912


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -91,57 +91,53 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter(
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
-    LocalMergeContext* context;
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        context = &_local_merge_map[key]; // may inplace construct default object
+    std::lock_guard<std::mutex> l(_lock);
+    auto& context = _local_merge_map[key];
+    if (!context || producer->stage() > context->stage) {
+        auto new_context = std::make_shared<LocalMergeContext>();
+        RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger));
+        new_context->stage = producer->stage();
+        context = new_context;
     }
 
-    RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer));
-    return Status::OK();
-}
-
-Status LocalMergeContext::register_producer(const QueryContext* query_ctx,
-                                            const TRuntimeFilterDesc* desc,
-                                            std::shared_ptr<RuntimeFilterProducer> producer) {
-    std::lock_guard<std::mutex> l(mtx);
-    if (producer->stage() > stage) {
-        // New recursive CTE round: discard stale merger and producers from
-        // the previous round and recreate the merger for the new round.
-        merger.reset();
-        producers.clear();
-        stage = producer->stage();
-    }
-    if (!merger) {
-        RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
-    }
-    producers.emplace_back(producer);
-    merger->set_expected_producer_num(cast_set<int>(producers.size()));
+    context->producers.emplace_back(producer);
+    context->merger->set_expected_producer_num(cast_set<int>(context->producers.size()));
     // Sync the local merger's stage from the producer so that outgoing merge RPCs
     // (via _push_to_remote) carry the correct recursive CTE round number.
-    merger->set_stage(producer->stage());
+    context->merger->set_stage(producer->stage());
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
-                                                          LocalMergeContext** local_merge_filters) {
+std::string LocalMergeContext::debug_string() {
+    std::string result =

Review Comment:
   这里需要加 lock,否则 context->producers.emplace_back(producer); 这个方法就会直接挂掉。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to