This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef9559dbe63 [Chore](compile) add some compile_check_begin (#42333)
ef9559dbe63 is described below
commit ef9559dbe63cb1c3ae7f5a6b579125b3b5a6bdaf
Author: Pxl <[email protected]>
AuthorDate: Mon Oct 28 11:15:13 2024 +0800
[Chore](compile) add some compile_check_begin (#42333)
## Proposed changes
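add some compile_check_begin: include "common/compile_check_begin.h" (with the matching "common/compile_check_end.h" in headers) in more files under be/src/pipeline, and make the integer and floating-point conversions in the touched code explicit (int -> size_t / uint32_t, cast_set<>(), double(...)).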
---
be/src/pipeline/dependency.cpp | 6 ++---
be/src/pipeline/dependency.h | 10 ++++----
be/src/pipeline/exec/exchange_sink_operator.cpp | 6 ++---
be/src/pipeline/exec/exchange_sink_operator.h | 8 +++---
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 19 +++++++-------
.../pipeline/exec/join/process_hash_table_probe.h | 10 ++++----
.../exec/join/process_hash_table_probe_impl.h | 30 ++++++++++++----------
be/src/pipeline/exec/set_sink_operator.cpp | 9 ++++---
be/src/pipeline/exec/set_sink_operator.h | 4 ++-
be/src/pipeline/exec/set_source_operator.cpp | 4 +--
be/src/pipeline/exec/set_source_operator.h | 6 ++---
be/src/pipeline/local_exchange/local_exchanger.cpp | 19 +++++++-------
be/src/pipeline/local_exchange/local_exchanger.h | 3 ++-
be/src/pipeline/pipeline.h | 10 +++++---
be/src/pipeline/pipeline_fragment_context.cpp | 24 +++++++++++------
be/src/pipeline/task_queue.cpp | 4 +--
be/src/pipeline/task_queue.h | 9 ++++---
be/src/pipeline/task_scheduler.cpp | 6 ++---
be/src/pipeline/task_scheduler.h | 2 +-
.../vec/common/hash_table/hash_table_set_build.h | 4 +--
be/src/vec/common/hash_table/join_hash_table.h | 14 +++++-----
be/src/vec/sink/vdata_stream_sender.cpp | 6 ++---
be/src/vec/sink/vdata_stream_sender.h | 6 ++---
23 files changed, 120 insertions(+), 99 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 1d450d164a1..8d82c340e2d 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -32,7 +32,7 @@
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
Dependency* BasicSharedState::create_source_dependency(int operator_id, int
node_id,
std::string name) {
source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id,
name + "_DEPENDENCY"));
@@ -267,8 +267,8 @@ bool AggSharedState::do_limit_filter(vectorized::Block*
block, size_t num_rows,
need_computes.data());
}
- auto set_computes_arr = [](auto* __restrict res, auto* __restrict
computes, int rows) {
- for (int i = 0; i < rows; ++i) {
+ auto set_computes_arr = [](auto* __restrict res, auto* __restrict
computes, size_t rows) {
+ for (size_t i = 0; i < rows; ++i) {
computes[i] = computes[i] == res[i];
}
};
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 8060ee8362d..a035d57a837 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -46,7 +46,7 @@ class VSlotRef;
} // namespace doris::vectorized
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
class Dependency;
class PipelineTask;
struct BasicSharedState;
@@ -504,7 +504,7 @@ struct SpillSortSharedState : public BasicSharedState,
~SpillSortSharedState() override = default;
// This number specifies the maximum size of sub blocks
- static constexpr int SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+ static constexpr size_t SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
void update_spill_block_batch_row_count(const vectorized::Block* block) {
auto rows = block->rows();
if (rows > 0 && 0 == avg_row_bytes) {
@@ -525,7 +525,7 @@ struct SpillSortSharedState : public BasicSharedState,
std::deque<vectorized::SpillStreamSPtr> sorted_streams;
size_t avg_row_bytes = 0;
- int spill_block_batch_row_count;
+ size_t spill_block_batch_row_count;
};
struct UnionSharedState : public BasicSharedState {
@@ -677,7 +677,7 @@ public:
std::vector<vectorized::VExprContextSPtrs> child_exprs_lists;
/// init in build side
- int child_quantity;
+ size_t child_quantity;
vectorized::VExprContextSPtrs build_child_exprs;
std::vector<Dependency*> probe_finished_children_dependency;
@@ -867,5 +867,5 @@ private:
std::vector<std::atomic_int64_t> _queues_mem_usage;
const int64_t _each_queue_limit;
};
-
+#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 3f12b4458cd..a3b6f8da7e9 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -38,7 +38,7 @@
#include "vec/exprs/vexpr.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
Status ExchangeSinkLocalState::serialize_block(vectorized::Block* src, PBlock*
dest,
int num_receivers) {
return _parent->cast<ExchangeSinkOperatorX>().serialize_block(*this, src,
dest, num_receivers);
@@ -661,7 +661,7 @@ void
ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buf
Status ExchangeSinkOperatorX::channel_add_rows(
RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
- int num_channels, const uint32_t* __restrict channel_ids, int rows,
+ size_t num_channels, const uint32_t* __restrict channel_ids, size_t
rows,
vectorized::Block* block, bool eos) {
std::vector<std::vector<uint32_t>> channel2rows;
channel2rows.resize(num_channels);
@@ -676,7 +676,7 @@ Status ExchangeSinkOperatorX::channel_add_rows(
Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
- int num_channels, std::vector<std::vector<uint32_t>>& channel2rows,
+ size_t num_channels, std::vector<std::vector<uint32_t>>& channel2rows,
vectorized::Block* block, bool eos) {
Status status = Status::OK();
for (int i = 0; i < num_channels; ++i) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 6b936d4b12c..141693eb820 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -178,7 +178,7 @@ private:
*/
std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
- int _partition_count;
+ size_t _partition_count;
std::shared_ptr<Dependency> _finish_dependency;
@@ -234,12 +234,12 @@ private:
Status channel_add_rows(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>&
channels,
- int num_channels, const uint32_t* channel_ids, int
rows,
- vectorized::Block* block, bool eos);
+ size_t num_channels, const uint32_t* __restrict
channel_ids,
+ size_t rows, vectorized::Block* block, bool eos);
Status channel_add_rows_with_idx(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
- int num_channels,
+ size_t num_channels,
std::vector<std::vector<uint32_t>>&
channel2rows,
vectorized::Block* block, bool eos);
RuntimeState* _state = nullptr;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 8ee041f5759..bb869ee3257 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -19,6 +19,7 @@
#include <string>
+#include "common/cast_set.h"
#include "common/logging.h"
#include "pipeline/exec/operator.h"
#include "runtime/descriptors.h"
@@ -295,15 +296,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
if constexpr (!std::is_same_v<HashTableProbeType,
std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- st = process_hashtable_ctx
- .template
process<need_null_map_for_probe, ignore_null>(
- arg,
- need_null_map_for_probe
- ?
&local_state._null_map_column->get_data()
- : nullptr,
- mutable_join_block,
&temp_block,
-
local_state._probe_block.rows(), _is_mark_join,
- _have_other_join_conjunct);
+ st = process_hashtable_ctx.template
process<need_null_map_for_probe,
+
ignore_null>(
+ arg,
+ need_null_map_for_probe
+ ?
&local_state._null_map_column->get_data()
+ : nullptr,
+ mutable_join_block, &temp_block,
+
cast_set<uint32_t>(local_state._probe_block.rows()),
+ _is_mark_join, _have_other_join_conjunct);
} else {
st = Status::InternalError("uninited hash table");
}
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index bf4a4d5763c..692b91f6a01 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -58,7 +58,7 @@ struct ProcessHashTableProbe {
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block, vectorized::Block*
output_block,
- size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct);
+ uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct);
// Only process the join with no other join conjunct, because of no other
join conjunt
// the output block struct is same with mutable block. we can do more opt
on it and simplify
@@ -68,7 +68,7 @@ struct ProcessHashTableProbe {
bool with_other_conjuncts, bool is_mark_join>
Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block,
- size_t probe_rows);
+ uint32_t probe_rows);
// In the presence of other join conjunct, the process of join become more
complicated.
// each matching join column need to be processed by other join conjunct.
so the struct of mutable block
// and output block may be different
@@ -93,7 +93,7 @@ struct ProcessHashTableProbe {
/// For null aware join with other conjuncts, if the probe key of one row
on left side is null,
/// we should make this row match with all rows in build side.
- size_t _process_probe_null_key(uint32_t probe_idx);
+ uint32_t _process_probe_null_key(uint32_t probe_idx);
pipeline::HashJoinProbeLocalState* _parent = nullptr;
const int _batch_size;
@@ -138,8 +138,8 @@ struct ProcessHashTableProbe {
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;
- int _right_col_idx;
- int _right_col_len;
+ size_t _right_col_idx;
+ size_t _right_col_len;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 667c7a468d7..7fc639b47a4 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -19,6 +19,7 @@
#include <gen_cpp/PlanNodes_types.h>
+#include "common/cast_set.h"
#include "common/status.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "process_hash_table_probe.h"
@@ -29,7 +30,7 @@
#include "vec/exprs/vexpr_context.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
template <int JoinOpType>
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
parent,
int batch_size)
@@ -192,7 +193,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
vectorized::ConstNullMapPtr null_map,
vectorized::MutableBlock&
mutable_block,
vectorized::Block*
output_block,
- size_t probe_rows) {
+ uint32_t probe_rows) {
if (_right_col_len && !_build_block) {
return Status::InternalError("build block is nullptr");
}
@@ -216,7 +217,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
auto& mcol = mutable_block.mutable_columns();
const bool has_mark_join_conjunct = !_parent->_mark_join_conjuncts.empty();
- int current_offset = 0;
+ uint32_t current_offset = 0;
if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
with_other_conjuncts) {
@@ -259,8 +260,9 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
with_other_conjuncts, is_mark_join,
need_null_map_for_probe &&
ignore_null > (hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(),
- probe_index, build_index, probe_rows,
_probe_indexs.data(),
- _probe_visited, _build_indexs.data(),
has_mark_join_conjunct);
+ probe_index, build_index,
cast_set<int32_t>(probe_rows),
+ _probe_indexs.data(), _probe_visited,
_build_indexs.data(),
+ has_mark_join_conjunct);
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
@@ -304,12 +306,12 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
}
template <int JoinOpType>
-size_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t
probe_index) {
+uint32_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t
probe_index) {
const auto rows = _build_block->rows();
DCHECK_LT(_build_index_for_null_probe_key, rows);
DCHECK_LT(0, _build_index_for_null_probe_key);
- size_t matched_cnt = 0;
+ uint32_t matched_cnt = 0;
for (; _build_index_for_null_probe_key < rows && matched_cnt <
_batch_size; ++matched_cnt) {
_probe_indexs[matched_cnt] = probe_index;
_build_indexs[matched_cnt] = _build_index_for_null_probe_key++;
@@ -503,7 +505,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
}
SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
- int orig_columns = output_block->columns();
+ size_t orig_columns = output_block->columns();
vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
@@ -678,7 +680,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
vectorized::ConstNullMapPtr
null_map,
vectorized::MutableBlock&
mutable_block,
vectorized::Block*
output_block,
- size_t probe_rows, bool
is_mark_join,
+ uint32_t probe_rows, bool
is_mark_join,
bool
have_other_join_conjunct) {
Status res;
std::visit(
@@ -705,22 +707,22 @@ struct ExtractType<T(U)> {
ProcessHashTableProbe<JoinOpType>::process<false, false,
ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
- size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
template Status
\
ProcessHashTableProbe<JoinOpType>::process<false, true,
ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
- size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
template Status
\
ProcessHashTableProbe<JoinOpType>::process<true, false,
ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
- size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
template Status
\
ProcessHashTableProbe<JoinOpType>::process<true, true,
ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
- size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
\
template Status
\
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
\
@@ -746,5 +748,5 @@ struct ExtractType<T(U)> {
INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>)); \
INSTANTIATION(JoinOpType, (MethodOneString)); \
INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<false>));
-
+#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index e2f684d19f5..9a81333efae 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -24,6 +24,7 @@
#include "vec/core/materialize_block.h"
namespace doris::pipeline {
+#include "common/compile_check_begin.h"
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state,
vectorized::Block* in_block,
@@ -87,14 +88,14 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
vectorized::materialize_block_inplace(block);
vectorized::ColumnRawPtrs raw_ptrs(_child_exprs.size());
RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows));
-
+ auto st = Status::OK();
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, rows,
raw_ptrs, state);
- static_cast<void>(hash_table_build_process(arg,
local_state._arena));
+ st = hash_table_build_process(arg, local_state._arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
@@ -102,7 +103,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
},
local_state._shared_state->hash_table_variants->method_variant);
- return Status::OK();
+ return st;
}
template <bool is_intersect>
@@ -119,7 +120,7 @@ Status
SetSinkOperatorX<is_intersect>::_extract_build_column(
rows = is_all_const ? 1 : rows;
for (size_t i = 0; i < _child_exprs.size(); ++i) {
- int result_col_id = result_locs[i];
+ size_t result_col_id = result_locs[i];
if (is_all_const) {
block.get_by_position(result_col_id).column =
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 1c08eddc141..65c33795e5d 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -23,6 +23,7 @@
#include "operator.h"
namespace doris {
+#include "common/compile_check_begin.h"
namespace vectorized {
template <class HashTableContext, bool is_intersected>
@@ -106,13 +107,14 @@ private:
size_t& rows);
const int _cur_child_id;
- const int _child_quantity;
+ const size_t _child_quantity;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child;
};
+#include "common/compile_check_end.h"
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index 278e2bb7014..58958462c2f 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -23,7 +23,7 @@
#include "pipeline/exec/operator.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
template <bool is_intersect>
Status SetSourceLocalState<is_intersect>::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -115,7 +115,7 @@ template <typename HashTableContext>
Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
SetSourceLocalState<is_intersect>& local_state, HashTableContext&
hash_table_ctx,
vectorized::Block* output_block, const int batch_size, bool* eos) {
- int left_col_len = local_state._left_table_data_types.size();
+ size_t left_col_len = local_state._left_table_data_types.size();
hash_table_ctx.init_iterator();
auto& iter = hash_table_ctx.iterator;
auto block_size = 0;
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index 5157a2f9c97..ce3d0c52edf 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -26,7 +26,7 @@ namespace doris {
class RuntimeState;
namespace pipeline {
-
+#include "common/compile_check_begin.h"
template <bool is_intersect>
class SetSourceOperatorX;
@@ -82,8 +82,8 @@ private:
void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
RowRefListWithFlags& value, int& block_size);
- const int _child_quantity;
+ const size_t _child_quantity;
};
-
+#include "common/compile_check_end.h"
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 34b7fb503b5..da27a39772d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -17,6 +17,7 @@
#include "pipeline/local_exchange/local_exchanger.h"
+#include "common/cast_set.h"
#include "common/status.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
@@ -25,7 +26,7 @@
#include "vec/runtime/partitioner.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
template <typename BlockType>
void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
LocalExchangeSinkLocalState& local_state,
@@ -170,11 +171,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t*
__restrict channel_ids,
vectorized::Block* block,
LocalExchangeSinkLocalState& local_state)
{
- const auto rows = block->rows();
+ const auto rows = cast_set<int32_t>(block->rows());
auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
{
local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
- for (size_t i = 0; i < rows; ++i) {
+ for (int32_t i = 0; i < rows; ++i) {
local_state._partition_rows_histogram[channel_ids[i]]++;
}
for (int32_t i = 1; i <= _num_partitions; ++i) {
@@ -212,7 +213,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
*/
const auto& map =
local_state._parent->cast<LocalExchangeSinkOperatorX>()
._shuffle_idx_to_instance_idx;
- new_block_wrapper->ref(map.size());
+ new_block_wrapper->ref(cast_set<int>(map.size()));
for (const auto& it : map) {
DCHECK(it.second >= 0 && it.second < _num_partitions)
<< it.first << " : " << it.second << " " <<
_num_partitions;
@@ -241,7 +242,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
} else {
DCHECK(!bucket_seq_to_instance_idx.empty());
new_block_wrapper->ref(_num_partitions);
- for (size_t i = 0; i < _num_partitions; i++) {
+ for (int i = 0; i < _num_partitions; i++) {
uint32_t start = local_state._partition_rows_histogram[i];
uint32_t size = local_state._partition_rows_histogram[i + 1] -
start;
if (size > 0) {
@@ -426,7 +427,7 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(),
local_state._channel_id);
wrapper->ref(_num_partitions);
- for (size_t i = 0; i < _num_partitions; i++) {
+ for (int i = 0; i < _num_partitions; i++) {
_enqueue_data_and_set_ready(i, local_state, {wrapper, {0,
wrapper->data_block.rows()}});
}
@@ -500,11 +501,11 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict
channel_ids,
vectorized::Block* block,
LocalExchangeSinkLocalState&
local_state) {
- const auto rows = block->rows();
+ const auto rows = cast_set<int32_t>(block->rows());
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
- for (size_t i = 0; i < rows; ++i) {
+ for (int32_t i = 0; i < rows; ++i) {
local_state._partition_rows_histogram[channel_ids[i]]++;
}
for (int32_t i = 1; i <= _num_partitions; ++i) {
@@ -517,7 +518,7 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
local_state._partition_rows_histogram[channel_ids[i]]--;
}
}
- for (size_t i = 0; i < _num_partitions; i++) {
+ for (int32_t i = 0; i < _num_partitions; i++) {
const size_t start = local_state._partition_rows_histogram[i];
const size_t size = local_state._partition_rows_histogram[i + 1] -
start;
if (size > 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 01b55816ba8..b3731638cb3 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -21,6 +21,7 @@
#include "pipeline/exec/operator.h"
namespace doris::pipeline {
+#include "common/compile_check_begin.h"
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
@@ -351,5 +352,5 @@ private:
std::atomic_bool _is_pass_through = false;
std::atomic_int32_t _total_block = 0;
};
-
+#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index ef0ae9e9a75..9554537ca16 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -25,12 +25,13 @@
#include <utility>
#include <vector>
+#include "common/cast_set.h"
#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
class PipelineFragmentContext;
class Pipeline;
@@ -119,10 +120,11 @@ public:
fmt::format_to(debug_string_buffer,
"Pipeline [id: {}, _num_tasks: {}, _num_tasks_created:
{}]", _pipeline_id,
_num_tasks, _num_tasks_created);
- for (size_t i = 0; i < _operators.size(); i++) {
+ for (int i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_operators[i]->debug_string(i));
}
- fmt::format_to(debug_string_buffer, "\n{}",
_sink->debug_string(_operators.size()));
+ fmt::format_to(debug_string_buffer, "\n{}",
+ _sink->debug_string(cast_set<int>(_operators.size())));
return fmt::to_string(debug_string_buffer);
}
@@ -168,5 +170,5 @@ private:
// Parallelism of parent pipeline.
const int _num_tasks_of_parent;
};
-
+#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index fd3baefa76f..ef856da5135 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -778,7 +778,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
std::max(cur_pipe->num_tasks(), _num_instances),
use_global_hash_shuffle ? _total_instances : _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(
+
_runtime_state->query_options().local_exchange_free_blocks_limit)
: 0);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
@@ -786,21 +787,24 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
std::max(cur_pipe->num_tasks(), _num_instances),
_num_instances, num_buckets,
ignore_data_hash_distribution,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(
+
_runtime_state->query_options().local_exchange_free_blocks_limit)
: 0);
break;
case ExchangeType::PASSTHROUGH:
shared_state->exchanger = PassthroughExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(
+
_runtime_state->query_options().local_exchange_free_blocks_limit)
: 0);
break;
case ExchangeType::BROADCAST:
shared_state->exchanger = BroadcastExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(
+
_runtime_state->query_options().local_exchange_free_blocks_limit)
: 0);
break;
case ExchangeType::PASS_TO_ONE:
@@ -809,13 +813,15 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
shared_state->exchanger = PassToOneExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(_runtime_state->query_options()
+
.local_exchange_free_blocks_limit)
: 0);
} else {
shared_state->exchanger = BroadcastExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(_runtime_state->query_options()
+
.local_exchange_free_blocks_limit)
: 0);
}
break;
@@ -830,7 +836,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
shared_state->exchanger = LocalMergeSortExchanger::create_unique(
sort_source, cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(
+
_runtime_state->query_options().local_exchange_free_blocks_limit)
: 0);
break;
}
@@ -838,7 +845,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ ? cast_set<int>(
+
_runtime_state->query_options().local_exchange_free_blocks_limit)
: 0);
break;
default:
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index ea9fb09e260..ade960650d7 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -27,7 +27,7 @@
#include "runtime/workload_group/workload_group.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
TaskQueue::~TaskQueue() = default;
PipelineTask* SubTaskQueue::try_take(bool is_steal) {
@@ -121,7 +121,7 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
// update empty queue's runtime, to avoid too high priority
if (_sub_queues[level].empty() &&
- _queue_level_min_vruntime > _sub_queues[level].get_vruntime()) {
+ double(_queue_level_min_vruntime) > _sub_queues[level].get_vruntime())
{
_sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
}
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index b389ebc2c51..c9e74248c7a 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -32,6 +32,7 @@
#include "pipeline_task.h"
namespace doris::pipeline {
+#include "common/compile_check_begin.h"
class TaskQueue {
public:
@@ -70,11 +71,13 @@ public:
// note:
// runtime is the time consumed by the actual execution of the task
// vruntime(means virtual runtime) = runtime / _level_factor
- double get_vruntime() { return _runtime / _level_factor; }
+ double get_vruntime() { return double(_runtime) / _level_factor; }
void inc_runtime(uint64_t delta_time) { _runtime += delta_time; }
- void adjust_runtime(uint64_t vruntime) { this->_runtime =
uint64_t(vruntime * _level_factor); }
+ void adjust_runtime(uint64_t vruntime) {
+ this->_runtime = uint64_t(double(vruntime) * _level_factor);
+ }
bool empty() { return _queue.empty(); }
@@ -150,5 +153,5 @@ private:
std::atomic<uint32_t> _next_core = 0;
std::atomic<bool> _closed;
};
-
+#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 475d3a8065f..5a4f8819bcb 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -44,7 +44,7 @@
#include "vec/runtime/vdatetime_value.h"
namespace doris::pipeline {
-
+#include "common/compile_check_begin.h"
TaskScheduler::~TaskScheduler() {
stop();
LOG(INFO) << "Task scheduler " << _name << " shutdown";
@@ -60,7 +60,7 @@ Status TaskScheduler::start() {
.build(&_fix_thread_pool));
LOG_INFO("TaskScheduler set cores").tag("size", cores);
_markers.resize(cores, true);
- for (size_t i = 0; i < cores; ++i) {
+ for (int i = 0; i < cores; ++i) {
RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
}
return Status::OK();
@@ -97,7 +97,7 @@ void _close_task(PipelineTask* task, Status exec_status) {
task->fragment_context()->close_a_pipeline(task->pipeline_id());
}
-void TaskScheduler::_do_work(size_t index) {
+void TaskScheduler::_do_work(int index) {
while (_markers[index]) {
auto* task = _task_queue->take(index);
if (!task) {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 9a20807ea26..6fc6ad8d6f2 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -71,6 +71,6 @@ private:
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
- void _do_work(size_t index);
+ void _do_work(int index);
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h
index f9aeeeef14c..b90cafc0883 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -24,7 +24,7 @@ constexpr size_t CHECK_FRECUENCY = 65536;
template <class HashTableContext, bool is_intersect>
struct HashTableBuild {
template <typename Parent>
- HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* state)
+ HashTableBuild(Parent* parent, size_t rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* state)
: _rows(rows), _build_raw_ptrs(build_raw_ptrs), _state(state) {}
Status operator()(HashTableContext& hash_table_ctx, Arena& arena) {
@@ -50,7 +50,7 @@ struct HashTableBuild {
}
private:
- const int _rows;
+ const size_t _rows;
ColumnRawPtrs& _build_raw_ptrs;
RuntimeState* _state = nullptr;
};
diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h
index 317987541cd..485c5f7b3b2 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -142,7 +142,7 @@ public:
JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, probe_rows);
}
- return std::tuple {0, 0U, 0};
+ return std::tuple {0, 0U, 0U};
}
/**
@@ -163,7 +163,7 @@ public:
uint32_t* __restrict build_idxs,
uint8_t* __restrict null_flags,
bool picking_null_keys) {
- auto matched_cnt = 0;
+ uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
auto do_the_probe = [&]() {
@@ -274,7 +274,7 @@ private:
uint32_t* __restrict build_idxs) {
static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN);
- auto matched_cnt = 0;
+ uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
while (probe_idx < probe_rows && matched_cnt < batch_size) {
@@ -300,14 +300,14 @@ private:
}
probe_idx++;
}
- return std::tuple {probe_idx, 0U, 0};
+ return std::tuple {probe_idx, 0U, 0U};
}
template <int JoinOpType, bool need_judge_null>
auto _find_batch_left_semi_anti(const Key* __restrict keys,
const uint32_t* __restrict build_idx_map, int probe_idx,
int probe_rows, uint32_t* __restrict probe_idxs) {
- auto matched_cnt = 0;
+ uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
while (probe_idx < probe_rows && matched_cnt < batch_size) {
@@ -334,7 +334,7 @@ private:
auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) {
- auto matched_cnt = 0;
+ uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
auto do_the_probe = [&]() {
@@ -405,7 +405,7 @@ private:
uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, bool& probe_visited,
uint32_t* __restrict build_idxs) {
- auto matched_cnt = 0;
+ uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
auto do_the_probe = [&]() {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 9405ed2e43e..ac820bcab29 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -230,7 +230,7 @@ Status Channel::close(RuntimeState* state) {
BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent,
bool is_local)
: _parent(parent), _is_local(is_local),
_batch_size(parent->state()->batch_size()) {}
-Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers,
bool* serialized, bool eos,
const std::vector<uint32_t>* rows) {
if (_mutable_block == nullptr) {
@@ -261,7 +261,7 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu
return Status::OK();
}
-Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, size_t num_receivers) {
if (_mutable_block && _mutable_block->rows() > 0) {
auto block = _mutable_block->to_block();
RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
@@ -272,7 +272,7 @@ Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
return Status::OK();
}
-Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t num_receivers) {
SCOPED_TIMER(_parent->_serialize_batch_timer);
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index da0ee22ac14..88bb804fd80 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -76,10 +76,10 @@ namespace vectorized {
class BlockSerializer {
public:
BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local = true);
- Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized,
+ Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized,
bool eos, const std::vector<uint32_t>* rows = nullptr);
- Status serialize_block(PBlock* dest, int num_receivers = 1);
- Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 1);
+ Status serialize_block(PBlock* dest, size_t num_receivers = 1);
+ Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1);
MutableBlock* get_block() const { return _mutable_block.get(); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]