This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit eadd33a6094aead5de4d95010a8f654d0fe18018 Author: chenlinzhong <[email protected]> AuthorDate: Mon Jan 30 16:53:21 2023 +0800 [fix](vresultsink) BufferControlBlock may block all fragment handle threads (#16231) A BufferControlBlock may block all fragment handle threads, leaving the BE unable to do any work. The changes include: (1) the BufferControlBlock is cancelled after the max timeout; (2) StmtExecutor notifies the BE to cancel the fragment when an unexpected error occurs. For more details, see issue #16203. --- be/src/runtime/result_buffer_mgr.cpp | 10 +++++++++- be/src/runtime/result_buffer_mgr.h | 3 ++- be/src/runtime/result_file_sink.cpp | 2 +- be/src/runtime/result_sink.cpp | 2 +- be/src/runtime/runtime_state.h | 1 + be/src/vec/sink/vresult_file_sink.cpp | 3 ++- be/src/vec/sink/vresult_sink.cpp | 5 +++-- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 6 +++++- fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 ++++++ 9 files changed, 30 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index ce589dd745..b9d6cc7234 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -58,7 +58,8 @@ Status ResultBufferMgr::init() { } Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr<BufferControlBlock>* sender) { + std::shared_ptr<BufferControlBlock>* sender, + int query_timeout) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -70,6 +71,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size { std::lock_guard<std::mutex> l(_lock); _buffer_map.insert(std::make_pair(query_id, control_block)); + // BufferControlBlock should destroy after max_timeout + // for exceed max_timeout FE will return timeout to client + // otherwise in some case may block all fragment handle threads + // details see issue 
https://github.com/apache/doris/issues/16203 + // add extra 5s for avoid corner case + int64_t max_timeout = time(nullptr) + query_timeout + 5; + cancel_at_time(max_timeout, query_id); } *sender = control_block; return Status::OK(); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 26a07bd90c..cc16e771a0 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -47,7 +47,8 @@ public: // the returned sender do not need release // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr<BufferControlBlock>* sender); + std::shared_ptr<BufferControlBlock>* sender, + int query_timeout); // fetch data, used by RPC Status fetch_data(const TUniqueId& fragment_id, TFetchDataResult* result); diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp index cd3e61659a..e294def28f 100644 --- a/be/src/runtime/result_file_sink.cpp +++ b/be/src/runtime/result_file_sink.cpp @@ -100,7 +100,7 @@ Status ResultFileSink::prepare(RuntimeState* state) { if (_is_top_sink) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), _buf_size, &_sender)); + state->fragment_instance_id(), _buf_size, &_sender, state->query_timeout())); // create writer _writer.reset(new (std::nothrow) FileResultWriter( _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index de393e9e11..a19c44ea7c 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -67,7 +67,7 @@ Status ResultSink::prepare(RuntimeState* state) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(), - _buf_size, &_sender)); + _buf_size, &_sender, state->query_timeout())); // create writer based on sink type switch 
(_sink_type) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ca6dc42c4e..cdbfeff5fa 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -97,6 +97,7 @@ public: return _query_options.abort_on_default_limit_exceeded; } int max_errors() const { return _query_options.max_errors; } + int query_timeout() const { return _query_options.query_timeout; } int max_io_buffers() const { return _query_options.max_io_buffers; } int num_scanner_threads() const { return _query_options.num_scanner_threads; } TQueryType::type query_type() const { return _query_options.query_type; } diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 7bfc8b4c8a..948dfce4d0 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -103,7 +103,8 @@ Status VResultFileSink::prepare(RuntimeState* state) { if (_is_top_sink) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), _buf_size, &_sender)); + state->fragment_instance_id(), _buf_size, &_sender, + state->query_timeout())); // create writer _writer.reset(new (std::nothrow) VFileResultWriter( _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 77e64a8959..5d3fe0ec9a 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -61,8 +61,9 @@ Status VResultSink::prepare(RuntimeState* state) { RETURN_IF_ERROR(prepare_exprs(state)); // create sender - RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(), - _buf_size, &_sender)); + RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( + state->fragment_instance_id(), _buf_size, &_sender, + state->query_timeout())); // create writer based on sink type switch (_sink_type) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7d32530eb6..40b085505d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -940,6 +940,10 @@ public class Coordinator { // fragment, // if any, as well as all plan fragments on remote nodes. public void cancel() { + cancel(Types.PPlanFragmentCancelReason.USER_CANCEL); + } + + public void cancel(Types.PPlanFragmentCancelReason cancelReason) { lock(); try { if (!queryStatus.ok()) { @@ -949,7 +953,7 @@ public class Coordinator { queryStatus.setStatus(Status.CANCELLED); } LOG.warn("cancel execution of query, this is outside invoke"); - cancelInternal(Types.PPlanFragmentCancelReason.USER_CANCEL); + cancelInternal(cancelReason); } finally { unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 30c5e1dbe4..55e6832794 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -100,6 +100,7 @@ import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; @@ -1205,6 +1206,11 @@ public class StmtExecutor implements ProfileWriter { context.getState().setEof(); plannerProfile.setQueryFetchResultFinishTime(); } catch (Exception e) { + // notify all be cancel runing fragment + // in some case may block all fragment handle threads + // details see issue https://github.com/apache/doris/issues/16203 + LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), 
e.getMessage()); + coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); fetchResultSpan.recordException(e); throw e; } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
