This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab69416922 [Bug](pipelineX) fix streaming agg (#24449)
ab69416922 is described below
commit ab69416922464da09c05a7ae7044cfc6a0b1716a
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 15 19:22:54 2023 +0800
[Bug](pipelineX) fix streaming agg (#24449)
fix streaming agg
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 +-
.../pipeline/exec/aggregation_source_operator.cpp | 264 ++++++++++-----------
be/src/pipeline/exec/aggregation_source_operator.h | 20 +-
be/src/pipeline/exec/data_queue.cpp | 2 +-
be/src/pipeline/exec/data_queue.h | 6 +-
be/src/pipeline/exec/scan_operator.cpp | 4 +-
.../exec/streaming_aggregation_sink_operator.h | 4 +-
.../exec/streaming_aggregation_source_operator.cpp | 24 +-
.../exec/streaming_aggregation_source_operator.h | 30 +--
be/src/pipeline/pipeline_x/dependency.h | 22 +-
be/src/pipeline/pipeline_x/operator.cpp | 4 +-
be/src/pipeline/pipeline_x/operator.h | 1 -
.../ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql | 2 +-
.../ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql | 2 +-
60 files changed, 192 insertions(+), 287 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index b30bb9b815..cc0eb3be1d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -947,6 +947,6 @@ class StreamingAggSinkLocalState;
template class AggSinkOperatorX<BlockingAggSinkLocalState>;
template class AggSinkOperatorX<StreamingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>;
-template class AggSinkLocalState<StreamingAggDependency,
StreamingAggSinkLocalState>;
+template class AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 476683f8b8..16125c424a 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -28,8 +28,7 @@ namespace pipeline {
OPERATOR_CODE_GENERATOR(AggSourceOperator, SourceOperator)
-template <typename DependencyType, typename Derived>
-AggLocalState<DependencyType, Derived>::AggLocalState(RuntimeState* state,
OperatorXBase* parent)
+AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent),
_get_results_timer(nullptr),
_serialize_result_timer(nullptr),
@@ -38,49 +37,47 @@ AggLocalState<DependencyType,
Derived>::AggLocalState(RuntimeState* state, Opera
_serialize_data_timer(nullptr),
_hash_table_size_counter(nullptr) {}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::init(RuntimeState* state,
LocalStateInfo& info) {
+Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(Base::profile()->total_time_counter());
- _agg_data = Base::_shared_state->agg_data.get();
+ _agg_data = _shared_state->agg_data.get();
_get_results_timer = ADD_TIMER(Base::profile(), "GetResultsTime");
_serialize_result_timer = ADD_TIMER(Base::profile(),
"SerializeResultTime");
_hash_table_iterate_timer = ADD_TIMER(Base::profile(),
"HashTableIterateTime");
_insert_keys_to_column_timer = ADD_TIMER(Base::profile(),
"InsertKeysToColumnTime");
_serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
_hash_table_size_counter = ADD_COUNTER(Base::profile(), "HashTableSize",
TUnit::UNIT);
- auto& p = Base::_parent->template cast<typename Derived::Parent>();
+ auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._without_key) {
if (p._needs_finalize) {
- _executor.get_result =
std::bind<Status>(&Derived::_get_without_key_result,
- (Derived*)this,
std::placeholders::_1,
- std::placeholders::_2,
std::placeholders::_3);
+ _executor.get_result =
std::bind<Status>(&AggLocalState::_get_without_key_result, this,
+ std::placeholders::_1,
std::placeholders::_2,
+ std::placeholders::_3);
} else {
- _executor.get_result =
std::bind<Status>(&Derived::_serialize_without_key,
- (Derived*)this,
std::placeholders::_1,
- std::placeholders::_2,
std::placeholders::_3);
+ _executor.get_result =
std::bind<Status>(&AggLocalState::_serialize_without_key, this,
+ std::placeholders::_1,
std::placeholders::_2,
+ std::placeholders::_3);
}
- _executor.close = std::bind<void>(&Derived::_close_without_key,
(Derived*)this);
+ _executor.close = std::bind<void>(&AggLocalState::_close_without_key,
this);
} else {
if (p._needs_finalize) {
- _executor.get_result =
std::bind<Status>(&Derived::_get_with_serialized_key_result,
- (Derived*)this,
std::placeholders::_1,
- std::placeholders::_2,
std::placeholders::_3);
+ _executor.get_result = std::bind<Status>(
+ &AggLocalState::_get_with_serialized_key_result, this,
std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3);
} else {
_executor.get_result = std::bind<Status>(
- &Derived::_serialize_with_serialized_key_result,
(Derived*)this,
+ &AggLocalState::_serialize_with_serialized_key_result,
this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
}
- _executor.close =
std::bind<void>(&Derived::_close_with_serialized_key, (Derived*)this);
+ _executor.close =
std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
}
_agg_data_created_without_key = p._without_key;
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-void AggLocalState<DependencyType, Derived>::_close_with_serialized_key() {
+void AggLocalState::_close_with_serialized_key() {
std::visit(
[&](auto&& agg_method) -> void {
auto& data = agg_method.data;
@@ -98,8 +95,7 @@ void AggLocalState<DependencyType,
Derived>::_close_with_serialized_key() {
Base::_dependency->release_tracker();
}
-template <typename DependencyType, typename Derived>
-void AggLocalState<DependencyType, Derived>::_close_without_key() {
+void AggLocalState::_close_without_key() {
//because prepare maybe failed, and couldn't create agg data.
//but finally call close to destory agg data, if agg data has bitmapValue
//will be core dump, it's not initialized
@@ -110,39 +106,37 @@ void AggLocalState<DependencyType,
Derived>::_close_without_key() {
Base::_dependency->release_tracker();
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_result(
- RuntimeState* state, vectorized::Block* block, SourceState&
source_state) {
- if (Base::_shared_state->spill_context.has_data) {
+Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState*
state,
+ vectorized::Block*
block,
+ SourceState&
source_state) {
+ if (_shared_state->spill_context.has_data) {
return _serialize_with_serialized_key_result_with_spilt_data(state,
block, source_state);
} else {
return _serialize_with_serialized_key_result_non_spill(state, block,
source_state);
}
}
-template <typename DependencyType, typename Derived>
-Status
-AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_result_with_spilt_data(
+Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data(
RuntimeState* state, vectorized::Block* block, SourceState&
source_state) {
- CHECK(!Base::_shared_state->spill_context.stream_ids.empty());
- CHECK(Base::_shared_state->spill_partition_helper != nullptr)
+ CHECK(!_shared_state->spill_context.stream_ids.empty());
+ CHECK(_shared_state->spill_partition_helper != nullptr)
<< "_spill_partition_helper should not be null";
- Base::_shared_state->aggregate_data_container->init_once();
- while (Base::_shared_state->aggregate_data_container->iterator ==
- Base::_shared_state->aggregate_data_container->end()) {
- if (Base::_shared_state->spill_context.read_cursor ==
- Base::_shared_state->spill_partition_helper->partition_count) {
+ _shared_state->aggregate_data_container->init_once();
+ while (_shared_state->aggregate_data_container->iterator ==
+ _shared_state->aggregate_data_container->end()) {
+ if (_shared_state->spill_context.read_cursor ==
+ _shared_state->spill_partition_helper->partition_count) {
break;
}
RETURN_IF_ERROR(Base::_dependency->reset_hash_table());
RETURN_IF_ERROR(Base::_dependency->merge_spilt_data());
- Base::_shared_state->aggregate_data_container->init_once();
+ _shared_state->aggregate_data_container->init_once();
}
RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state,
block, source_state));
if (source_state == SourceState::FINISHED) {
- source_state = Base::_shared_state->spill_context.read_cursor ==
-
Base::_shared_state->spill_partition_helper->partition_count
+ source_state = _shared_state->spill_context.read_cursor ==
+
_shared_state->spill_partition_helper->partition_count
? SourceState::FINISHED
: SourceState::DEPEND_ON_SOURCE;
}
@@ -150,12 +144,12 @@ AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_result_wi
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_result_non_spill(
- RuntimeState* state, vectorized::Block* block, SourceState&
source_state) {
+Status
AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeState*
state,
+
vectorized::Block* block,
+
SourceState& source_state) {
SCOPED_TIMER(_serialize_result_timer);
- int key_size = Base::_shared_state->probe_expr_ctxs.size();
- int agg_size = Base::_shared_state->aggregate_evaluators.size();
+ int key_size = _shared_state->probe_expr_ctxs.size();
+ int agg_size = _shared_state->aggregate_evaluators.size();
vectorized::MutableColumns value_columns(agg_size);
vectorized::DataTypes value_data_types(agg_size);
@@ -168,7 +162,7 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
} else {
key_columns.emplace_back(
-
Base::_shared_state->probe_expr_ctxs[i]->root()->data_type()->create_column());
+
_shared_state->probe_expr_ctxs[i]->root()->data_type()->create_column());
}
}
@@ -180,20 +174,20 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
const auto size = std::min(data.size(),
size_t(state->batch_size()));
using KeyType =
std::decay_t<decltype(agg_method.iterator->get_first())>;
std::vector<KeyType> keys(size);
- if (Base::_shared_state->values.size() < size + 1) {
- Base::_shared_state->values.resize(size + 1);
+ if (_shared_state->values.size() < size + 1) {
+ _shared_state->values.resize(size + 1);
}
size_t num_rows = 0;
- Base::_shared_state->aggregate_data_container->init_once();
- auto& iter =
Base::_shared_state->aggregate_data_container->iterator;
+ _shared_state->aggregate_data_container->init_once();
+ auto& iter = _shared_state->aggregate_data_container->iterator;
{
SCOPED_TIMER(_hash_table_iterate_timer);
- while (iter !=
Base::_shared_state->aggregate_data_container->end() &&
+ while (iter !=
_shared_state->aggregate_data_container->end() &&
num_rows < state->batch_size()) {
keys[num_rows] = iter.template get_key<KeyType>();
- Base::_shared_state->values[num_rows] =
iter.get_aggregate_data();
+ _shared_state->values[num_rows] =
iter.get_aggregate_data();
++iter;
++num_rows;
}
@@ -202,10 +196,10 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
{
SCOPED_TIMER(_insert_keys_to_column_timer);
agg_method.insert_keys_into_columns(keys, key_columns,
num_rows,
-
Base::_shared_state->probe_key_sz);
+
_shared_state->probe_key_sz);
}
- if (iter ==
Base::_shared_state->aggregate_data_container->end()) {
+ if (iter == _shared_state->aggregate_data_container->end()) {
if (agg_method.data.has_null_key_data()) {
// only one key of group by support wrap null key
// here need additional processing logic on the null
key / value
@@ -213,8 +207,7 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
DCHECK(key_columns[0]->is_nullable());
if (agg_method.data.has_null_key_data()) {
key_columns[0]->insert_data(nullptr, 0);
- Base::_shared_state->values[num_rows] =
- agg_method.data.get_null_key_data();
+ _shared_state->values[num_rows] =
agg_method.data.get_null_key_data();
++num_rows;
source_state = SourceState::FINISHED;
}
@@ -225,8 +218,8 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
{
SCOPED_TIMER(_serialize_data_timer);
- for (size_t i = 0; i <
Base::_shared_state->aggregate_evaluators.size(); ++i) {
- value_data_types[i] =
Base::_shared_state->aggregate_evaluators[i]
+ for (size_t i = 0; i <
_shared_state->aggregate_evaluators.size(); ++i) {
+ value_data_types[i] =
_shared_state->aggregate_evaluators[i]
->function()
->get_serialized_type();
if (mem_reuse) {
@@ -234,16 +227,14 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
std::move(*block->get_by_position(i +
key_size).column)
.mutate();
} else {
- value_columns[i] =
Base::_shared_state->aggregate_evaluators[i]
+ value_columns[i] =
_shared_state->aggregate_evaluators[i]
->function()
->create_serialize_column();
}
- Base::_shared_state->aggregate_evaluators[i]
- ->function()
- ->serialize_to_column(
- Base::_shared_state->values,
-
Base::_dependency->offsets_of_aggregate_states()[i],
- value_columns[i], num_rows);
+
_shared_state->aggregate_evaluators[i]->function()->serialize_to_column(
+ _shared_state->values,
+
Base::_dependency->offsets_of_aggregate_states()[i],
+ value_columns[i], num_rows);
}
}
},
@@ -254,8 +245,8 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
for (int i = 0; i < key_size; ++i) {
columns_with_schema.emplace_back(
std::move(key_columns[i]),
-
Base::_shared_state->probe_expr_ctxs[i]->root()->data_type(),
-
Base::_shared_state->probe_expr_ctxs[i]->root()->expr_name());
+ _shared_state->probe_expr_ctxs[i]->root()->data_type(),
+ _shared_state->probe_expr_ctxs[i]->root()->expr_name());
}
for (int i = 0; i < agg_size; ++i) {
columns_with_schema.emplace_back(std::move(value_columns[i]),
value_data_types[i], "");
@@ -266,38 +257,36 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_with_serialized_key_re
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::_get_with_serialized_key_result(
- RuntimeState* state, vectorized::Block* block, SourceState&
source_state) {
- if (Base::_shared_state->spill_context.has_data) {
+Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state,
vectorized::Block* block,
+ SourceState&
source_state) {
+ if (_shared_state->spill_context.has_data) {
return _get_result_with_spilt_data(state, block, source_state);
} else {
return _get_result_with_serialized_key_non_spill(state, block,
source_state);
}
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::_get_result_with_spilt_data(
- RuntimeState* state, vectorized::Block* block, SourceState&
source_state) {
- CHECK(!Base::_shared_state->spill_context.stream_ids.empty());
- CHECK(Base::_shared_state->spill_partition_helper != nullptr)
+Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state,
vectorized::Block* block,
+ SourceState& source_state) {
+ CHECK(!_shared_state->spill_context.stream_ids.empty());
+ CHECK(_shared_state->spill_partition_helper != nullptr)
<< "_spill_partition_helper should not be null";
- Base::_shared_state->aggregate_data_container->init_once();
- while (Base::_shared_state->aggregate_data_container->iterator ==
- Base::_shared_state->aggregate_data_container->end()) {
- if (Base::_shared_state->spill_context.read_cursor ==
- Base::_shared_state->spill_partition_helper->partition_count) {
+ _shared_state->aggregate_data_container->init_once();
+ while (_shared_state->aggregate_data_container->iterator ==
+ _shared_state->aggregate_data_container->end()) {
+ if (_shared_state->spill_context.read_cursor ==
+ _shared_state->spill_partition_helper->partition_count) {
break;
}
RETURN_IF_ERROR(Base::_dependency->reset_hash_table());
RETURN_IF_ERROR(Base::_dependency->merge_spilt_data());
- Base::_shared_state->aggregate_data_container->init_once();
+ _shared_state->aggregate_data_container->init_once();
}
RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block,
source_state));
if (source_state == SourceState::FINISHED) {
- source_state = Base::_shared_state->spill_context.read_cursor ==
-
Base::_shared_state->spill_partition_helper->partition_count
+ source_state = _shared_state->spill_context.read_cursor ==
+
_shared_state->spill_partition_helper->partition_count
? SourceState::FINISHED
: SourceState::DEPEND_ON_SOURCE;
}
@@ -305,15 +294,15 @@ Status AggLocalState<DependencyType,
Derived>::_get_result_with_spilt_data(
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType,
Derived>::_get_result_with_serialized_key_non_spill(
- RuntimeState* state, vectorized::Block* block, SourceState&
source_state) {
+Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState*
state,
+
vectorized::Block* block,
+ SourceState&
source_state) {
// non-nullable column(id in `_make_nullable_keys`) will be converted to
nullable.
bool mem_reuse = Base::_dependency->make_nullable_keys().empty() &&
block->mem_reuse();
auto columns_with_schema =
vectorized::VectorizedUtils::create_columns_with_type_and_name(
- Base::_parent->template cast<typename
Derived::Parent>()._row_descriptor);
- int key_size = Base::_shared_state->probe_expr_ctxs.size();
+ _parent->cast<AggSourceOperatorX>()._row_descriptor);
+ int key_size = _shared_state->probe_expr_ctxs.size();
vectorized::MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
@@ -340,20 +329,20 @@ Status AggLocalState<DependencyType,
Derived>::_get_result_with_serialized_key_n
const auto size = std::min(data.size(),
size_t(state->batch_size()));
using KeyType =
std::decay_t<decltype(agg_method.iterator->get_first())>;
std::vector<KeyType> keys(size);
- if (Base::_shared_state->values.size() < size) {
- Base::_shared_state->values.resize(size);
+ if (_shared_state->values.size() < size) {
+ _shared_state->values.resize(size);
}
size_t num_rows = 0;
- Base::_shared_state->aggregate_data_container->init_once();
- auto& iter =
Base::_shared_state->aggregate_data_container->iterator;
+ _shared_state->aggregate_data_container->init_once();
+ auto& iter = _shared_state->aggregate_data_container->iterator;
{
SCOPED_TIMER(_hash_table_iterate_timer);
- while (iter !=
Base::_shared_state->aggregate_data_container->end() &&
+ while (iter !=
_shared_state->aggregate_data_container->end() &&
num_rows < state->batch_size()) {
keys[num_rows] = iter.template get_key<KeyType>();
- Base::_shared_state->values[num_rows] =
iter.get_aggregate_data();
+ _shared_state->values[num_rows] =
iter.get_aggregate_data();
++iter;
++num_rows;
}
@@ -362,17 +351,17 @@ Status AggLocalState<DependencyType,
Derived>::_get_result_with_serialized_key_n
{
SCOPED_TIMER(_insert_keys_to_column_timer);
agg_method.insert_keys_into_columns(keys, key_columns,
num_rows,
-
Base::_shared_state->probe_key_sz);
+
_shared_state->probe_key_sz);
}
- for (size_t i = 0; i <
Base::_shared_state->aggregate_evaluators.size(); ++i) {
-
Base::_shared_state->aggregate_evaluators[i]->insert_result_info_vec(
- Base::_shared_state->values,
+ for (size_t i = 0; i <
_shared_state->aggregate_evaluators.size(); ++i) {
+
_shared_state->aggregate_evaluators[i]->insert_result_info_vec(
+ _shared_state->values,
Base::_dependency->offsets_of_aggregate_states()[i],
value_columns[i].get(), num_rows);
}
- if (iter ==
Base::_shared_state->aggregate_data_container->end()) {
+ if (iter == _shared_state->aggregate_data_container->end()) {
if (agg_method.data.has_null_key_data()) {
// only one key of group by support wrap null key
// here need additional processing logic on the null
key / value
@@ -381,9 +370,8 @@ Status AggLocalState<DependencyType,
Derived>::_get_result_with_serialized_key_n
if (key_columns[0]->size() < state->batch_size()) {
key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.data.get_null_key_data();
- for (size_t i = 0; i <
Base::_shared_state->aggregate_evaluators.size();
- ++i)
-
Base::_shared_state->aggregate_evaluators[i]->insert_result_info(
+ for (size_t i = 0; i <
_shared_state->aggregate_evaluators.size(); ++i)
+
_shared_state->aggregate_evaluators[i]->insert_result_info(
mapped +
Base::_dependency->offsets_of_aggregate_states()[i],
value_columns[i].get());
@@ -412,42 +400,39 @@ Status AggLocalState<DependencyType,
Derived>::_get_result_with_serialized_key_n
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType,
Derived>::_serialize_without_key(RuntimeState* state,
-
vectorized::Block* block,
-
SourceState& source_state) {
+Status AggLocalState::_serialize_without_key(RuntimeState* state,
vectorized::Block* block,
+ SourceState& source_state) {
// 1. `child(0)->rows_returned() == 0` mean not data from child
// in level two aggregation node should return NULL result
// level one aggregation node set `eos = true` return directly
SCOPED_TIMER(_serialize_result_timer);
- if (UNLIKELY(Base::_shared_state->input_num_rows == 0)) {
+ if (UNLIKELY(_shared_state->input_num_rows == 0)) {
source_state = SourceState::FINISHED;
return Status::OK();
}
block->clear();
DCHECK(_agg_data->without_key != nullptr);
- int agg_size = Base::_shared_state->aggregate_evaluators.size();
+ int agg_size = _shared_state->aggregate_evaluators.size();
vectorized::MutableColumns value_columns(agg_size);
std::vector<vectorized::DataTypePtr> data_types(agg_size);
// will serialize data to string column
- for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i)
{
- data_types[i] =
-
Base::_shared_state->aggregate_evaluators[i]->function()->get_serialized_type();
+ for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+ data_types[i] =
_shared_state->aggregate_evaluators[i]->function()->get_serialized_type();
value_columns[i] =
-
Base::_shared_state->aggregate_evaluators[i]->function()->create_serialize_column();
+
_shared_state->aggregate_evaluators[i]->function()->create_serialize_column();
}
- for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i)
{
-
Base::_shared_state->aggregate_evaluators[i]->function()->serialize_without_key_to_column(
+ for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+
_shared_state->aggregate_evaluators[i]->function()->serialize_without_key_to_column(
_agg_data->without_key +
Base::_dependency->offsets_of_aggregate_states()[i],
*value_columns[i]);
}
{
vectorized::ColumnsWithTypeAndName data_with_schema;
- for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size();
++i) {
+ for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
vectorized::ColumnWithTypeAndName column_with_schema = {nullptr,
data_types[i], ""};
data_with_schema.push_back(std::move(column_with_schema));
}
@@ -459,27 +444,25 @@ Status AggLocalState<DependencyType,
Derived>::_serialize_without_key(RuntimeSta
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType,
Derived>::_get_without_key_result(RuntimeState* state,
-
vectorized::Block* block,
-
SourceState& source_state) {
+Status AggLocalState::_get_without_key_result(RuntimeState* state,
vectorized::Block* block,
+ SourceState& source_state) {
DCHECK(_agg_data->without_key != nullptr);
block->clear();
- auto& p = Base::_parent->template cast<typename Derived::Parent>();
+ auto& p = _parent->cast<AggSourceOperatorX>();
*block =
vectorized::VectorizedUtils::create_empty_columnswithtypename(p._row_descriptor);
- int agg_size = Base::_shared_state->aggregate_evaluators.size();
+ int agg_size = _shared_state->aggregate_evaluators.size();
vectorized::MutableColumns columns(agg_size);
std::vector<vectorized::DataTypePtr> data_types(agg_size);
- for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i)
{
- data_types[i] =
Base::_shared_state->aggregate_evaluators[i]->function()->get_return_type();
+ for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+ data_types[i] =
_shared_state->aggregate_evaluators[i]->function()->get_return_type();
columns[i] = data_types[i]->create_column();
}
- for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i)
{
+ for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
auto column = columns[i].get();
- Base::_shared_state->aggregate_evaluators[i]->insert_result_info(
+ _shared_state->aggregate_evaluators[i]->insert_result_info(
_agg_data->without_key +
Base::_dependency->offsets_of_aggregate_states()[i],
column);
}
@@ -502,7 +485,7 @@ Status AggLocalState<DependencyType,
Derived>::_get_without_key_result(RuntimeSt
vectorized::ColumnPtr ptr = std::move(columns[i]);
// unless `count`, other aggregate function dispose empty set
should be null
// so here check the children row return
- ptr = make_nullable(ptr, Base::_shared_state->input_num_rows
== 0);
+ ptr = make_nullable(ptr, _shared_state->input_num_rows == 0);
columns[i] = ptr->assume_mutable();
}
}
@@ -521,7 +504,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state) {
- auto& local_state =
state->get_local_state(id())->cast<BlockingAggLocalState>();
+ auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
RETURN_IF_ERROR(local_state._executor.get_result(state, block,
source_state));
local_state.make_nullable_output_key(block);
@@ -531,8 +514,7 @@ Status AggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* blo
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-void AggLocalState<DependencyType,
Derived>::make_nullable_output_key(vectorized::Block* block) {
+void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
if (block->rows() != 0) {
for (auto cid : Base::_dependency->make_nullable_keys()) {
block->get_by_position(cid).column =
make_nullable(block->get_by_position(cid).column);
@@ -541,13 +523,12 @@ void AggLocalState<DependencyType,
Derived>::make_nullable_output_key(vectorized
}
}
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::close(RuntimeState* state) {
+Status AggLocalState::close(RuntimeState* state) {
SCOPED_TIMER(Base::profile()->total_time_counter());
if (Base::_closed) {
return Status::OK();
}
- for (auto* aggregate_evaluator :
Base::_shared_state->aggregate_evaluators) {
+ for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) {
aggregate_evaluator->close(state);
}
if (_executor.close) {
@@ -563,26 +544,19 @@ Status AggLocalState<DependencyType,
Derived>::close(RuntimeState* state) {
_agg_data->method_variant);
}
- Base::_shared_state->agg_data = nullptr;
- Base::_shared_state->aggregate_data_container = nullptr;
- Base::_shared_state->agg_arena_pool = nullptr;
- Base::_shared_state->agg_profile_arena = nullptr;
+ _shared_state->agg_data = nullptr;
+ _shared_state->aggregate_data_container = nullptr;
+ _shared_state->agg_arena_pool = nullptr;
+ _shared_state->agg_profile_arena = nullptr;
std::vector<vectorized::AggregateDataPtr> tmp_values;
- Base::_shared_state->values.swap(tmp_values);
+ _shared_state->values.swap(tmp_values);
return Base::close(state);
}
Dependency* AggSourceOperatorX::wait_for_dependency(RuntimeState* state) {
- return state->get_local_state(Base::id())
- ->cast<BlockingAggLocalState>()
- ._dependency->read_blocked_by();
+ return
state->get_local_state(Base::id())->cast<AggLocalState>()._dependency->read_blocked_by();
}
-class StreamingAggLocalState;
-
-template class AggLocalState<AggDependency, BlockingAggLocalState>;
-template class AggLocalState<StreamingAggDependency, StreamingAggLocalState>;
-
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index 3148f06792..a840a72f13 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -50,10 +50,9 @@ public:
class AggSourceOperatorX;
-template <typename DependencyType, typename Derived>
-class AggLocalState : public PipelineXLocalState<DependencyType> {
+class AggLocalState : public PipelineXLocalState<AggDependency> {
public:
- using Base = PipelineXLocalState<DependencyType>;
+ using Base = PipelineXLocalState<AggDependency>;
ENABLE_FACTORY_CREATOR(AggLocalState);
AggLocalState(RuntimeState* state, OperatorXBase* parent);
@@ -110,19 +109,9 @@ protected:
bool _agg_data_created_without_key = false;
};
-class BlockingAggLocalState final : public AggLocalState<AggDependency,
BlockingAggLocalState> {
+class AggSourceOperatorX : public OperatorX<AggLocalState> {
public:
- ENABLE_FACTORY_CREATOR(BlockingAggLocalState);
- using Parent = AggSourceOperatorX;
-
- BlockingAggLocalState(RuntimeState* state, OperatorXBase* parent)
- : AggLocalState<AggDependency, BlockingAggLocalState>(state,
parent) {}
- ~BlockingAggLocalState() = default;
-};
-
-class AggSourceOperatorX final : public OperatorX<BlockingAggLocalState> {
-public:
- using Base = OperatorX<BlockingAggLocalState>;
+ using Base = OperatorX<AggLocalState>;
AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
~AggSourceOperatorX() = default;
Dependency* wait_for_dependency(RuntimeState* state) override;
@@ -133,7 +122,6 @@ public:
bool is_source() const override { return true; }
private:
- template <typename DependencyType, typename Derived>
friend class AggLocalState;
bool _needs_finalize;
diff --git a/be/src/pipeline/exec/data_queue.cpp
b/be/src/pipeline/exec/data_queue.cpp
index eff91bdd3f..e070fe2c99 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -30,7 +30,7 @@
namespace doris {
namespace pipeline {
-DataQueue::DataQueue(int child_count, StreamingAggDependency* agg_dependency,
+DataQueue::DataQueue(int child_count, AggDependency* agg_dependency,
UnionDependency* union_dependency)
: _queue_blocks_lock(child_count),
_queue_blocks(child_count),
diff --git a/be/src/pipeline/exec/data_queue.h
b/be/src/pipeline/exec/data_queue.h
index e6105a3264..ab65bfeea6 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -30,13 +30,13 @@
namespace doris {
namespace pipeline {
-class StreamingAggDependency;
+class AggDependency;
class UnionDependency;
class DataQueue {
public:
//always one is enough, but in union node it's has more children
- DataQueue(int child_count = 1, StreamingAggDependency* agg_dependency =
nullptr,
+ DataQueue(int child_count = 1, AggDependency* agg_dependency = nullptr,
UnionDependency* union_dependency = nullptr);
~DataQueue() = default;
@@ -88,7 +88,7 @@ private:
int64_t _max_size_of_queue = 0;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
- StreamingAggDependency* _agg_dependency = nullptr;
+ AggDependency* _agg_dependency = nullptr;
UnionDependency* _union_dependency = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index cb4675bcfb..544bc3bc67 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -151,8 +151,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
_alloc_resource_timer = ADD_TIMER(_runtime_profile,
"AllocateResourceTime");
- static const std::string timer_name =
- "WaitForDependency[" + _source_dependency->name() + "]Time";
+ static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
_wait_for_data_timer = ADD_CHILD_TIMER(_runtime_profile, "WaitForData",
timer_name);
_wait_for_scanner_done_timer =
@@ -164,6 +163,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
template <typename Derived>
Status ScanLocalState<Derived>::open(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
if (_open_dependency == nullptr) {
return Status::OK();
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 5b7f3073ff..5b9c635580 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -74,10 +74,10 @@ private:
class StreamingAggSinkOperatorX;
class StreamingAggSinkLocalState final
- : public AggSinkLocalState<StreamingAggDependency,
StreamingAggSinkLocalState> {
+ : public AggSinkLocalState<AggDependency, StreamingAggSinkLocalState> {
public:
using Parent = StreamingAggSinkOperatorX;
- using Base = AggSinkLocalState<StreamingAggDependency,
StreamingAggSinkLocalState>;
+ using Base = AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState);
StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState*
state);
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index 7b1544232d..a0a866d0f7 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -75,31 +75,27 @@ OperatorPtr
StreamingAggSourceOperatorBuilder::build_operator() {
StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl&
descs)
- : Base(pool, tnode, descs),
- _needs_finalize(tnode.agg_node.need_finalize),
- _without_key(tnode.agg_node.grouping_exprs.empty()) {}
+ : Base(pool, tnode, descs) {}
Status StreamingAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
- auto& local_state =
state->get_local_state(id())->cast<StreamingAggLocalState>();
+ auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (!local_state._shared_state->data_queue->data_exhausted()) {
std::unique_ptr<vectorized::Block> agg_block;
DCHECK(local_state._dependency->read_blocked_by() == nullptr);
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
- if (!local_state._shared_state->data_queue->data_exhausted()) {
+ if (local_state._shared_state->data_queue->data_exhausted()) {
+ RETURN_IF_ERROR(Base::get_block(state, block, source_state));
+ } else {
block->swap(*agg_block);
agg_block->clear_column_data(row_desc().num_materialized_slots());
local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
- return Status::OK();
}
+ } else {
+ RETURN_IF_ERROR(Base::get_block(state, block, source_state));
}
- RETURN_IF_ERROR(local_state._executor.get_result(state, block,
source_state));
- local_state.make_nullable_output_key(block);
- // dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
- local_state.reached_limit(block, source_state);
return Status::OK();
}
@@ -110,11 +106,5 @@ Status StreamingAggSourceOperatorX::init(const TPlanNode&
tnode, RuntimeState* s
return Status::OK();
}
-Dependency* StreamingAggSourceOperatorX::wait_for_dependency(RuntimeState*
state) {
- return state->get_local_state(id())
- ->cast<StreamingAggLocalState>()
- ._dependency->read_blocked_by();
-}
-
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index 41f99164d3..10dc6cd026 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -60,43 +60,17 @@ private:
std::shared_ptr<DataQueue> _data_queue;
};
-class StreamingAggSourceOperatorX;
-
-class StreamingAggLocalState final
- : public AggLocalState<StreamingAggDependency, StreamingAggLocalState>
{
+class StreamingAggSourceOperatorX final : public AggSourceOperatorX {
public:
- using Parent = StreamingAggSourceOperatorX;
- ENABLE_FACTORY_CREATOR(StreamingAggLocalState);
- StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent)
- : AggLocalState<StreamingAggDependency,
StreamingAggLocalState>(state, parent) {}
- ~StreamingAggLocalState() = default;
-};
-
-class StreamingAggSourceOperatorX final : public
OperatorX<StreamingAggLocalState> {
-public:
- using Base = OperatorX<StreamingAggLocalState>;
+ using Base = AggSourceOperatorX;
StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
~StreamingAggSourceOperatorX() = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Dependency* wait_for_dependency(RuntimeState* state) override;
-
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
-
- bool is_source() const override { return true; }
-
-private:
- template <typename DependencyType, typename Derived>
- friend class AggLocalState;
-
- bool _needs_finalize;
- bool _without_key;
- // left / full join will change the key nullable make output/input solt
- // nullable diff. so we need make nullable of it.
- std::vector<size_t> _make_nullable_keys;
};
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index cb9c8a5185..e4caf1ebdf 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -254,6 +254,7 @@ public:
size_t input_num_rows = 0;
std::vector<vectorized::AggregateDataPtr> values;
std::unique_ptr<vectorized::Arena> agg_profile_arena;
+ std::unique_ptr<DataQueue> data_queue;
};
class AggDependency : public Dependency {
@@ -261,6 +262,7 @@ public:
using SharedState = AggSharedState;
AggDependency(int id) : Dependency(id, "AggDependency") {
_mem_tracker = std::make_unique<MemTracker>("AggregateOperator:");
+ _agg_state.data_queue = std::make_unique<DataQueue>(1, this);
}
~AggDependency() override = default;
@@ -317,26 +319,6 @@ private:
AggSharedState _agg_state;
};
-struct StreamingAggSharedState final : public AggSharedState {
-public:
- StreamingAggSharedState() : AggSharedState() {}
- ~StreamingAggSharedState() = default;
- std::unique_ptr<DataQueue> data_queue;
-};
-
-class StreamingAggDependency final : public AggDependency {
-public:
- using SharedState = StreamingAggSharedState;
- StreamingAggDependency(int id) : AggDependency(id) {
- _streaming_agg_state.data_queue = std::make_unique<DataQueue>(1, this);
- }
-
- void* shared_state() override { return (void*)&_streaming_agg_state; }
-
-private:
- StreamingAggSharedState _streaming_agg_state;
-};
-
struct SortSharedState {
public:
std::unique_ptr<vectorized::Sorter> sorter;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index b35d15cb7f..3452588bad 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -337,8 +337,7 @@ DECLARE_OPERATOR_X(HashJoinProbeLocalState)
DECLARE_OPERATOR_X(OlapScanLocalState)
DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)
-DECLARE_OPERATOR_X(BlockingAggLocalState)
-DECLARE_OPERATOR_X(StreamingAggLocalState)
+DECLARE_OPERATOR_X(AggLocalState)
DECLARE_OPERATOR_X(ExchangeLocalState)
DECLARE_OPERATOR_X(RepeatLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
@@ -368,7 +367,6 @@ template class PipelineXLocalState<SortDependency>;
template class PipelineXLocalState<NestedLoopJoinDependency>;
template class PipelineXLocalState<AnalyticDependency>;
template class PipelineXLocalState<AggDependency>;
-template class PipelineXLocalState<StreamingAggDependency>;
template class PipelineXLocalState<FakeDependency>;
template class PipelineXLocalState<UnionDependency>;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index d6adfa7458..13cf56c65d 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -313,7 +313,6 @@ public:
for (size_t i = 0; i < _projections.size(); i++) {
RETURN_IF_ERROR(_parent->_projections[i]->clone(state,
_projections[i]));
}
- DCHECK(_runtime_profile.get() != nullptr);
_rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned",
TUnit::UNIT);
_rows_input_counter = ADD_COUNTER(_runtime_profile, "InputRows",
TUnit::UNIT);
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
index 3a9ce71313..c3d19b67a2 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT)
AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
LO_ORDERDATE >= 19930101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
index a37876b067..6ab6ceea34 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q1.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT)
AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
LO_ORDERDATE >= 19940101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
index 5ba5ee025c..70796c2a95 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q1.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT)
AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
weekofyear(LO_ORDERDATE) = 6
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
index f42f86e685..57f2ada296 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.1
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
index ae458ca5c3..9b7a5db502 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
index 70771a50f7..3a8a5e74d4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
index e6c31d6315..6b3257f1f3 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.1
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_NATION,
S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
index d9160639bd..fefe727da8 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
index 1d6b34f9d5..c4560b701e 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
index 2c1ce76817..4ae5d956e4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.4
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
index 33f491eece..87b29bf160 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.1
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE
DIV 10000) AS YEAR,
C_NATION,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
index a71d014add..8ea28f3f12 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE
DIV 10000) AS YEAR,
S_NATION,
P_CATEGORY,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
index 3c247a188f..0f7c7401ab 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE
DIV 10000) AS YEAR,
S_CITY,
P_BRAND,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
index 260b1ee03b..50b50bc368 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
index b8b54ccaad..77c0262016 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
index fec034dd9b..0052db0aac 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
index d71685a91a..a47ec82b51 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
index c2a5f6dc15..9ab1a95d4d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
index 19331d9ee5..b7e6bd7840 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
index a99bffe752..85c470b708 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ c_nation, s_nation, d_year,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation,
s_nation, d_year,
SUM(lo_revenue) AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
index 65acd47881..cd0b320f87 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ c_city, s_city, d_year,
sum(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city,
s_city, d_year, sum(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
index 18fe99b85a..89765c02d9 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ c_city, s_city, d_year,
SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city,
s_city, d_year, SUM(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
index f59aac3ee6..5cef87a3fe 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ c_city, s_city, d_year,
SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city,
s_city, d_year, SUM(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
index 029934b62a..3e0227c2ea 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ d_year, c_nation,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year,
c_nation,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
index fc4bf8402e..1338e780ae 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ d_year, s_nation, p_category,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year,
s_nation, p_category,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
index 8c7c315ebc..d8e6f7c42d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ d_year, s_city, p_brand,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year,
s_city, p_brand,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
index 76d38414cd..ded6754a97 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
@@ -1,5 +1,5 @@
-- tables: lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
index 25ec865314..f102f7504d 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
@@ -1,5 +1,5 @@
-- tables: part,supplier,partsupp,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_acctbal,
s_name,
n_name,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
index eb6831b7ca..8bd60f0e07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) AS revenue,
o_orderdate,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
index e05c649b93..3f44094729 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
@@ -1,5 +1,5 @@
-- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
o_orderpriority,
count(*) AS order_count
FROM orders
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
index b69b05e5e0..ed179f8b86 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem,supplier,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
n_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
index a9a080a45b..2dd86f8c2c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
@@ -1,6 +1,6 @@
-- tables: lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ sum(l_extendedprice * l_discount) AS
revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
sum(l_extendedprice * l_discount) AS revenue
FROM
lineitem
WHERE
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
index 79efbfdae4..6453c1094a 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
@@ -1,5 +1,5 @@
-- tables: supplier,lineitem,orders,customer,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
supp_nation,
cust_nation,
l_year,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
index 1faced9805..e4c46fb084 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
@@ -1,5 +1,5 @@
-- tables: part,supplier,lineitem,orders,customer,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
o_year,
sum(CASE
WHEN nation = 'BRAZIL'
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
index a47a3b5e9c..cee9925fb5 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
@@ -1,5 +1,5 @@
-- tables: part,supplier,lineitem,partsupp,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
nation,
o_year,
sum(amount) AS sum_profit
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
index 3d49252d3e..c95a80fcee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_custkey,
c_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
index 66140c9431..b23701e940 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
@@ -1,5 +1,5 @@
-- tables: partsupp,supplier,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
ps_partkey,
sum(ps_supplycost * ps_availqty) AS value
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
index 99c9c0f574..e8893e71e4 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
@@ -1,5 +1,5 @@
-- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_shipmode,
sum(CASE
WHEN o_orderpriority = '1-URGENT'
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
index 21a03f752c..9db2da60ee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
@@ -1,5 +1,5 @@
-- tables: customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_count,
count(*) AS custdist
FROM (
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
index b0e79f8da5..70d7a57d07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
@@ -1,5 +1,5 @@
-- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ 100.00 * sum(CASE
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 *
sum(CASE
WHEN p_type LIKE 'PROMO%'
THEN l_extendedprice * (1 - l_discount)
ELSE 0
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
index 03c0b97372..45f75ff985 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
@@ -1,4 +1,4 @@
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_suppkey,
s_name,
s_address,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
index 7df971f830..37a438c796 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
@@ -1,5 +1,5 @@
-- tables: partsupp,part,supplier
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
p_brand,
p_type,
p_size,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
index 3f9203cea0..62f39a750c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
@@ -1,5 +1,5 @@
-- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ sum(l_extendedprice) / 7.0 AS
avg_yearly
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
sum(l_extendedprice) / 7.0 AS avg_yearly
FROM
lineitem,
part
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
index 971d457672..2eb2505c01 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_name,
c_custkey,
o_orderkey,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
index 8cc1b890c9..16e543f87c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
@@ -1,5 +1,5 @@
-- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */ sum(l_extendedprice * (1 -
l_discount)) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
sum(l_extendedprice * (1 - l_discount)) AS revenue
FROM
lineitem,
part
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
index f54d44ed8b..a2aca56790 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
@@ -1,5 +1,5 @@
-- tables: supplier,nation,partsupp,lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_name,
s_address
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
index 56d81211db..7b4874f96c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
@@ -1,5 +1,5 @@
-- tables: supplier,lineitem,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_name,
count(*) AS numwait
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
index c3f199f124..bf784175e0 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
@@ -1,5 +1,5 @@
-- tables: orders,customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
cntrycode,
count(*) AS numcust,
sum(c_acctbal) AS totacctbal
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]