This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 49a32c2ee0 [pipelineX](fix) fix two phase execution and add test cases (#23353)
49a32c2ee0 is described below
commit 49a32c2ee0015b11eb8aa7bcf4fa73ab33035eba
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 25 17:57:35 2023 +0800
[pipelineX](fix) fix two phase execution and add test cases (#23353)
---
.../pipeline/exec/aggregation_source_operator.cpp | 3 +--
be/src/pipeline/exec/exchange_source_operator.cpp | 6 +----
be/src/pipeline/exec/exchange_source_operator.h | 10 +++++++++
be/src/pipeline/exec/operator.h | 7 ++++++
be/src/pipeline/exec/result_sink_operator.cpp | 16 +++++--------
be/src/pipeline/exec/result_sink_operator.h | 5 +++--
be/src/pipeline/pipeline_task.h | 2 ++
.../pipeline_x/pipeline_x_fragment_context.cpp | 26 +++++++++++++++++++---
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 12 ++++++----
be/src/pipeline/pipeline_x/pipeline_x_task.h | 8 ++++++-
be/src/pipeline/task_scheduler.cpp | 3 ++-
.../suites/ssb_sf0.1_p1/sql/flat_q1.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q1.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q1.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q2.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q2.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q2.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q3.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q3.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q3.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q3.4.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q4.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q4.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/flat_q4.3.sql | 2 +-
24 files changed, 82 insertions(+), 42 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 2f91a633f2..cbee5a832f 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -557,8 +557,7 @@ Status AggSourceOperatorX::setup_local_state(RuntimeState*
state, LocalStateInfo
}
bool AggSourceOperatorX::can_read(RuntimeState* state) {
- auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
- return local_state._dependency->done();
+ return
state->get_local_state(id())->cast<AggLocalState>()._dependency->done();
}
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index c593e5ab96..dadf70f0de 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -48,11 +48,7 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
return Status::OK();
}
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
- auto& parent_ref = _parent->cast<ExchangeSourceOperatorX>();
- stream_recvr = _state->exec_env()->vstream_mgr()->create_recvr(
- _state, parent_ref._input_row_desc,
_state->fragment_instance_id(), parent_ref._id,
- parent_ref._num_senders, profile(), parent_ref._is_merging,
- parent_ref._sub_plan_query_statistics_recvr);
+ stream_recvr = info.recvr;
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
state, vsort_exec_exprs));
_init = true;
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index d717970a97..d599d3d06d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -80,6 +80,16 @@ public:
Status close(RuntimeState* state) override;
bool is_source() const override { return true; }
+ bool need_to_create_exch_recv() const override { return true; }
+
+ RowDescriptor input_row_desc() const { return _input_row_desc; }
+
+ int num_senders() const { return _num_senders; }
+ bool is_merging() const { return _is_merging; }
+
+ std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr() {
+ return _sub_plan_query_statistics_recvr;
+ }
private:
friend class ExchangeLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index dbc323cc67..2fb484afc0 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -36,6 +36,8 @@
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
+#include "vec/runtime/vdata_stream_recvr.h"
+#include "vec/sink/vresult_sink.h"
namespace doris {
class DataSink;
@@ -484,12 +486,14 @@ protected:
struct LocalStateInfo {
const std::vector<TScanRangeParams> scan_ranges;
Dependency* dependency;
+ std::shared_ptr<vectorized::VDataStreamRecvr> recvr;
};
// This struct is used only for initializing local sink state.
struct LocalSinkStateInfo {
const int sender_id;
Dependency* dependency;
+ std::shared_ptr<BufferControlBlock> sender;
};
class PipelineXLocalState {
@@ -674,6 +678,7 @@ public:
}
virtual bool is_source() const override { return false; }
+ [[nodiscard]] virtual bool need_to_create_exch_recv() const { return
false; }
Status get_next_after_projects(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state);
@@ -768,6 +773,8 @@ public:
virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo&
info) = 0;
+ [[nodiscard]] virtual bool need_to_create_result_sender() const { return
false; }
+
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this));
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index df917c26af..ce8a933d7e 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -58,9 +58,7 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title));
// create sender
-
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(),
-
p._buf_size, &_sender, true,
-
state->execution_timeout()));
+ _sender = info.sender;
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
@@ -81,11 +79,8 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc,
const std::vector<TExpr>&
t_output_expr,
- const TResultSink& sink, int
buffer_size)
- : DataSinkOperatorX(0),
- _row_desc(row_desc),
- _t_output_expr(t_output_expr),
- _buf_size(buffer_size) {
+ const TResultSink& sink)
+ : DataSinkOperatorX(0), _row_desc(row_desc),
_t_output_expr(t_output_expr) {
if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) {
_sink_type = TResultSinkType::MYSQL_PROTOCAL;
} else {
@@ -185,7 +180,6 @@ Status ResultSinkLocalState::close(RuntimeState* state) {
}
bool ResultSinkOperatorX::can_write(RuntimeState* state) {
- auto& local_state =
state->get_sink_local_state(id())->cast<ResultSinkLocalState>();
- return local_state._sender->can_sink();
+ return
state->get_sink_local_state(id())->cast<ResultSinkLocalState>()._sender->can_sink();
}
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index e2e2e517f8..e98bae86e7 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -65,7 +65,7 @@ private:
class ResultSinkOperatorX final : public DataSinkOperatorX {
public:
ResultSinkOperatorX(const RowDescriptor& row_desc, const
std::vector<TExpr>& select_exprs,
- const TResultSink& sink, int buffer_size);
+ const TResultSink& sink);
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
@@ -75,6 +75,8 @@ public:
bool can_write(RuntimeState* state) override;
+ [[nodiscard]] bool need_to_create_result_sender() const override { return
true; }
+
private:
friend class ResultSinkLocalState;
@@ -89,7 +91,6 @@ private:
// Owned by the RuntimeState.
const std::vector<TExpr>& _t_output_expr;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
- int _buf_size; // Allocated from _pool
// for fetch data by rowids
TFetchOption _fetch_option;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 57d7659197..fc66ca54f1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -246,6 +246,8 @@ public:
}
}
+ TUniqueId instance_id() const { return _state->fragment_instance_id(); }
+
protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7e97696a62..1a569202fb 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -26,6 +26,7 @@
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/tracer.h>
#include <pthread.h>
+#include <runtime/result_buffer_mgr.h>
#include <stdlib.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
@@ -252,8 +253,7 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}
// TODO: figure out good buffer size based on size of output row
- _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs,
thrift_sink.result_sink,
-
vectorized::RESULT_SINK_BUFFER_SIZE));
+ _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs,
thrift_sink.result_sink));
break;
}
default:
@@ -302,10 +302,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto scan_ranges =
find_with_default(local_params.per_node_scan_ranges,
_pipelines[pip_idx]->operator_xs().front()->id(),
no_scan_ranges);
+ std::shared_ptr<BufferControlBlock> sender = nullptr;
+ if (_pipelines[pip_idx]->sink_x()->need_to_create_result_sender())
{
+ // create sender
+
RETURN_IF_ERROR(_runtime_states[i]->exec_env()->result_mgr()->create_sender(
+ _runtime_states[i]->fragment_instance_id(),
+ vectorized::RESULT_SINK_BUFFER_SIZE, &sender, true,
+ _runtime_states[i]->execution_timeout()));
+ }
+
+ std::shared_ptr<vectorized::VDataStreamRecvr> recvr = nullptr;
+ if
(_pipelines[pip_idx]->operator_xs().front()->need_to_create_exch_recv()) {
+ auto* src =
+
(ExchangeSourceOperatorX*)_pipelines[pip_idx]->operator_xs().front().get();
+ recvr =
_runtime_states[i]->exec_env()->vstream_mgr()->create_recvr(
+ _runtime_states[i].get(), src->input_row_desc(),
+ _runtime_states[i]->fragment_instance_id(), src->id(),
src->num_senders(),
+ _runtime_profile.get(), src->is_merging(),
+ src->sub_plan_query_statistics_recvr());
+ }
auto task = std::make_unique<PipelineXTask>(
_pipelines[pip_idx], _total_tasks++,
_runtime_states[i].get(), this,
- _pipelines[pip_idx]->pipeline_profile(), scan_ranges,
local_params.sender_id);
+ _pipelines[pip_idx]->pipeline_profile(), scan_ranges,
local_params.sender_id,
+ sender, recvr);
pipeline_id_to_task.insert({_pipelines[pip_idx]->id(),
task.get()});
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
_runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true,
nullptr);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 676414fce5..f55c11982e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -45,14 +45,18 @@ namespace doris::pipeline {
PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index,
RuntimeState* state,
PipelineFragmentContext* fragment_context,
RuntimeProfile* parent_profile,
- const std::vector<TScanRangeParams>& scan_ranges,
const int sender_id)
+ const std::vector<TScanRangeParams>& scan_ranges,
const int sender_id,
+ std::shared_ptr<BufferControlBlock>& sender,
+ std::shared_ptr<vectorized::VDataStreamRecvr>&
recvr)
: PipelineTask(pipeline, index, state, fragment_context,
parent_profile),
_scan_ranges(scan_ranges),
_operators(pipeline->operator_xs()),
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
- _sender_id(sender_id) {
+ _sender_id(sender_id),
+ _sender(sender),
+ _recvr(recvr) {
_pipeline_task_watcher.start();
_sink->get_dependency(_downstream_dependency);
}
@@ -99,13 +103,13 @@ Status PipelineXTask::_open() {
Dependency* dep = _upstream_dependency.find(o->id()) ==
_upstream_dependency.end()
? (Dependency*)nullptr
:
_upstream_dependency.find(o->id())->second.get();
- LocalStateInfo info {_scan_ranges, dep};
+ LocalStateInfo info {_scan_ranges, dep, _recvr};
Status cur_st = o->setup_local_state(_state, info);
if (!cur_st.ok()) {
st = cur_st;
}
}
- LocalSinkStateInfo info {_sender_id, _downstream_dependency.get()};
+ LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(),
_sender};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
RETURN_IF_ERROR(st);
_opened = true;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 1453b10ba2..864709b4ed 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -31,6 +31,7 @@
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
+#include "vec/sink/vresult_sink.h"
namespace doris {
class QueryContext;
@@ -50,7 +51,9 @@ class PipelineXTask : public PipelineTask {
public:
PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile,
- const std::vector<TScanRangeParams>& scan_ranges, const int
sender_id);
+ const std::vector<TScanRangeParams>& scan_ranges, const int
sender_id,
+ std::shared_ptr<BufferControlBlock>& sender,
+ std::shared_ptr<vectorized::VDataStreamRecvr>& recvr);
Status prepare(RuntimeState* state) override;
@@ -127,5 +130,8 @@ private:
DependencyMap _upstream_dependency;
DependencySPtr _downstream_dependency;
+
+ std::shared_ptr<BufferControlBlock> _sender;
+ std::shared_ptr<vectorized::VDataStreamRecvr> _recvr;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index b792f0f4c6..0be333479b 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -123,7 +123,8 @@ void BlockedTaskScheduler::_schedule() {
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" <<
print_id(task->query_context()->query_id())
<< ", instance_id="
- <<
print_id(task->fragment_context()->get_fragment_instance_id());
+ <<
print_id(task->fragment_context()->get_fragment_instance_id())
+ << ", task info: " << task->debug_string();
task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);
_make_task_run(local_blocked_tasks, iter, ready_tasks);
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
index eae02823a0..c3d19b67a2 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
LO_ORDERDATE >= 19930101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
index 3a899c9344..6ab6ceea34 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q1.2
-SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
LO_ORDERDATE >= 19940101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
index 5aaeff83a7..70796c2a95 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q1.3
-SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
weekofyear(LO_ORDERDATE) = 6
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
index 254ea6481a..57f2ada296 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.1
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
index 6a636f3a9e..9b7a5db502 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.2
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
index a2ef0c6df3..3a8a5e74d4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.3
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
index 8df98222c4..6b3257f1f3 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.1
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_NATION,
S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
index c588b5bbce..fefe727da8 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.2
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
index 9a099d1732..c4560b701e 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.3
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
index 6bd71b5891..4ae5d956e4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.4
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
index aedd0e047e..87b29bf160 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.1
-SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE
DIV 10000) AS YEAR,
C_NATION,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
index b9891ee408..8ea28f3f12 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.2
-SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE
DIV 10000) AS YEAR,
S_NATION,
P_CATEGORY,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
index 6871023137..0f7c7401ab 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.3
-SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE
DIV 10000) AS YEAR,
S_CITY,
P_BRAND,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]