This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 633f2d52a4 [minor](log) add some logs (#17287)
633f2d52a4 is described below
commit 633f2d52a41a60c6d2c706a0684fa3bb3a4e8bec
Author: Gabriel <[email protected]>
AuthorDate: Wed Mar 1 22:41:50 2023 +0800
[minor](log) add some logs (#17287)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 124 +++++++++++++++++---------
be/src/vec/sink/vtablet_sink.cpp | 10 ++-
2 files changed, 87 insertions(+), 47 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index aa3948e0a1..da536d66ec 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -197,53 +197,91 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}
-#define DO_RPC(QUEUE, BLOCK, HOLDER)
\
- auto& request = QUEUE.front();
\
- if (!_instance_to_request[id]) {
\
- _construct_request(id);
\
- }
\
- auto brpc_request = _instance_to_request[id];
\
- brpc_request->set_eos(request.eos);
\
- brpc_request->set_packet_seq(_instance_to_seq[id]++);
\
- if (request.BLOCK) {
\
- brpc_request->set_allocated_block(request.BLOCK);
\
- }
\
- auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id,
request.eos, HOLDER); \
- _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
\
- _closure->addFailedHandler(
\
- [&](const InstanceLoId& id, const std::string& err) { _failed(id,
err); }); \
- _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
\
- const PTransmitDataResult& result) {
\
- Status s = Status(result.status());
\
- if (!s.ok()) {
\
- _failed(id,
\
- fmt::format("exchange req success but status isn't ok:
{}", s.to_string())); \
- } else if (eos) {
\
- _ended(id);
\
- } else {
\
- _send_rpc(id);
\
- }
\
- });
\
- {
\
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
\
- if (enable_http_send_block(*brpc_request)) {
\
- RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(),
_closure, \
- *brpc_request,
request.channel->_brpc_dest_addr)); \
- } else {
\
- transmit_block(*request.channel->_brpc_stub, _closure,
*brpc_request); \
- }
\
- }
\
- if (request.BLOCK) {
\
- brpc_request->release_block();
\
- }
\
- QUEUE.pop();
-
if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
- DO_RPC(q, block.get(), nullptr)
+ auto& request = q.front();
+ if (!_instance_to_request[id]) {
+ _construct_request(id);
+ }
+ auto brpc_request = _instance_to_request[id];
+ brpc_request->set_eos(request.eos);
+ brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ if (request.block) {
+ brpc_request->set_allocated_block(request.block.get());
+ }
+ auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id,
request.eos, nullptr);
+ _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+ _closure->addFailedHandler(
+ [&](const InstanceLoId& id, const std::string& err) {
_failed(id, err); });
+ _closure->addSuccessHandler([&](const InstanceLoId& id, const bool&
eos,
+ const PTransmitDataResult& result) {
+ Status s = Status(result.status());
+ if (!s.ok()) {
+ _failed(id,
+ fmt::format("exchange req success but status isn't ok:
{}", s.to_string()));
+ } else if (eos) {
+ _ended(id);
+ } else {
+ _send_rpc(id);
+ }
+ });
+ {
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+ if (enable_http_send_block(*brpc_request)) {
+
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
+ *brpc_request,
+
request.channel->_brpc_dest_addr));
+ } else {
+ transmit_block(*request.channel->_brpc_stub, _closure,
*brpc_request);
+ }
+ }
+ if (request.block) {
+ brpc_request->release_block();
+ }
+ q.pop();
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
- DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder)
+ auto& request = broadcast_q.front();
+ if (!_instance_to_request[id]) {
+ _construct_request(id);
+ }
+ auto brpc_request = _instance_to_request[id];
+ brpc_request->set_eos(request.eos);
+ brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ if (request.block_holder->get_block()) {
+
brpc_request->set_allocated_block(request.block_holder->get_block());
+ }
+ auto* _closure =
+ new SelfDeleteClosure<PTransmitDataResult>(id, request.eos,
request.block_holder);
+ _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+ _closure->addFailedHandler(
+ [&](const InstanceLoId& id, const std::string& err) {
_failed(id, err); });
+ _closure->addSuccessHandler([&](const InstanceLoId& id, const bool&
eos,
+ const PTransmitDataResult& result) {
+ Status s = Status(result.status());
+ if (!s.ok()) {
+ _failed(id,
+ fmt::format("exchange req success but status isn't ok:
{}", s.to_string()));
+ } else if (eos) {
+ _ended(id);
+ } else {
+ _send_rpc(id);
+ }
+ });
+ {
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+ if (enable_http_send_block(*brpc_request)) {
+
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
+ *brpc_request,
+
request.channel->_brpc_dest_addr));
+ } else {
+ transmit_block(*request.channel->_brpc_stub, _closure,
*brpc_request);
+ }
+ }
+ if (request.block_holder->get_block()) {
+ brpc_request->release_block();
+ }
+ broadcast_q.pop();
} else {
_instance_to_sending_by_pipeline[id] = true;
return Status::OK();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index bd35929f85..ac58b4f3e7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1384,13 +1384,15 @@ Status VOlapTableSink::_validate_column(RuntimeState* state, const TypeDescripto
invalid = true;
}
}
-
- if (dec_val > _get_decimalv2_min_or_max<false>(type) ||
- dec_val < _get_decimalv2_min_or_max<true>(type)) {
+ const auto& max_decimalv2 =
_get_decimalv2_min_or_max<false>(type);
+ const auto& min_decimalv2 =
_get_decimalv2_min_or_max<true>(type);
+ if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
fmt::format_to(error_msg, "{}", "decimal value is not
valid for definition");
fmt::format_to(error_msg, ", value={}",
dec_val.to_string());
- fmt::format_to(error_msg, ", precision={}, scale={}; ",
type.precision,
+ fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision,
type.scale);
+ fmt::format_to(error_msg, ", min={}, max={}; ",
min_decimalv2.to_string(),
+ max_decimalv2.to_string());
invalid = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]