This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new b6b785f99bd improve spill counters (#41724)
b6b785f99bd is described below
commit b6b785f99bde491e1a74e7dfcbc40854e581aa8c
Author: TengJianPing <[email protected]>
AuthorDate: Mon Oct 14 11:28:14 2024 +0800
improve spill counters (#41724)
---
be/src/pipeline/dependency.cpp | 22 +++
be/src/pipeline/dependency.h | 35 ++++
be/src/pipeline/exec/aggregation_sink_operator.cpp | 1 -
.../distinct_streaming_aggregation_operator.cpp | 1 -
be/src/pipeline/exec/operator.h | 214 +++++++++++++++++----
.../exec/partitioned_aggregation_sink_operator.cpp | 14 +-
.../exec/partitioned_aggregation_sink_operator.h | 11 +-
.../partitioned_aggregation_source_operator.cpp | 29 +--
.../exec/partitioned_hash_join_probe_operator.cpp | 93 +++++----
.../exec/partitioned_hash_join_probe_operator.h | 5 -
.../exec/partitioned_hash_join_sink_operator.cpp | 125 ++++++++----
.../exec/partitioned_hash_join_sink_operator.h | 1 +
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 45 +++--
be/src/pipeline/exec/spill_sort_sink_operator.h | 1 +
.../pipeline/exec/spill_sort_source_operator.cpp | 42 ++--
be/src/pipeline/exec/spill_sort_source_operator.h | 4 -
be/src/pipeline/exec/spill_utils.h | 35 +++-
.../exec/streaming_aggregation_operator.cpp | 1 -
be/src/vec/core/block.h | 12 +-
be/src/vec/spill/spill_reader.cpp | 16 +-
be/src/vec/spill/spill_reader.h | 24 ++-
be/src/vec/spill/spill_stream.cpp | 40 ++--
be/src/vec/spill/spill_stream.h | 32 +--
be/src/vec/spill/spill_stream_manager.cpp | 1 +
be/src/vec/spill/spill_writer.cpp | 42 ++--
be/src/vec/spill/spill_writer.h | 44 +++--
26 files changed, 603 insertions(+), 287 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 315e76a2426..cdde0c6330c 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -327,6 +327,19 @@ void PartitionedAggSharedState::init_spill_params(size_t
spill_partition_count_b
}
}
+// Hands the source operator's profile to every spill stream of every partition
+// by calling update_shared_profiles(source_profile); null streams are skipped.
+// NOTE(review): get_spill_stream() also appends spilling_stream_ to
+// spill_streams_, so the current stream may be visited twice here; that is only
+// harmless if update_shared_profiles() is idempotent — confirm.
+void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) {
+    for (auto& partition : spill_partitions) {
+        auto& current_stream = partition->spilling_stream_;
+        if (current_stream) {
+            current_stream->update_shared_profiles(source_profile);
+        }
+        for (auto& stream : partition->spill_streams_) {
+            if (!stream) {
+                continue;
+            }
+            stream->update_shared_profiles(source_profile);
+        }
+    }
+}
+
Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
RuntimeProfile* profile,
vectorized::SpillStreamSPtr&
spill_stream) {
@@ -339,6 +352,7 @@ Status AggSpillPartition::get_spill_stream(RuntimeState*
state, int node_id,
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), profile));
spill_streams_.emplace_back(spilling_stream_);
spill_stream = spilling_stream_;
+ spill_stream->set_write_counters(profile);
return Status::OK();
}
void AggSpillPartition::close() {
@@ -365,6 +379,14 @@ void PartitionedAggSharedState::close() {
spill_partitions.clear();
}
+// Hands the source operator's profile to each sorted spill stream by calling
+// update_shared_profiles(source_profile); null entries are skipped.
+void SpillSortSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) {
+    for (auto& sorted_stream : sorted_streams) {
+        if (!sorted_stream) {
+            continue;
+        }
+        sorted_stream->update_shared_profiles(source_profile);
+    }
+}
+
void SpillSortSharedState::close() {
// need to use CAS instead of only `if (!is_closed)` statement,
// to avoid concurrent entry of close() both pass the if statement
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 3cf1cbff9fa..20251aed294 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -426,14 +426,36 @@ private:
Status _destroy_agg_status(vectorized::AggregateDataPtr data);
};
+// Base for the shared state of spilling operators. The spill sink registers the
+// total spill-file counters in its own profile (setup_shared_profile), and the
+// spill source later reads them and hands its profile to the spill streams
+// (update_spill_stream_profiles).
+struct BasicSpillSharedState {
+    virtual ~BasicSpillSharedState() = default;
+
+    // Both counters live in the spill sink's profile. Spill source operators use
+    // their values as the initial values of 'SpillWriteFileCurrentSize' and
+    // 'SpillWriteFileCurrentCount'.
+    //
+    // Total bytes of spill data written to disk files (after serialization).
+    RuntimeProfile::Counter* _spill_write_file_data_size {nullptr};
+    // Total number of spill files written ("SpillWriteFileTotalCount").
+    RuntimeProfile::Counter* _spill_file_total_count {nullptr};
+
+    // Registers "SpillWriteFileTotalCount" and "SpillWriteFileTotalSize" (profile
+    // level 1) in the sink operator's profile and keeps pointers to them.
+    void setup_shared_profile(RuntimeProfile* sink_profile) {
+        _spill_file_total_count = ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount",
+                                                         TUnit::UNIT, 1);
+        _spill_write_file_data_size = ADD_COUNTER_WITH_LEVEL(
+                sink_profile, "SpillWriteFileTotalSize", TUnit::BYTES, 1);
+    }
+
+    // Hands the source operator's profile to every spill stream held by the
+    // concrete shared state.
+    virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) = 0;
+};
+
struct AggSpillPartition;
struct PartitionedAggSharedState : public BasicSharedState,
+ public BasicSpillSharedState,
public
std::enable_shared_from_this<PartitionedAggSharedState> {
ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
PartitionedAggSharedState() = default;
~PartitionedAggSharedState() override = default;
+ void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
+
void init_spill_params(size_t spill_partition_count_bits);
void close();
@@ -498,6 +520,7 @@ public:
};
struct SpillSortSharedState : public BasicSharedState,
+ public BasicSpillSharedState,
public
std::enable_shared_from_this<SpillSortSharedState> {
ENABLE_FACTORY_CREATOR(SpillSortSharedState)
@@ -515,6 +538,9 @@ struct SpillSortSharedState : public BasicSharedState,
LOG(INFO) << "spill sort block batch row count: " <<
spill_block_batch_row_count;
}
}
+
+ void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
+
void close();
SortSharedState* in_mem_shared_state = nullptr;
@@ -622,9 +648,18 @@ struct HashJoinSharedState : public JoinSharedState {
struct PartitionedHashJoinSharedState
: public HashJoinSharedState,
+ public BasicSpillSharedState,
public std::enable_shared_from_this<PartitionedHashJoinSharedState> {
ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState)
+    // Hands the source operator's profile to each stream in spilled_streams by
+    // calling update_shared_profiles(source_profile); null entries are skipped.
+    void update_spill_stream_profiles(RuntimeProfile* source_profile) override {
+        for (auto& spilled : spilled_streams) {
+            if (!spilled) {
+                continue;
+            }
+            spilled->update_shared_profiles(source_profile);
+        }
+    }
+
std::unique_ptr<RuntimeState> inner_runtime_state;
std::shared_ptr<HashJoinSharedState> inner_shared_state;
std::vector<std::unique_ptr<vectorized::MutableBlock>>
partitioned_build_blocks;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 3dd43c3c4d8..79d1460cd06 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -66,7 +66,6 @@ Status AggSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
- _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
_expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
_serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 79347064048..b0e635bef84 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -72,7 +72,6 @@ Status DistinctStreamingAggLocalState::init(RuntimeState*
state, LocalStateInfo&
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_init_timer);
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
- _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
_hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 649bc70c238..da0cc008f06 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -287,28 +287,137 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state,
info));
- _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1);
- _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillRecoverTime", 1);
- _spill_read_data_time = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadDataTime", 1);
- _spill_deserialize_time = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillDeserializeTime", 1);
- _spill_read_bytes =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize",
TUnit::BYTES, 1);
- _spill_wait_in_queue_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime",
1);
- _spill_write_wait_io_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime",
1);
- _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadWaitIOTime", 1);
+
+ _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillTotalTime", 1);
+ init_spill_read_counters();
+
return Status::OK();
}
- RuntimeProfile::Counter* _spill_timer = nullptr;
- RuntimeProfile::Counter* _spill_recover_time;
- RuntimeProfile::Counter* _spill_read_data_time;
- RuntimeProfile::Counter* _spill_deserialize_time;
- RuntimeProfile::Counter* _spill_read_bytes;
- RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
- RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
- RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
+    // Registers the spill-write counters (all at profile level 1) in this local
+    // state's profile. init() of this class does not call it; local states that
+    // also spill on the source side call it explicitly (e.g.
+    // PartitionedHashJoinProbeLocalState::init). Registration order is kept
+    // stable because it determines the order counters appear in the profile.
+    void init_spill_write_counters() {
+        auto* const spill_profile = Base::profile();
+
+        _spill_write_timer = ADD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteTime", 1);
+
+        _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1);
+        _spill_writing_task_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillWriteTaskCount", TUnit::UNIT, 1);
+        _spill_write_wait_in_queue_timer =
+                ADD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteTaskWaitInQueueTime", 1);
+
+        _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteFileTime", 1);
+
+        _spill_write_serialize_block_timer =
+                ADD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteSerializeBlockTime", 1);
+        _spill_write_block_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillWriteBlockCount", TUnit::UNIT, 1);
+        _spill_write_block_data_size = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillWriteBlockDataSize", TUnit::BYTES, 1);
+        _spill_write_file_data_size = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillWriteFileTotalSize", TUnit::BYTES, 1);
+        _spill_write_rows_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillWriteRows", TUnit::UNIT, 1);
+        _spill_file_total_count = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1);
+    }
+
+    // Registers the spill-read counters (all at profile level 1) in this local
+    // state's profile; called from init(). It also registers
+    // 'SpillWriteFileCurrentSize' / 'SpillWriteFileCurrentCount', which
+    // copy_shared_spill_profile() seeds from the sink's totals.
+    //
+    // NOTE(review): "SpillReadDerializeBlockTime" (and the member name) misspell
+    // "Deserialize". It is left unchanged because spill streams receive this
+    // profile via set_read_counters(profile()) and may look counters up by name;
+    // rename every lookup together if it is ever fixed.
+    void init_spill_read_counters() {
+        auto* const spill_profile = Base::profile();
+
+        _spill_recover_time = ADD_TIMER_WITH_LEVEL(spill_profile, "SpillRecoverTime", 1);
+
+        _spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1);
+        _spill_reading_task_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillReadTaskCount", TUnit::UNIT, 1);
+        _spill_read_wait_in_queue_timer =
+                ADD_TIMER_WITH_LEVEL(spill_profile, "SpillReadTaskWaitInQueueTime", 1);
+
+        _spill_read_file_time = ADD_TIMER_WITH_LEVEL(spill_profile, "SpillReadFileTime", 1);
+        _spill_read_derialize_block_timer =
+                ADD_TIMER_WITH_LEVEL(spill_profile, "SpillReadDerializeBlockTime", 1);
+
+        _spill_read_block_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillReadBlockCount", TUnit::UNIT, 1);
+        _spill_read_block_data_size = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillReadBlockDataSize", TUnit::BYTES, 1);
+        _spill_read_file_size =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillReadFileSize", TUnit::BYTES, 1);
+        _spill_read_rows_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillReadRows", TUnit::UNIT, 1);
+        _spill_read_file_count =
+                ADD_COUNTER_WITH_LEVEL(spill_profile, "SpillReadFileCount", TUnit::UNIT, 1);
+
+        // Initial values are copied from the sink side by copy_shared_spill_profile().
+        _spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillWriteFileCurrentSize", TUnit::BYTES, 1);
+        _spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
+                spill_profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
+    }
+
+    // Initializes this operator's 'SpillWriteFileCurrentSize' and
+    // 'SpillWriteFileCurrentCount' from the spill sink's "SpillWriteFileTotalSize"
+    // and "SpillWriteFileTotalCount" counters (held in the shared state), then
+    // hands this operator's profile to the shared state's spill streams.
+    // Only the first call does any work; later calls return immediately.
+    // Requires SharedStateArg to derive from BasicSpillSharedState.
+    void copy_shared_spill_profile() {
+        if (!_copy_shared_spill_profile) {
+            return;
+        }
+        _copy_shared_spill_profile = false;
+
+        // static_cast rather than a C-style cast: a shared state type that does not
+        // derive from BasicSpillSharedState becomes a compile error instead of a
+        // silent reinterpret_cast.
+        const auto* spill_shared_state =
+                static_cast<const BasicSpillSharedState*>(Base::_shared_state);
+
+        // These counters are only created by the sink's setup_shared_profile();
+        // skip them instead of dereferencing null if that never happened.
+        if (spill_shared_state->_spill_write_file_data_size != nullptr) {
+            COUNTER_SET(_spill_file_current_size,
+                        spill_shared_state->_spill_write_file_data_size->value());
+        }
+        if (spill_shared_state->_spill_file_total_count != nullptr) {
+            COUNTER_SET(_spill_file_current_count,
+                        spill_shared_state->_spill_file_total_count->value());
+        }
+        Base::_shared_state->update_spill_stream_profiles(Base::profile());
+    }
+
+ // Total time of spill, including spill task scheduling time,
+ // serialize block time, write disk file time,
+ // and read disk file time, deserialize block time etc.
+ RuntimeProfile::Counter* _spill_total_timer = nullptr;
+
+ // Spill write counters
+ // Total time of spill write, including serialize block time, write disk
file,
+ // and wait in queue time, etc.
+ RuntimeProfile::Counter* _spill_write_timer = nullptr;
+
+ RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* _spill_writing_task_count = nullptr;
+ RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
+
+ // Total time of writing file
+ RuntimeProfile::Counter* _spill_write_file_timer = nullptr;
+ RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr;
+ // Original count of spilled Blocks
+ // One big Block may be split into multiple small Blocks when it is actually written to a disk file.
+ RuntimeProfile::Counter* _spill_write_block_count = nullptr;
+ // Total bytes of spill data in Block format(in memory format)
+ RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
+ // Total bytes of spill data written to disk files (after serialization)
+ RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+ RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
+ RuntimeProfile::Counter* _spill_file_total_count = nullptr;
+ RuntimeProfile::Counter* _spill_file_current_count = nullptr;
+ // Spilled file total size
+ RuntimeProfile::Counter* _spill_file_total_size = nullptr;
+ // Current spilled file size
+ RuntimeProfile::Counter* _spill_file_current_size = nullptr;
+
+ // Spill read counters
+ // Total time of recovering spilled data, including read file time, deserialize time, etc.
+ RuntimeProfile::Counter* _spill_recover_time = nullptr;
+
+ RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* _spill_reading_task_count = nullptr;
+ RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
+
+ RuntimeProfile::Counter* _spill_read_file_time = nullptr;
+ RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
+ RuntimeProfile::Counter* _spill_read_block_count = nullptr;
+ // Total bytes of read data in Block format(in memory format)
+ RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
+ // Total bytes of spill data read from disk file
+ RuntimeProfile::Counter* _spill_read_file_size = nullptr;
+ RuntimeProfile::Counter* _spill_read_rows_count = nullptr;
+ RuntimeProfile::Counter* _spill_read_file_count = nullptr;
+
+ bool _copy_shared_spill_profile = true;
};
class DataSinkOperatorXBase;
@@ -589,19 +698,28 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
RETURN_IF_ERROR(Base::init(state, info));
- _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1);
- _spill_serialize_block_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillSerializeBlockTime", 1);
- _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteDiskTime", 1);
- _spill_data_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize",
TUnit::BYTES, 1);
- _spill_block_count =
+ _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillTotalTime", 1);
+
+ _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteTime", 1);
+
+ _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
+ Base::profile(), "SpillWriteTaskWaitInQueueCount",
TUnit::UNIT, 1);
+ _spill_writing_task_count =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount",
TUnit::UNIT, 1);
+ _spill_write_wait_in_queue_timer =
+ ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteTaskWaitInQueueTime", 1);
+
+ _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteFileTime", 1);
+
+ _spill_write_serialize_block_timer =
+ ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteSerializeBlockTime", 1);
+ _spill_write_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
- _spill_wait_in_queue_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime",
1);
- _spill_write_wait_io_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime",
1);
- _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadWaitIOTime", 1);
+ _spill_write_block_data_size =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
+ _spill_write_rows_count =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows",
TUnit::UNIT, 1);
+
_spill_max_rows_of_partition =
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillMaxRowsOfPartition", TUnit::UNIT, 1);
_spill_min_rows_of_partition =
@@ -633,14 +751,32 @@ public:
std::vector<int64_t> _rows_in_partitions;
- RuntimeProfile::Counter* _spill_timer = nullptr;
- RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
- RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
- RuntimeProfile::Counter* _spill_data_size = nullptr;
- RuntimeProfile::Counter* _spill_block_count = nullptr;
- RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
- RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
- RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+ // Total time of spill, including spill task scheduling time,
+ // serialize block time, write disk file time,
+ // and read disk file time, deserialize block time etc.
+ RuntimeProfile::Counter* _spill_total_timer = nullptr;
+
+ // Spill write counters
+ // Total time of spill write, including serialize block time, write disk
file,
+ // and wait in queue time, etc.
+ RuntimeProfile::Counter* _spill_write_timer = nullptr;
+
+ RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* _spill_writing_task_count = nullptr;
+ RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
+
+ // Total time of writing file
+ RuntimeProfile::Counter* _spill_write_file_timer = nullptr;
+ RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr;
+ // Original count of spilled Blocks
+ // One big Block may be split into multiple small Blocks when it is actually written to a disk file.
+ RuntimeProfile::Counter* _spill_write_block_count = nullptr;
+ // Total bytes of spill data in Block format(in memory format)
+ RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
+ RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
+ // Spilled file total size
+ RuntimeProfile::Counter* _spill_file_total_size = nullptr;
+
RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
};
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 1a86fdb2a9d..f5c09459f85 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -76,6 +76,7 @@ Status
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
Status PartitionedAggSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_open_timer);
+ _shared_state->setup_shared_profile(_profile);
return Base::open(state);
}
@@ -301,8 +302,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
- state, _shared_state->shared_from_this(),
+ state, _profile, true, _shared_state->shared_from_this(),
[this, &parent, state, query_id, size_to_revoke, spill_context,
submit_timer] {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_write_timer);
+
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_sink "
@@ -310,8 +320,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
return st;
});
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(Base::_spill_timer);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 9b70da54943..fa32e032303 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -150,10 +150,6 @@ public:
auto status = spill_partition->get_spill_stream(state,
Base::_parent->node_id(),
Base::profile(),
spill_stream);
RETURN_IF_ERROR(status);
- spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
- Base::_spill_block_count,
Base::_spill_data_size,
- Base::_spill_write_disk_timer,
- Base::_spill_write_wait_io_timer,
memory_used_counter());
status = to_block(context, keys, values, null_key_data);
RETURN_IF_ERROR(status);
@@ -168,14 +164,9 @@ public:
keys.clear();
values.clear();
}
- status = spill_stream->prepare_spill();
+ status = spill_stream->spill_block(state, block_, false);
RETURN_IF_ERROR(status);
- {
- SCOPED_TIMER(_spill_write_disk_timer);
- status = spill_stream->spill_block(state, block_, false);
- }
- RETURN_IF_ERROR(status);
status = spill_partition->flush_if_full();
_reset_tmp_data();
return status;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index c6a6c09f01b..1b49c0d3768 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -137,6 +137,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState*
state) {
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
+ local_state.copy_shared_spill_profile();
Defer defer {[&]() {
if (!local_state._status.ok() || *eos) {
local_state._shared_state->close();
@@ -226,8 +227,6 @@ Status
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state,
bool& has_data) {
const auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
if (_shared_state->spill_partitions.empty()) {
_shared_state->close();
@@ -236,10 +235,7 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
}
has_data = true;
- auto spill_func = [this, state, query_id, submit_timer] {
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- MonotonicStopWatch execution_timer;
- execution_timer.start();
+ auto spill_func = [this, state, query_id] {
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
if (!_status.ok()) {
@@ -255,14 +251,11 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
while (!state->is_cancelled() && !has_agg_data &&
!_shared_state->spill_partitions.empty()) {
for (auto& stream :
_shared_state->spill_partitions[0]->spill_streams_) {
- stream->set_read_counters(Base::_spill_read_data_time,
- Base::_spill_deserialize_time,
Base::_spill_read_bytes,
- Base::_spill_read_wait_io_timer);
+ stream->set_read_counters(profile());
vectorized::Block block;
bool eos = false;
while (!eos && !state->is_cancelled()) {
{
- SCOPED_TIMER(Base::_spill_recover_time);
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
{
_status =
Status::Error<INTERNAL_ERROR>(
@@ -298,7 +291,18 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
return _status;
};
- auto exception_catch_func = [spill_func, query_id, this]() {
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ auto exception_catch_func = [spill_func, query_id, submit_timer, this]() {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_recover_time);
+
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
{
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
@@ -320,7 +324,8 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
_spill_dependency->block();
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ std::make_shared<SpillRunnable>(state, _runtime_profile.get(),
false,
+ _shared_state->shared_from_this(),
exception_catch_func));
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 7b60c9a3e2f..6df2a7ca4ab 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -42,6 +42,8 @@
PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeSt
Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info));
+ init_spill_write_counters();
+
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
@@ -75,14 +77,6 @@ Status
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
_probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(),
"ProbeBlocksBytes", TUnit::BYTES, 1);
- _spill_serialize_block_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime",
1);
- _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteDiskTime", 1);
- _spill_data_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize",
TUnit::BYTES, 1);
- _spill_block_count =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount",
TUnit::UNIT, 1);
-
// Build phase
_build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
_build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows",
TUnit::UNIT, "BuildPhase");
@@ -182,11 +176,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto spill_func = [query_id, state, submit_timer, this] {
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ auto spill_func = [query_id, state, this] {
SCOPED_TIMER(_spill_probe_timer);
size_t not_revoked_size = 0;
@@ -215,10 +205,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
state, spilling_stream, print_id(state->query_id()),
"hash_probe",
_parent->node_id(),
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(),
_runtime_profile.get()));
- RETURN_IF_ERROR(spilling_stream->prepare_spill());
- spilling_stream->set_write_counters(
- _spill_serialize_block_timer, _spill_block_count,
_spill_data_size,
- _spill_write_disk_timer, _spill_write_wait_io_timer,
memory_used_counter());
+ spilling_stream->set_write_counters(_runtime_profile.get());
}
auto merged_block =
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
@@ -256,8 +243,19 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return Status::OK();
};
- auto exception_catch_func = [query_id, spill_func, spill_context, this]() {
- SCOPED_TIMER(_spill_timer);
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto exception_catch_func = [query_id, spill_func, spill_context,
submit_timer, this]() {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_write_timer);
+
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
@@ -286,7 +284,8 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
"fault_inject partitioned_hash_join_probe spill_probe_blocks
submit_func failed");
});
- auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ auto spill_runnable = std::make_shared<SpillRunnable>(state,
_runtime_profile.get(), true,
+
_shared_state->shared_from_this(),
exception_catch_func);
return spill_io_pool->submit(std::move(spill_runnable));
}
@@ -296,8 +295,7 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
if (probe_spilling_stream) {
RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
- probe_spilling_stream->set_read_counters(_spill_read_data_time,
_spill_deserialize_time,
- _spill_read_bytes,
_spill_read_wait_io_timer);
+ probe_spilling_stream->set_read_counters(profile());
}
return Status::OK();
@@ -314,20 +312,15 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (!spilled_stream) {
return Status::OK();
}
-
- spilled_stream->set_read_counters(_spill_read_data_time,
_spill_deserialize_time,
- _spill_read_bytes,
_spill_read_wait_io_timer);
+ spilled_stream->set_read_counters(profile());
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
_shared_state->shared_from_this();
auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
auto read_func = [this, query_id, state, spilled_stream = spilled_stream,
shared_state_holder,
- submit_timer, partition_index] {
+ partition_index] {
auto shared_state_sptr = shared_state_holder.lock();
if (!shared_state_sptr || state->is_cancelled()) {
LOG(INFO) << "query: " << print_id(query_id)
@@ -335,7 +328,6 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
return;
}
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_recovery_build_timer);
bool eos = false;
@@ -396,7 +388,19 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
}
};
- auto exception_catch_func = [read_func, query_id, this]() {
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_recover_time);
+
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
@@ -436,7 +440,8 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
"fault_inject partitioned_hash_join_probe "
"recovery_build_blocks submit_func failed");
});
- auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ auto spill_runnable = std::make_shared<SpillRunnable>(state,
_runtime_profile.get(), false,
+
_shared_state->shared_from_this(),
exception_catch_func);
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " <<
partition_index
@@ -475,14 +480,12 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
return Status::OK();
}
+ spilled_stream->set_read_counters(profile());
auto& blocks = _probe_blocks[partition_index];
auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- auto read_func = [this, query_id, &spilled_stream, &blocks, submit_timer] {
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ auto read_func = [this, query_id, &spilled_stream, &blocks] {
SCOPED_TIMER(_recovery_probe_timer);
vectorized::Block block;
@@ -518,7 +521,18 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
}
};
- auto exception_catch_func = [read_func, query_id, this]() {
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_recover_time);
+
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
@@ -549,7 +563,8 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
"recovery_probe_blocks submit_func failed");
});
return spill_io_pool->submit(std::make_shared<SpillRunnable>(
- state, _shared_state->shared_from_this(), exception_catch_func));
+ state, _runtime_profile.get(), false,
_shared_state->shared_from_this(),
+ exception_catch_func));
}
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
pool,
@@ -957,6 +972,7 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
bool* eos) {
*eos = false;
auto& local_state = get_local_state(state);
+ local_state.copy_shared_spill_profile();
const auto need_to_spill = local_state._shared_state->need_to_spill;
#ifndef NDEBUG
Defer eos_check_defer([&] {
@@ -1017,6 +1033,7 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
}
local_state.add_num_rows_returned(block->rows());
+ COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
if (*eos) {
_update_profile_from_internal_states(local_state);
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index e66b730685b..6ecbdd01e49 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -109,11 +109,6 @@ private:
RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
RuntimeProfile::Counter* _recovery_probe_timer = nullptr;
- RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
- RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
- RuntimeProfile::Counter* _spill_data_size = nullptr;
- RuntimeProfile::Counter* _spill_block_count = nullptr;
-
RuntimeProfile::Counter* _build_phase_label = nullptr;
RuntimeProfile::Counter* _build_rows_counter = nullptr;
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f7d38fe9d5d..8c9990da1ef 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -64,6 +64,7 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
+ _shared_state->setup_shared_profile(_profile);
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
for (uint32_t i = 0; i != p._partition_count; ++i) {
@@ -72,10 +73,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState*
state) {
state, spilling_stream, print_id(state->query_id()),
fmt::format("hash_build_sink_{}", i), _parent->node_id(),
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
- RETURN_IF_ERROR(spilling_stream->prepare_spill());
- spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer,
- _spill_write_wait_io_timer,
memory_used_counter());
+ spilling_stream->set_write_counters(_profile);
}
return p._partitioner->clone(state, _partitioner);
}
@@ -117,6 +115,32 @@ size_t
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
return mem_size;
}
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+ if (!_shared_state->need_to_spill) {
+ if (_shared_state->inner_shared_state) {
+ auto* inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
+ if (inner_sink_state_) {
+ auto* inner_sink_state =
+
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ COUNTER_SET(_memory_used_counter,
inner_sink_state->_memory_used_counter->value());
+ COUNTER_SET(_peak_memory_usage_counter,
+ inner_sink_state->_memory_used_counter->value());
+ }
+ }
+ return;
+ }
+
+ int64_t mem_size = 0;
+ auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+ for (auto& block : partitioned_blocks) {
+ if (block) {
+ mem_size += block->allocated_bytes();
+ }
+ }
+ COUNTER_SET(_memory_used_counter, mem_size);
+ COUNTER_SET(_peak_memory_usage_counter, mem_size);
+}
+
size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
size_t size_to_reserve = 0;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
@@ -136,14 +160,21 @@ size_t
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ HashJoinBuildSinkLocalState* inner_sink_state {nullptr};
+ if (auto* tmp_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state()) {
+ inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
+ }
_shared_state->inner_shared_state->hash_table_variants.reset();
+ if (inner_sink_state) {
+ COUNTER_UPDATE(_memory_used_counter,
+ -(inner_sink_state->_hash_table_memory_usage->value() +
+
inner_sink_state->_build_arena_memory_usage->value()));
+ }
auto row_desc = p._child->row_desc();
const auto num_slots = row_desc.num_slots();
vectorized::Block build_block;
- size_t block_old_mem = 0;
- auto* inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
- if (inner_sink_state_) {
- auto* inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ int64_t block_old_mem = 0;
+ if (inner_sink_state) {
build_block = inner_sink_state->_build_side_mutable_block.to_block();
block_old_mem = build_block.allocated_bytes();
}
@@ -158,12 +189,12 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
}
if (build_block.columns() > num_slots) {
- build_block.erase(num_slots);
- memory_used_counter()->update(build_block.allocated_bytes() -
block_old_mem);
+ vectorized::Block::erase_useless_column(&build_block, num_slots);
+ COUNTER_UPDATE(_memory_used_counter, build_block.allocated_bytes() -
block_old_mem);
}
auto spill_func = [build_block = std::move(build_block), state, this]()
mutable {
- Defer defer {[&]() {
memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
+ Defer defer1 {[&]() { update_memory_usage(); }};
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<std::vector<uint32_t>>
partitions_indexes(p._partition_count);
@@ -175,7 +206,6 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
auto flush_rows = [&state,
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
vectorized::SpillStreamSPtr&
spilling_stream) {
auto block = partition_block->to_block();
- Defer defer {[&]() {
memory_used_counter()->update(-block.allocated_bytes()); }};
auto status = spilling_stream->spill_block(state, block, false);
if (!status.ok()) {
@@ -198,9 +228,11 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
sub_block.get_by_position(i).column =
build_block.get_by_position(i).column->cut(offset,
this_run);
}
- auto sub_blocks_memory_usage = sub_block.allocated_bytes();
- memory_used_counter()->update(sub_blocks_memory_usage);
- Defer defer {[&]() {
memory_used_counter()->update(-sub_blocks_memory_usage); }};
+ int64_t sub_blocks_memory_usage = sub_block.allocated_bytes();
+ COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage);
+ COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
+ Defer defer2 {
+ [&]() { COUNTER_UPDATE(_memory_used_counter,
-sub_blocks_memory_usage); }};
offset += this_run;
const auto is_last_block = offset == total_rows;
@@ -225,12 +257,11 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
partition_block =
vectorized::MutableBlock::create_unique(build_block.clone_empty());
}
- auto old_mem = partition_block->allocated_bytes();
+ int64_t old_mem = partition_block->allocated_bytes();
{
SCOPED_TIMER(_partition_shuffle_timer);
Status st = partition_block->add_rows(&sub_block, begin,
end);
-
memory_used_counter()->update(partition_block->allocated_bytes() - old_mem);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = st;
@@ -240,6 +271,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
}
partitions_indexes[partition_idx].clear();
}
+ int64_t new_mem = partition_block->allocated_bytes();
if (partition_block->rows() >= reserved_size || is_last_block)
{
if (!flush_rows(partition_block, spilling_stream)) {
@@ -247,7 +279,10 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
}
partition_block =
vectorized::MutableBlock::create_unique(build_block.clone_empty());
-
memory_used_counter()->update(partition_block->allocated_bytes());
+ COUNTER_UPDATE(_memory_used_counter, -new_mem);
+ } else {
+ COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem);
+ COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
}
}
}
@@ -255,8 +290,18 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
_spill_dependency->set_ready();
};
- auto exception_catch_func = [spill_func, spill_context, this]() mutable {
- SCOPED_TIMER(_spill_timer);
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ auto exception_catch_func = [spill_func, spill_context, submit_timer,
this]() mutable {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_write_timer);
+
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION(spill_func());
return Status::OK();
@@ -274,8 +319,8 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
}
};
- auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
-
exception_catch_func);
+ auto spill_runnable = std::make_shared<SpillRunnable>(
+ state, _profile, true, _shared_state->shared_from_this(),
exception_catch_func);
auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -291,6 +336,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
Status PartitionedHashJoinSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
+ SCOPED_TIMER(_spill_total_timer);
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " <<
state->task_id()
<< " hash join sink " << _parent->node_id() << " revoke_memory"
<< ", eos: " << _child_eos;
@@ -307,6 +353,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
auto query_id = state->query_id();
+ auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
+ DCHECK(spill_io_pool != nullptr);
+
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size();
++i) {
vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
@@ -319,10 +368,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
DCHECK(spilling_stream != nullptr);
- auto* spill_io_pool =
-
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
- DCHECK(spill_io_pool != nullptr);
-
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -335,8 +380,17 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
// so that when a stream finished, it should desc -1
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
- state, _shared_state->shared_from_this(),
+ state, _profile, true, _shared_state->shared_from_this(),
[this, query_id, spilling_stream, i, submit_timer,
spill_context] {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_write_timer);
+
DBUG_EXECUTE_IF(
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -345,7 +399,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
"revoke_memory
canceled"));
return;
});
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_spill_build_timer);
auto status = [&]() {
@@ -411,7 +464,7 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
if (!rows) {
return Status::OK();
}
- Defer defer {[&]() {
memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
+ Defer defer {[&]() { update_memory_usage(); }};
{
/// TODO: DO NOT execute build exprs twice(when partition and building
hash table)
SCOPED_TIMER(_partition_timer);
@@ -455,10 +508,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
if (_spill_status_ok) {
auto block = partitioned_block->to_block();
- auto block_mem_usage = block.allocated_bytes();
- Defer defer {[&]() { memory_used_counter()->update(-block_mem_usage);
}};
+ int64_t block_mem_usage = block.allocated_bytes();
+ Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(),
-block_mem_usage); }};
partitioned_block =
vectorized::MutableBlock::create_unique(block.clone_empty());
- memory_used_counter()->update(partitioned_block->allocated_bytes());
auto st = spilling_stream->spill_block(state(), block, false);
if (!st.ok()) {
_spill_status_ok = false;
@@ -609,10 +661,7 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
"fault_inject partitioned_hash_join_sink "
"sink_eos failed");
});
- Defer defer {[&]() {
- local_state.memory_used_counter()->set(
- (int64_t)local_state.revocable_mem_size(state));
- }};
+ Defer defer {[&]() { local_state.update_memory_usage(); }};
RETURN_IF_ERROR(_inner_sink_operator->sink(
local_state._shared_state->inner_runtime_state.get(),
in_block, eos));
VLOG_DEBUG << "hash join sink " << node_id() << " sink eos,
set_ready_to_read"
@@ -650,9 +699,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
"fault_inject partitioned_hash_join_sink "
"sink failed");
});
- Defer defer {[&]() {
-
local_state.memory_used_counter()->set((int64_t)local_state.revocable_mem_size(state));
- }};
+ Defer defer {[&]() { local_state.update_memory_usage(); }};
RETURN_IF_ERROR(_inner_sink_operator->sink(
local_state._shared_state->inner_runtime_state.get(),
in_block, eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index d3725997882..aaa6d64adf9 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -47,6 +47,7 @@ public:
Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
size_t revocable_mem_size(RuntimeState* state) const;
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+ void update_memory_usage();
protected:
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 38e7a173197..5d8355b865d 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -50,6 +50,13 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState*
state,
return Status::OK();
}
+Status SpillSortSinkLocalState::open(RuntimeState* state) {
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ _shared_state->setup_shared_profile(_profile);
+ return Base::open(state);
+}
+
void SpillSortSinkLocalState::_init_counters() {
_internal_runtime_profile =
std::make_unique<RuntimeProfile>("internal_profile");
@@ -57,10 +64,7 @@ void SpillSortSinkLocalState::_init_counters() {
_merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime");
_sort_blocks_memory_usage =
ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks",
TUnit::BYTES, 1);
-
_spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(_profile,
"SpillMergeSortTime", 1);
-
- _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWaitInQueueTime", 1);
}
#define UPDATE_PROFILE(counter, name) \
do { \
@@ -201,13 +205,8 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
_shared_state->spill_block_batch_row_count,
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
RETURN_IF_ERROR(status);
+ _spilling_stream->set_write_counters(_profile);
- _spilling_stream->set_write_counters(
- Base::_spill_serialize_block_timer, Base::_spill_block_count,
Base::_spill_data_size,
- Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer,
memory_used_counter());
-
- status = _spilling_stream->prepare_spill();
- RETURN_IF_ERROR(status);
_shared_state->sorted_streams.emplace_back(_spilling_stream);
auto& parent = Base::_parent->template cast<Parent>();
@@ -218,11 +217,7 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
}
auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto spill_func = [this, state, query_id, &parent, submit_timer] {
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ auto spill_func = [this, state, query_id, &parent] {
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
@@ -267,10 +262,7 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
&eos);
}
RETURN_IF_ERROR(_shared_state->sink_status);
- {
- SCOPED_TIMER(Base::_spill_timer);
- _shared_state->sink_status =
_spilling_stream->spill_block(state, block, eos);
- }
+ _shared_state->sink_status = _spilling_stream->spill_block(state,
block, eos);
RETURN_IF_ERROR(_shared_state->sink_status);
block.clear_column_data();
}
@@ -279,7 +271,19 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
return Status::OK();
};
- auto exception_catch_func = [this, query_id, spill_context, spill_func]() {
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto exception_catch_func = [this, query_id, spill_context, submit_timer,
spill_func]() {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_write_timer);
+
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
spill_sort_sink "
@@ -304,7 +308,8 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
if (status.ok()) {
state->get_query_ctx()->increase_revoking_tasks_count();
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ std::make_shared<SpillRunnable>(state, _profile, true,
+
_shared_state->shared_from_this(),
exception_catch_func));
}
if (!status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 7a361199239..086d93a970c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -36,6 +36,7 @@ public:
~SpillSortSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get();
}
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index fe1356381b2..5cc124caaea 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -36,6 +36,7 @@ SpillSortLocalState::SpillSortLocalState(RuntimeState* state,
OperatorXBase* par
}
Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
+ init_spill_write_counters();
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
@@ -45,14 +46,6 @@ Status SpillSortLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_internal_runtime_profile =
std::make_unique<RuntimeProfile>("internal_profile");
_spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillMergeSortTime", 1);
- _spill_serialize_block_timer =
- ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime",
1);
- _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteDiskTime", 1);
- _spill_data_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize",
TUnit::BYTES, 1);
- _spill_block_count =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount",
TUnit::UNIT, 1);
- _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWaitInQueueTime", 1);
return Status::OK();
}
@@ -86,11 +79,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto spill_func = [this, state, query_id, &parent, submit_timer] {
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ auto spill_func = [this, state, query_id, &parent] {
SCOPED_TIMER(_spill_merge_sort_timer);
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
@@ -136,15 +125,11 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
_shared_state->spill_block_batch_row_count,
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES,
profile());
RETURN_IF_ERROR(_status);
- _status = tmp_stream->prepare_spill();
- RETURN_IF_ERROR(_status);
+ tmp_stream->set_write_counters(profile());
_shared_state->sorted_streams.emplace_back(tmp_stream);
bool eos = false;
- tmp_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer,
- _spill_write_wait_io_timer,
memory_used_counter());
while (!eos && !state->is_cancelled()) {
merge_sorted_block.clear_column_data();
{
@@ -178,7 +163,19 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
return Status::OK();
};
- auto exception_catch_func = [this, spill_func]() {
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto exception_catch_func = [this, spill_func, submit_timer]() {
+ auto submit_elapsed_time = submit_timer.elapsed_time();
+ _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+ exec_time_counter()->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_spill_total_timer);
+ SCOPED_TIMER(_spill_recover_time);
+
_status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
}();
};
@@ -188,7 +185,8 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
"merge_sort_spill_data submit_func failed");
});
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ std::make_shared<SpillRunnable>(state, _runtime_profile.get(),
false,
+ _shared_state->shared_from_this(),
exception_catch_func));
}
@@ -203,8 +201,7 @@ Status SpillSortLocalState::_create_intermediate_merger(
_current_merging_streams.clear();
for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty();
++i) {
auto stream = _shared_state->sorted_streams.front();
- stream->set_read_counters(Base::_spill_read_data_time,
Base::_spill_deserialize_time,
- Base::_spill_read_bytes,
Base::_spill_read_wait_io_timer);
+ stream->set_read_counters(profile());
_current_merging_streams.emplace_back(stream);
child_block_suppliers.emplace_back(
std::bind(std::mem_fn(&vectorized::SpillStream::read_next_block_sync),
stream.get(),
@@ -263,6 +260,7 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state)
{
Status SpillSortSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
+ local_state.copy_shared_spill_profile();
Defer defer {[&]() {
if (!local_state._status.ok() || *eos) {
local_state._shared_state->close();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index ca984e352fc..7536dd15e92 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -65,10 +65,6 @@ protected:
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
// counters for spill merge sort
RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
- RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
- RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
- RuntimeProfile::Counter* _spill_data_size = nullptr;
- RuntimeProfile::Counter* _spill_block_count = nullptr;
};
class SortSourceOperatorX;
class SpillSortSourceOperatorX : public OperatorX<SpillSortLocalState> {
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index d2b157463ae..2ea5cedcdb0 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -29,6 +29,7 @@
#include "runtime/runtime_state.h"
#include "runtime/task_execution_context.h"
#include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
#include "util/threadpool.h"
#include "vec/runtime/partitioner.h"
@@ -78,12 +79,23 @@ private:
class SpillRunnable : public Runnable {
public:
- SpillRunnable(RuntimeState* state, const
std::shared_ptr<BasicSharedState>& shared_state,
- std::function<void()> func)
+ SpillRunnable(RuntimeState* state, RuntimeProfile* profile, bool is_write,
+ const std::shared_ptr<BasicSharedState>& shared_state,
std::function<void()> func)
: _state(state),
+ _is_write(is_write),
_task_context_holder(state->get_task_execution_context()),
_shared_state_holder(shared_state),
- _func(std::move(func)) {}
+ _func(std::move(func)) {
+ write_wait_in_queue_task_count =
profile->get_counter("SpillWriteTaskWaitInQueueCount");
+ writing_task_count = profile->get_counter("SpillWriteTaskCount");
+ read_wait_in_queue_task_count =
profile->get_counter("SpillReadTaskWaitInQueueCount");
+ reading_task_count = profile->get_counter("SpillReadTaskCount");
+ if (is_write) {
+ COUNTER_UPDATE(write_wait_in_queue_task_count, 1);
+ } else {
+ COUNTER_UPDATE(read_wait_in_queue_task_count, 1);
+ }
+ }
~SpillRunnable() override = default;
@@ -94,8 +106,20 @@ public:
if (!task_context_holder) {
return;
}
+ if (_is_write) {
+ COUNTER_UPDATE(write_wait_in_queue_task_count, -1);
+ COUNTER_UPDATE(writing_task_count, 1);
+ } else {
+ COUNTER_UPDATE(read_wait_in_queue_task_count, -1);
+ COUNTER_UPDATE(reading_task_count, 1);
+ }
SCOPED_ATTACH_TASK(_state);
Defer defer([&] {
+ if (_is_write) {
+ COUNTER_UPDATE(writing_task_count, -1);
+ } else {
+ COUNTER_UPDATE(reading_task_count, -1);
+ }
std::function<void()> tmp;
std::swap(tmp, _func);
});
@@ -113,6 +137,11 @@ public:
private:
RuntimeState* _state;
+ bool _is_write;
+ RuntimeProfile::Counter* write_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* writing_task_count = nullptr;
+ RuntimeProfile::Counter* read_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* reading_task_count = nullptr;
std::weak_ptr<TaskExecutionContext> _task_context_holder;
std::weak_ptr<BasicSharedState> _shared_state_holder;
std::function<void()> _func;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index e127565b15f..d69c43dfc46 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -95,7 +95,6 @@ Status StreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_build_table_convert_timer = ADD_TIMER(Base::profile(),
"BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
- _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
_expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
_serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index f1804601693..5ed3509ae78 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -548,9 +548,11 @@ public:
[[nodiscard]] Status merge_impl_ignore_overflow(T&& block) {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "Merge block not match, self:[columns: {}, types: {}],
input:[columns: {}, "
+ "Merge block not match, self column count: {}, [columns:
{}, types: {}], "
+ "input column count: {}, [columns: {}, "
"types: {}], ",
- dump_names(), dump_types(), block.dump_names(),
block.dump_types());
+ _columns.size(), dump_names(), dump_types(),
block.columns(),
+ block.dump_names(), block.dump_types());
}
for (int i = 0; i < _columns.size(); ++i) {
DCHECK(_data_types[i]->equals(*block.get_by_position(i).type))
@@ -583,9 +585,11 @@ public:
} else {
if (_columns.size() != block.columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "Merge block not match, self:[columns: {}, types: {}],
input:[columns: {}, "
+ "Merge block not match, self column count: {},
[columns: {}, types: {}], "
+ "input column count: {}, [columns: {}, "
"types: {}], ",
- dump_names(), dump_types(), block.dump_names(),
block.dump_types());
+ _columns.size(), dump_names(), dump_types(),
block.columns(),
+ block.dump_names(), block.dump_types());
}
for (int i = 0; i < _columns.size(); ++i) {
if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
diff --git a/be/src/vec/spill/spill_reader.cpp
b/be/src/vec/spill/spill_reader.cpp
index f0320ee9b49..d0258344541 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -37,7 +37,9 @@ Status SpillReader::open() {
return Status::OK();
}
- SCOPED_TIMER(read_timer_);
+ SCOPED_TIMER(_read_file_timer);
+
+ COUNTER_UPDATE(_read_file_count, 1);
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path_,
&file_reader_));
@@ -50,12 +52,14 @@ Status SpillReader::open() {
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result,
&bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
+ COUNTER_UPDATE(_read_file_size, bytes_read);
// read max sub block size
bytes_read = 0;
result.data = (char*)&max_sub_block_size_;
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2,
result, &bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
+ COUNTER_UPDATE(_read_file_size, bytes_read);
size_t buff_size = std::max(block_count_ * sizeof(size_t),
max_sub_block_size_);
try {
@@ -73,6 +77,7 @@ Status SpillReader::open() {
RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
DCHECK(bytes_read == block_count_ * sizeof(size_t));
+ COUNTER_UPDATE(_read_file_size, bytes_read);
block_start_offsets_.resize(block_count_ + 1);
for (size_t i = 0; i < block_count_; ++i) {
@@ -103,21 +108,24 @@ Status SpillReader::read(Block* block, bool* eos) {
Slice result(read_buff_.get(), bytes_to_read);
size_t bytes_read = 0;
{
- SCOPED_TIMER(read_timer_);
+ SCOPED_TIMER(_read_file_timer);
RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_],
result,
&bytes_read));
}
DCHECK(bytes_read == bytes_to_read);
- COUNTER_UPDATE(read_bytes_, bytes_read);
if (bytes_read > 0) {
+ COUNTER_UPDATE(_read_file_size, bytes_read);
+ COUNTER_UPDATE(_read_block_count, 1);
{
- SCOPED_TIMER(deserialize_timer_);
+ SCOPED_TIMER(_deserialize_timer);
if (!pb_block_.ParseFromArray(result.data, result.size)) {
return Status::InternalError("Failed to read spilled block");
}
RETURN_IF_ERROR(block->deserialize(pb_block_));
}
+ COUNTER_UPDATE(_read_block_data_size, block->bytes());
+ COUNTER_UPDATE(_read_rows_count, block->rows());
} else {
block->clear_column_data();
}
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index 6694bf91572..fcb7d8d9e0b 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -50,12 +50,14 @@ public:
size_t block_count() const { return block_count_; }
- void set_counters(RuntimeProfile::Counter* read_timer,
- RuntimeProfile::Counter* deserialize_timer,
- RuntimeProfile::Counter* read_bytes) {
- read_timer_ = read_timer;
- deserialize_timer_ = deserialize_timer;
- read_bytes_ = read_bytes;
+ void set_counters(RuntimeProfile* profile) {
+ _read_file_timer = profile->get_counter("SpillReadFileTime");
+ _deserialize_timer =
profile->get_counter("SpillReadDerializeBlockTime");
+ _read_block_count = profile->get_counter("SpillReadBlockCount");
+ _read_block_data_size = profile->get_counter("SpillReadBlockDataSize");
+ _read_file_size = profile->get_counter("SpillReadFileSize");
+ _read_rows_count = profile->get_counter("SpillReadRows");
+ _read_file_count = profile->get_counter("SpillReadFileCount");
}
private:
@@ -71,9 +73,13 @@ private:
PBlock pb_block_;
- RuntimeProfile::Counter* read_timer_;
- RuntimeProfile::Counter* deserialize_timer_;
- RuntimeProfile::Counter* read_bytes_;
+ RuntimeProfile::Counter* _read_file_timer = nullptr;
+ RuntimeProfile::Counter* _deserialize_timer = nullptr;
+ RuntimeProfile::Counter* _read_block_count = nullptr;
+ RuntimeProfile::Counter* _read_block_data_size = nullptr;
+ RuntimeProfile::Counter* _read_file_size = nullptr;
+ RuntimeProfile::Counter* _read_rows_count = nullptr;
+ RuntimeProfile::Counter* _read_file_count = nullptr;
};
using SpillReaderUPtr = std::unique_ptr<SpillReader>;
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index 7189fad262c..6f9143b8073 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -44,16 +44,31 @@ SpillStream::SpillStream(RuntimeState* state, int64_t
stream_id, SpillDataDir* d
batch_rows_(batch_rows),
batch_bytes_(batch_bytes),
query_id_(state->query_id()),
- profile_(profile) {}
+ profile_(profile) {
+ _total_file_count = profile_->get_counter("SpillWriteFileTotalCount");
+ _current_file_count = profile_->get_counter("SpillWriteFileCurrentCount");
+ _current_file_size = profile_->get_counter("SpillWriteFileCurrentSize");
+}
+
+void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) {
+ _current_file_count =
source_op_profile->get_counter("SpillWriteFileCurrentCount");
+ _current_file_size =
source_op_profile->get_counter("SpillWriteFileCurrentSize");
+}
SpillStream::~SpillStream() {
gc();
}
void SpillStream::gc() {
+ if (_current_file_size) {
+ COUNTER_UPDATE(_current_file_size, -total_written_bytes_);
+ }
bool exists = false;
auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
if (status.ok() && exists) {
+ if (_current_file_count) {
+ COUNTER_UPDATE(_current_file_count, -1);
+ }
auto query_gc_dir =
data_dir_->get_spill_data_gc_path(print_id(query_id_));
status = io::global_local_filesystem()->create_directory(query_gc_dir);
DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", {
@@ -79,10 +94,19 @@ void SpillStream::gc() {
}
Status SpillStream::prepare() {
- writer_ = std::make_unique<SpillWriter>(stream_id_, batch_rows_,
data_dir_, spill_dir_);
+ writer_ =
+ std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_,
data_dir_, spill_dir_);
reader_ = std::make_unique<SpillReader>(stream_id_,
writer_->get_file_path());
- return Status::OK();
+
+ DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
+ return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream
prepare_spill failed");
+ });
+ COUNTER_UPDATE(_total_file_count, 1);
+ if (_current_file_count) {
+ COUNTER_UPDATE(_current_file_count, 1);
+ }
+ return writer_->open();
}
const TUniqueId& SpillStream::query_id() const {
@@ -92,12 +116,6 @@ const TUniqueId& SpillStream::query_id() const {
const std::string& SpillStream::get_spill_root_dir() const {
return data_dir_->path();
}
-Status SpillStream::prepare_spill() {
- DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
- return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream
prepare_spill failed");
- });
- return writer_->open();
-}
Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool
eof) {
size_t written_bytes = 0;
@@ -106,9 +124,7 @@ Status SpillStream::spill_block(RuntimeState* state, const
Block& block, bool eo
});
RETURN_IF_ERROR(writer_->write(state, block, written_bytes));
if (eof) {
- RETURN_IF_ERROR(writer_->close());
- total_written_bytes_ = writer_->get_written_bytes();
- writer_.reset();
+ RETURN_IF_ERROR(spill_eof());
} else {
total_written_bytes_ = writer_->get_written_bytes();
}
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 26b7dcbaf06..5be151be72c 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -56,32 +56,17 @@ public:
int64_t get_written_bytes() const { return total_written_bytes_; }
- Status prepare_spill();
-
Status spill_block(RuntimeState* state, const Block& block, bool eof);
Status spill_eof();
Status read_next_block_sync(Block* block, bool* eos);
- void set_write_counters(RuntimeProfile::Counter* serialize_timer,
- RuntimeProfile::Counter* write_block_counter,
- RuntimeProfile::Counter* write_bytes_counter,
- RuntimeProfile::Counter* write_timer,
- RuntimeProfile::Counter* wait_io_timer,
- RuntimeProfile::Counter* memory_used_counter) {
- writer_->set_counters(serialize_timer, write_block_counter,
write_bytes_counter,
- write_timer, memory_used_counter);
- write_wait_io_timer_ = wait_io_timer;
- }
-
- void set_read_counters(RuntimeProfile::Counter* read_timer,
- RuntimeProfile::Counter* deserialize_timer,
- RuntimeProfile::Counter* read_bytes,
- RuntimeProfile::Counter* wait_io_timer) {
- reader_->set_counters(read_timer, deserialize_timer, read_bytes);
- read_wait_io_timer_ = wait_io_timer;
- }
+ void set_write_counters(RuntimeProfile* profile) {
writer_->set_counters(profile); }
+
+ void set_read_counters(RuntimeProfile* profile) {
reader_->set_counters(profile); }
+
+ void update_shared_profiles(RuntimeProfile* source_op_profile);
const TUniqueId& query_id() const;
@@ -93,6 +78,8 @@ private:
RuntimeState* state_ = nullptr;
int64_t stream_id_;
SpillDataDir* data_dir_ = nullptr;
+ // Directory path format specified in
SpillStreamManager::register_spill_stream:
+ //
storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
std::string spill_dir_;
size_t batch_rows_;
size_t batch_bytes_;
@@ -106,8 +93,9 @@ private:
TUniqueId query_id_;
RuntimeProfile* profile_ = nullptr;
- RuntimeProfile::Counter* write_wait_io_timer_ = nullptr;
- RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;
+ RuntimeProfile::Counter* _current_file_count = nullptr;
+ RuntimeProfile::Counter* _total_file_count = nullptr;
+ RuntimeProfile::Counter* _current_file_size = nullptr;
};
using SpillStreamSPtr = std::shared_ptr<SpillStream>;
} // namespace vectorized
diff --git a/be/src/vec/spill/spill_stream_manager.cpp
b/be/src/vec/spill/spill_stream_manager.cpp
index 61e96559d23..c4641866dcf 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -168,6 +168,7 @@ Status
SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
SpillDataDir* data_dir = nullptr;
for (auto& dir : data_dirs) {
std::string spill_root_dir = dir->get_spill_data_path();
+ //
storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id,
operator_name,
node_id, state->task_id(), id);
auto st = io::global_local_filesystem()->create_directory(spill_dir);
diff --git a/be/src/vec/spill/spill_writer.cpp
b/be/src/vec/spill/spill_writer.cpp
index 9fbd81601b6..34715787756 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -45,12 +45,12 @@ Status SpillWriter::close() {
// meta: block1 offset, block2 offset, ..., blockn offset,
max_sub_block_size, n
{
- SCOPED_TIMER(write_timer_);
+ SCOPED_TIMER(_write_file_timer);
RETURN_IF_ERROR(file_writer_->append(meta_));
}
total_written_bytes_ += meta_.size();
- COUNTER_UPDATE(write_bytes_counter_, meta_.size());
+ COUNTER_UPDATE(_write_file_data_bytes_counter, meta_.size());
data_dir_->update_spill_data_usage(meta_.size());
@@ -64,6 +64,8 @@ Status SpillWriter::write(RuntimeState* state, const Block&
block, size_t& writt
written_bytes = 0;
DCHECK(file_writer_);
auto rows = block.rows();
+ COUNTER_UPDATE(_write_rows_counter, rows);
+ COUNTER_UPDATE(_write_block_bytes_counter, block.bytes());
// file format: block1, block2, ..., blockn, meta
if (rows <= batch_size_) {
return _write_internal(block, written_bytes);
@@ -84,9 +86,10 @@ Status SpillWriter::write(RuntimeState* state, const Block&
block, size_t& writt
}
});
- auto tmp_blcok_mem = tmp_block.allocated_bytes();
- memory_used_counter_->update(tmp_blcok_mem);
- Defer defer {[&]() { memory_used_counter_->update(-tmp_blcok_mem);
}};
+ int64_t tmp_blcok_mem = tmp_block.allocated_bytes();
+ COUNTER_UPDATE(_memory_used_counter, tmp_blcok_mem);
+ COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
+ Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter,
-tmp_blcok_mem); }};
RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes));
row_idx += block_rows;
@@ -100,26 +103,31 @@ Status SpillWriter::_write_internal(const Block& block,
size_t& written_bytes) {
Status status;
std::string buff;
+ int64_t buff_size {0};
if (block.rows() > 0) {
{
PBlock pblock;
- SCOPED_TIMER(serialize_timer_);
+ SCOPED_TIMER(_serialize_timer);
status = block.serialize(
BeExecVersionManager::get_newest_version(), &pblock,
&uncompressed_bytes,
&compressed_bytes,
segment_v2::CompressionTypePB::ZSTD); // ZSTD for better
compression ratio
RETURN_IF_ERROR(status);
- auto pblock_mem = pblock.ByteSizeLong();
- memory_used_counter_->update(pblock_mem);
- Defer defer {[&]() { memory_used_counter_->update(-pblock_mem); }};
+ int64_t pblock_mem = pblock.ByteSizeLong();
+ COUNTER_UPDATE(_memory_used_counter, pblock_mem);
+ COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
+ Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter,
-pblock_mem); }};
if (!pblock.SerializeToString(&buff)) {
return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
"serialize spill data error. [path={}]", file_path_);
}
- memory_used_counter_->update(buff.size());
+ buff_size = buff.size();
+ COUNTER_UPDATE(_memory_used_counter, buff_size);
+ COUNTER_SET(_peak_memory_usage_counter,
_memory_used_counter->value());
+ Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter,
-buff_size); }};
}
- if (data_dir_->reach_capacity_limit(buff.size())) {
+ if (data_dir_->reach_capacity_limit(buff_size)) {
return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
"spill data total size exceed limit, path: {}, size limit:
{}, spill data "
"size: {}",
@@ -129,31 +137,29 @@ Status SpillWriter::_write_internal(const Block& block,
size_t& written_bytes) {
}
{
- auto buff_size = buff.size();
Defer defer {[&]() {
- memory_used_counter_->update(-buff_size);
if (status.ok()) {
data_dir_->update_spill_data_usage(buff_size);
written_bytes += buff_size;
- max_sub_block_size_ = std::max(max_sub_block_size_,
buff_size);
+ max_sub_block_size_ = std::max(max_sub_block_size_,
(size_t)buff_size);
meta_.append((const char*)&total_written_bytes_,
sizeof(size_t));
- COUNTER_UPDATE(write_bytes_counter_, buff_size);
- COUNTER_UPDATE(write_block_counter_, 1);
+ COUNTER_UPDATE(_write_file_data_bytes_counter, buff_size);
+ COUNTER_UPDATE(_write_block_counter, 1);
total_written_bytes_ += buff_size;
++written_blocks_;
}
}};
{
- SCOPED_TIMER(write_timer_);
+ SCOPED_TIMER(_write_file_timer);
status = file_writer_->append(buff);
RETURN_IF_ERROR(status);
}
}
}
- return Status::OK();
+ return status;
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index c3502b5d6a4..a6ea3200b14 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -31,9 +31,16 @@ namespace vectorized {
class SpillDataDir;
class SpillWriter {
public:
- SpillWriter(int64_t id, size_t batch_size, SpillDataDir* data_dir, const
std::string& dir)
+ SpillWriter(RuntimeProfile* profile, int64_t id, size_t batch_size,
SpillDataDir* data_dir,
+ const std::string& dir)
: data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) {
- file_path_ = dir + "/" + std::to_string(file_index_);
+ // Directory path format specified in
SpillStreamManager::register_spill_stream:
+ //
storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id/0
+ file_path_ = dir + "/0";
+
+ _memory_used_counter = profile->get_counter("MemoryUsage");
+ _peak_memory_usage_counter =
static_cast<RuntimeProfile::HighWaterMarkCounter*>(
+ profile->get_counter("MemoryUsagePeak"));
}
Status open();
@@ -48,21 +55,16 @@ public:
const std::string& get_file_path() const { return file_path_; }
- void set_counters(RuntimeProfile::Counter* serialize_timer,
- RuntimeProfile::Counter* write_block_counter,
- RuntimeProfile::Counter* write_bytes_counter,
- RuntimeProfile::Counter* write_timer,
- RuntimeProfile::Counter* memory_used_counter) {
- serialize_timer_ = serialize_timer;
- write_block_counter_ = write_block_counter;
- write_bytes_counter_ = write_bytes_counter;
- write_timer_ = write_timer;
- memory_used_counter_ = memory_used_counter;
+ void set_counters(RuntimeProfile* profile) {
+ _write_file_timer = profile->get_counter("SpillWriteFileTime");
+ _serialize_timer =
profile->get_counter("SpillWriteSerializeBlockTime");
+ _write_block_counter = profile->get_counter("SpillWriteBlockCount");
+ _write_block_bytes_counter =
profile->get_counter("SpillWriteBlockDataSize");
+ _write_file_data_bytes_counter =
profile->get_counter("SpillWriteFileTotalSize");
+ _write_rows_counter = profile->get_counter("SpillWriteRows");
}
private:
- void _init_profile();
-
Status _write_internal(const Block& block, size_t& written_bytes);
// not owned, point to the data dir of this rowset
@@ -72,7 +74,6 @@ private:
int64_t stream_id_;
size_t batch_size_;
size_t max_sub_block_size_ = 0;
- int file_index_ = 0;
std::string file_path_;
std::unique_ptr<doris::io::FileWriter> file_writer_;
@@ -80,11 +81,14 @@ private:
int64_t total_written_bytes_ = 0;
std::string meta_;
- RuntimeProfile::Counter* write_bytes_counter_ = nullptr;
- RuntimeProfile::Counter* serialize_timer_ = nullptr;
- RuntimeProfile::Counter* write_timer_ = nullptr;
- RuntimeProfile::Counter* write_block_counter_ = nullptr;
- RuntimeProfile::Counter* memory_used_counter_ = nullptr;
+ RuntimeProfile::Counter* _write_file_timer = nullptr;
+ RuntimeProfile::Counter* _serialize_timer = nullptr;
+ RuntimeProfile::Counter* _write_block_counter = nullptr;
+ RuntimeProfile::Counter* _write_block_bytes_counter = nullptr;
+ RuntimeProfile::Counter* _write_file_data_bytes_counter = nullptr;
+ RuntimeProfile::Counter* _write_rows_counter = nullptr;
+ RuntimeProfile::Counter* _memory_used_counter = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
};
using SpillWriterUPtr = std::unique_ptr<SpillWriter>;
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]