This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 873b128fde [feature](pipeline) add intersect/except operators (#14868)
873b128fde is described below
commit 873b128fde04e45257dc73dd192475c5c6e37109
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Dec 9 14:13:48 2022 +0800
[feature](pipeline) add intersect/except operators (#14868)
---
be/src/exec/exec_node.cpp | 3 +-
be/src/exec/exec_node.h | 2 +
be/src/pipeline/CMakeLists.txt | 6 +-
be/src/pipeline/exec/operator.h | 4 +-
be/src/pipeline/exec/set_probe_sink_operator.cpp | 63 ++++
be/src/pipeline/exec/set_probe_sink_operator.h | 67 ++++
be/src/pipeline/exec/set_sink_operator.cpp | 43 +++
be/src/pipeline/exec/set_sink_operator.h | 62 ++++
be/src/pipeline/exec/set_source_operator.cpp | 44 +++
be/src/pipeline/exec/set_source_operator.h | 57 +++
be/src/pipeline/pipeline_fragment_context.cpp | 35 ++
be/src/pipeline/pipeline_fragment_context.h | 8 +
be/src/pipeline/pipeline_task.cpp | 1 -
be/src/vec/CMakeLists.txt | 2 -
be/src/vec/exec/vexcept_node.cpp | 114 ------
be/src/vec/exec/vexcept_node.h | 40 --
be/src/vec/exec/vintersect_node.cpp | 115 ------
be/src/vec/exec/vintersect_node.h | 50 ---
be/src/vec/exec/vset_operation_node.cpp | 445 +++++++++++++++++++----
be/src/vec/exec/vset_operation_node.h | 243 ++-----------
20 files changed, 793 insertions(+), 611 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index cfbc9a50ca..5849b455cb 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -72,13 +72,12 @@
#include "vec/exec/vbroker_scan_node.h"
#include "vec/exec/vdata_gen_scan_node.h"
#include "vec/exec/vempty_set_node.h"
-#include "vec/exec/vexcept_node.h"
#include "vec/exec/vexchange_node.h"
-#include "vec/exec/vintersect_node.h"
#include "vec/exec/vmysql_scan_node.h"
#include "vec/exec/vrepeat_node.h"
#include "vec/exec/vschema_scan_node.h"
#include "vec/exec/vselect_node.h"
+#include "vec/exec/vset_operation_node.h"
#include "vec/exec/vsort_node.h"
#include "vec/exec/vtable_function_node.h"
#include "vec/exec/vunion_node.h"
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index eaf367f6b9..0da7338c80 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -242,6 +242,8 @@ public:
ExecNode* child(int i) { return _children[i]; }
+ size_t children_count() const { return _children.size(); }
+
protected:
friend class DataSink;
diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index 38b01ddd20..659b357870 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -45,8 +45,10 @@ set(PIPELINE_FILES
exec/sort_sink_operator.cpp
exec/repeat_operator.cpp
exec/table_function_operator.cpp
- )
+ exec/set_sink_operator.cpp
+ exec/set_source_operator.cpp
+ exec/set_probe_sink_operator.cpp)
add_library(Pipeline STATIC
${PIPELINE_FILES}
- )
\ No newline at end of file
+ )
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index f1e29f0547..a17e8dd1f4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -271,9 +271,9 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
-        if (!UNLIKELY(in_block)) {
-            DCHECK(source_state == SourceState::FINISHED)
-                    << "block is null, eos should invoke in finalize.";
+        // NOTE(review): empty blocks are skipped even when source_state is FINISHED, so send()
+        // never sees eos == true for them; confirm every sink handles end-of-stream elsewhere.
+        if (UNLIKELY(!in_block || in_block->rows() == 0)) {
             return Status::OK();
         }
         return _sink->send(state, in_block, source_state == SourceState::FINISHED);
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
new file mode 100644
index 0000000000..2f51edd12a
--- /dev/null
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "set_probe_sink_operator.h"
+
+#include "vec/exec/vset_operation_node.h"
+
+namespace doris::pipeline {
+
+template <bool is_intersect>
+SetProbeSinkOperatorBuilder<is_intersect>::SetProbeSinkOperatorBuilder(int32_t id, int child_id,
+                                                                       ExecNode* set_node)
+        : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node),
+          _child_id(child_id) {}
+
+// Each operator built here inherits the builder's probe child index.
+template <bool is_intersect>
+OperatorPtr SetProbeSinkOperatorBuilder<is_intersect>::build_operator() {
+    return std::make_shared<SetProbeSinkOperator<is_intersect>>(this, _child_id, this->_node);
+}
+
+template <bool is_intersect>
+SetProbeSinkOperator<is_intersect>::SetProbeSinkOperator(OperatorBuilderBase* operator_builder,
+                                                         int child_id, ExecNode* set_node)
+        : Operator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node),
+          _child_id(child_id) {}
+
+// Forwards one block of probe child `_child_id` to the set node; the block that arrives with
+// SourceState::FINISHED is passed with eos = true.
+template <bool is_intersect>
+Status SetProbeSinkOperator<is_intersect>::sink(RuntimeState* state, vectorized::Block* block,
+                                                SourceState source_state) {
+    return this->_node->sink_probe(state, _child_id, block, source_state == SourceState::FINISHED);
+}
+
+// Delegates end-of-input handling for this probe child to the set node.
+template <bool is_intersect>
+Status SetProbeSinkOperator<is_intersect>::finalize(RuntimeState* state) {
+    return this->_node->finalize_probe(state, _child_id);
+}
+
+// Probe child N may only consume input after child N - 1 has finished (child 0 is the build
+// side fed through SetSinkOperator), so probe children are processed one after another.
+template <bool is_intersect>
+bool SetProbeSinkOperator<is_intersect>::can_write() {
+    DCHECK_GT(_child_id, 0);
+    return this->_node->is_child_finished(_child_id - 1);
+}
+
+template class SetProbeSinkOperatorBuilder<true>;
+template class SetProbeSinkOperatorBuilder<false>;
+template class SetProbeSinkOperator<true>;
+template class SetProbeSinkOperator<false>;
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
new file mode 100644
index 0000000000..c4dc263803
--- /dev/null
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris {
+
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
+namespace pipeline {
+
+// Builder for the sink of one probe pipeline of an INTERSECT/EXCEPT node. Every probe child
+// (index >= 1) of the set node gets its own builder carrying that child's index.
+template <bool is_intersect>
+class SetProbeSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> {
+private:
+    constexpr static auto builder_name =
+            is_intersect ? "IntersectProbeSinkOperatorBuilder" : "ExceptProbeSinkOperatorBuilder";
+
+public:
+    SetProbeSinkOperatorBuilder(int32_t id, int child_id, ExecNode* set_node);
+    bool is_sink() const override { return true; }
+
+    OperatorPtr build_operator() override;
+
+private:
+    // Index of the set node's child that feeds this probe pipeline.
+    int _child_id;
+};
+
+// Sink operator that passes the blocks of probe child `child_id` to the shared set node.
+// can_write() stays false until the preceding child has finished.
+template <bool is_intersect>
+class SetProbeSinkOperator : public Operator<SetProbeSinkOperatorBuilder<is_intersect>> {
+public:
+    SetProbeSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* set_node);
+
+    bool can_write() override;
+
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
+    Status finalize(RuntimeState* state) override;
+    Status open(RuntimeState* /*state*/) override { return Status::OK(); }
+    Status close(RuntimeState* /*state*/) override { return Status::OK(); }
+
+private:
+    int _child_id;
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
new file mode 100644
index 0000000000..aaa85c31f1
--- /dev/null
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "set_sink_operator.h"
+
+#include "vec/exec/vset_operation_node.h"
+
+namespace doris::pipeline {
+
+// Builder for the sink of the build pipeline (child 0) of an INTERSECT/EXCEPT node.
+template <bool is_intersect>
+SetSinkOperatorBuilder<is_intersect>::SetSinkOperatorBuilder(int32_t id, ExecNode* set_node)
+        : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node) {
+}
+
+template <bool is_intersect>
+OperatorPtr SetSinkOperatorBuilder<is_intersect>::build_operator() {
+    return std::make_shared<SetSinkOperator<is_intersect>>(this, this->_node);
+}
+
+template <bool is_intersect>
+SetSinkOperator<is_intersect>::SetSinkOperator(
+        OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
+        : Operator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {}
+
+template class SetSinkOperatorBuilder<true>;
+template class SetSinkOperatorBuilder<false>;
+template class SetSinkOperator<true>;
+template class SetSinkOperator<false>;
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
new file mode 100644
index 0000000000..eb6e5087fb
--- /dev/null
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris {
+
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
+namespace pipeline {
+
+// Builder for the sink of the build pipeline (child 0) of an INTERSECT/EXCEPT node.
+template <bool is_intersect>
+class SetSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> {
+private:
+    constexpr static auto builder_name =
+            is_intersect ? "IntersectSinkOperatorBuilder" : "ExceptSinkOperatorBuilder";
+
+public:
+    SetSinkOperatorBuilder(int32_t id, ExecNode* set_node);
+    bool is_sink() const override { return true; }
+
+    OperatorPtr build_operator() override;
+};
+
+// Build-side sink operator. The set node is handed to the Operator base class, so no separate
+// (and previously never-initialized) node pointer is kept here.
+template <bool is_intersect>
+class SetSinkOperator : public Operator<SetSinkOperatorBuilder<is_intersect>> {
+public:
+    SetSinkOperator(OperatorBuilderBase* operator_builder,
+                    vectorized::VSetOperationNode<is_intersect>* set_node);
+
+    bool can_write() override { return true; }
+
+    Status close(RuntimeState* /*state*/) override { return Status::OK(); }
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
new file mode 100644
index 0000000000..bb14936e83
--- /dev/null
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "set_source_operator.h"
+
+#include "vec/exec/vset_operation_node.h"
+
+namespace doris::pipeline {
+
+// Builder for the source operator of an INTERSECT/EXCEPT node; it is added to the pipeline
+// that consumes the set node's output.
+template <bool is_intersect>
+SetSourceOperatorBuilder<is_intersect>::SetSourceOperatorBuilder(int32_t id, ExecNode* set_node)
+        : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node) {
+}
+
+template <bool is_intersect>
+OperatorPtr SetSourceOperatorBuilder<is_intersect>::build_operator() {
+    return std::make_shared<SetSourceOperator<is_intersect>>(this, this->_node);
+}
+
+template <bool is_intersect>
+SetSourceOperator<is_intersect>::SetSourceOperator(
+        OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
+        : Operator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {}
+
+template class SetSourceOperatorBuilder<true>;
+template class SetSourceOperatorBuilder<false>;
+template class SetSourceOperator<true>;
+template class SetSourceOperator<false>;
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h
new file mode 100644
index 0000000000..c870d15fa8
--- /dev/null
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris {
+
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
+namespace pipeline {
+
+// Builder for the source operator through which the INTERSECT/EXCEPT result is read.
+template <bool is_intersect>
+class SetSourceOperatorBuilder final
+        : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> {
+private:
+    constexpr static auto builder_name =
+            is_intersect ? "IntersectSourceOperatorBuilder" : "ExceptSourceOperatorBuilder";
+
+public:
+    SetSourceOperatorBuilder(int32_t id, ExecNode* set_node);
+    bool is_source() const override { return true; }
+
+    OperatorPtr build_operator() override;
+};
+
+// Source operator over the set node; open() is a no-op here.
+template <bool is_intersect>
+class SetSourceOperator : public Operator<SetSourceOperatorBuilder<is_intersect>> {
+public:
+    SetSourceOperator(OperatorBuilderBase* builder,
+                      vectorized::VSetOperationNode<is_intersect>* set_node);
+
+    Status open(RuntimeState* /*state*/) override { return Status::OK(); }
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8c5cc7b7bb..f2deb1375a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -33,6 +33,9 @@
#include "exec/result_sink_operator.h"
#include "exec/scan_node.h"
#include "exec/scan_operator.h"
+#include "exec/set_probe_sink_operator.h"
+#include "exec/set_sink_operator.h"
+#include "exec/set_source_operator.h"
#include "exec/sort_sink_operator.h"
#include "exec/sort_source_operator.h"
#include "exec/streaming_aggregation_sink_operator.h"
@@ -56,6 +59,7 @@
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/exec/vrepeat_node.h"
+#include "vec/exec/vset_operation_node.h"
#include "vec/exec/vsort_node.h"
#include "vec/sink/vresult_sink.h"
@@ -382,6 +386,14 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
cur_pipe->add_dependency(new_pipe);
break;
}
+ case TPlanNodeType::INTERSECT_NODE: {
+ RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(node,
cur_pipe));
+ break;
+ }
+ case TPlanNodeType::EXCEPT_NODE: {
+ RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(node,
cur_pipe));
+ break;
+ }
default:
return Status::InternalError("Unsupported exec type in pipeline: {}",
print_plan_node_type(node_type));
@@ -389,6 +401,39 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     return Status::OK();
 }
 
+// Lowers an INTERSECT (is_intersect == true) or EXCEPT node into pipelines:
+//   - child 0 feeds a new build pipeline whose sink is SetSinkOperator;
+//   - every other child feeds its own probe pipeline whose sink is SetProbeSinkOperator;
+//   - cur_pipe receives a SetSourceOperator for the node's output.
+template <bool is_intersect>
+Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode* node,
+                                                                        PipelinePtr cur_pipe) {
+    auto build_pipeline = add_pipeline();
+    RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
+    OperatorBuilderPtr sink_builder = std::make_shared<SetSinkOperatorBuilder<is_intersect>>(
+            next_operator_builder_id(), node);
+    RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
+
+    // children_count() returns size_t; convert once so the loop compares int with int.
+    const int num_children = static_cast<int>(node->children_count());
+    for (int child_id = 1; child_id < num_children; ++child_id) {
+        auto probe_pipeline = add_pipeline();
+        RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline));
+        OperatorBuilderPtr probe_sink_builder =
+                std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(
+                        next_operator_builder_id(), child_id, node);
+        RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
+    }
+
+    // NOTE(review): unlike other cases in _build_pipelines that call
+    // cur_pipe->add_dependency(new_pipe), no pipeline dependency is registered here; ordering
+    // presumably relies on the set node's own state (cf. SetProbeSinkOperator::can_write).
+    // Confirm the source side cannot emit rows before all probe children have finished.
+    OperatorBuilderPtr source_builder = std::make_shared<SetSourceOperatorBuilder<is_intersect>>(
+            next_operator_builder_id(), node);
+    return cur_pipe->add_operator(source_builder);
+}
+
 Status PipelineFragmentContext::submit() {
     if (_submitted) {
         return Status::InternalError("submitted");
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index dc88feebf3..fd1d50ebff 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -24,6 +24,11 @@ namespace doris {
class ExecNode;
class DataSink;
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
namespace pipeline {
class PipelineTask;
@@ -121,6 +126,9 @@ private:
Status _create_sink(const TDataSink& t_data_sink);
Status _build_pipelines(ExecNode*, PipelinePtr);
Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams&
request);
+
+ template <bool is_intersect>
+ Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index fbe1d6b819..6e867d5959 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -146,7 +146,6 @@ Status PipelineTask::execute(bool* eos) {
break;
}
}
- *eos = false;
}
return Status::OK();
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 8dfd11ade4..ca5c9c17f5 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -94,8 +94,6 @@ set(VEC_FILES
exec/vexchange_node.cpp
exec/vset_operation_node.cpp
exec/vunion_node.cpp
- exec/vintersect_node.cpp
- exec/vexcept_node.cpp
exec/vselect_node.cpp
exec/vmysql_scan_node.cpp
exec/vschema_scan_node.cpp
diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp
deleted file mode 100644
index 8cf391f72f..0000000000
--- a/be/src/vec/exec/vexcept_node.cpp
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/vexcept_node.h"
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-#include "vec/core/block.h"
-#include "vec/exec/vset_operation_node.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-namespace doris {
-namespace vectorized {
-
-VExceptNode::VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
- : VSetOperationNode(pool, tnode, descs) {}
-
-Status VExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(VSetOperationNode::init(tnode, state));
- DCHECK(tnode.__isset.except_node);
- return Status::OK();
-}
-
-Status VExceptNode::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(VSetOperationNode::prepare(state));
- return Status::OK();
-}
-
-Status VExceptNode::open(RuntimeState* state) {
- START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::open");
- RETURN_IF_ERROR(VSetOperationNode::open(state));
- bool eos = false;
- Status st = Status::OK();
- for (int i = 1; i < _children.size(); ++i) {
- if (i > 1) {
- refresh_hash_table<false>();
- }
-
- RETURN_IF_ERROR(child(i)->open(state));
- eos = false;
- int probe_expr_ctxs_sz = _child_expr_lists[i].size();
- _probe_columns.resize(probe_expr_ctxs_sz);
-
- while (!eos) {
- RETURN_IF_ERROR(process_probe_block(state, i, &eos));
- if (_probe_rows == 0) continue;
-
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- HashTableProbe<HashTableCtxType, false>
process_hashtable_ctx(
- this, state->batch_size(), _probe_rows);
- st =
process_hashtable_ctx.mark_data_in_hashtable(arg);
-
- } else {
- LOG(FATAL) << "FATAL: uninited hash table";
- }
- },
- *_hash_table_variants);
- }
- }
- return st;
-}
-
-Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool*
eos) {
- INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VExceptNode::get_next");
- SCOPED_TIMER(_probe_timer);
- Status st;
- create_mutable_cols(output_block);
-
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- HashTableProbe<HashTableCtxType, false>
process_hashtable_ctx(
- this, state->batch_size(), _probe_rows);
- st = process_hashtable_ctx.get_data_in_hashtable(arg,
_mutable_cols,
-
output_block, eos);
- } else {
- LOG(FATAL) << "FATAL: uninited hash table";
- }
- },
- *_hash_table_variants);
-
- RETURN_IF_ERROR(
- VExprContext::filter_block(_vconjunct_ctx_ptr, output_block,
output_block->columns()));
- reached_limit(output_block, eos);
-
- return st;
-}
-
-Status VExceptNode::close(RuntimeState* state) {
- START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::close");
- return VSetOperationNode::close(state);
-}
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/vexcept_node.h b/be/src/vec/exec/vexcept_node.h
deleted file mode 100644
index 7bbed571e7..0000000000
--- a/be/src/vec/exec/vexcept_node.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "vec/exec/vset_operation_node.h"
-
-namespace doris {
-namespace vectorized {
-
-class VExceptNode : public VSetOperationNode {
-public:
- VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl&
descs);
- virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
- virtual Status prepare(RuntimeState* state);
- virtual Status open(RuntimeState* state);
- using VSetOperationNode::get_next;
- virtual Status get_next(RuntimeState* state, vectorized::Block*
output_block, bool* eos);
- virtual Status close(RuntimeState* state);
-
-private:
- template <class HashTableContext, bool is_intersected>
- friend struct HashTableProbe;
-};
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/vintersect_node.cpp
b/be/src/vec/exec/vintersect_node.cpp
deleted file mode 100644
index b232708533..0000000000
--- a/be/src/vec/exec/vintersect_node.cpp
+++ /dev/null
@@ -1,115 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/vintersect_node.h"
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-#include "vec/core/block.h"
-#include "vec/exec/vset_operation_node.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-namespace doris {
-namespace vectorized {
-
-VIntersectNode::VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
- : VSetOperationNode(pool, tnode, descs) {}
-
-Status VIntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(VSetOperationNode::init(tnode, state));
- DCHECK(tnode.__isset.intersect_node);
- return Status::OK();
-}
-
-Status VIntersectNode::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(VSetOperationNode::prepare(state));
- return Status::OK();
-}
-
-Status VIntersectNode::open(RuntimeState* state) {
- START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::open");
- RETURN_IF_ERROR(VSetOperationNode::open(state));
- bool eos = false;
- Status st = Status::OK();
-
- for (int i = 1; i < _children.size(); ++i) {
- if (i > 1) {
- refresh_hash_table<true>();
- }
-
- _valid_element_in_hash_tbl = 0;
- RETURN_IF_ERROR(child(i)->open(state));
- eos = false;
- _probe_columns.resize(_child_expr_lists[i].size());
-
- while (!eos) {
- RETURN_IF_ERROR(process_probe_block(state, i, &eos));
- if (_probe_rows == 0) continue;
-
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- HashTableProbe<HashTableCtxType, true>
process_hashtable_ctx(
- this, state->batch_size(), _probe_rows);
- st =
process_hashtable_ctx.mark_data_in_hashtable(arg);
-
- } else {
- LOG(FATAL) << "FATAL: uninited hash table";
- }
- },
- *_hash_table_variants);
- }
- }
- return st;
-}
-
-Status VIntersectNode::get_next(RuntimeState* state, Block* output_block,
bool* eos) {
- INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VIntersectNode::get_next");
- SCOPED_TIMER(_probe_timer);
- create_mutable_cols(output_block);
- Status st;
-
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- HashTableProbe<HashTableCtxType, true>
process_hashtable_ctx(
- this, state->batch_size(), _probe_rows);
- st = process_hashtable_ctx.get_data_in_hashtable(arg,
_mutable_cols,
-
output_block, eos);
-
- } else {
- LOG(FATAL) << "FATAL: uninited hash table";
- }
- },
- *_hash_table_variants);
-
- RETURN_IF_ERROR(
- VExprContext::filter_block(_vconjunct_ctx_ptr, output_block,
output_block->columns()));
- reached_limit(output_block, eos);
-
- return st;
-}
-
-Status VIntersectNode::close(RuntimeState* state) {
- START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::close");
- return VSetOperationNode::close(state);
-}
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/vintersect_node.h
b/be/src/vec/exec/vintersect_node.h
deleted file mode 100644
index 1316fcd761..0000000000
--- a/be/src/vec/exec/vintersect_node.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/exec_node.h"
-#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_table.h"
-#include "vec/core/materialize_block.h"
-#include "vec/exec/join/join_op.h"
-#include "vec/exec/join/vacquire_list.hpp"
-#include "vec/exec/join/vhash_join_node.h"
-#include "vec/exec/vset_operation_node.h"
-#include "vec/functions/function.h"
-#include "vec/utils/util.hpp"
-
-namespace doris {
-namespace vectorized {
-
-class VExprContext;
-class VIntersectNode : public VSetOperationNode {
-public:
- VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
- virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
- virtual Status prepare(RuntimeState* state);
- virtual Status open(RuntimeState* state);
- using VSetOperationNode::get_next;
- virtual Status get_next(RuntimeState* state, vectorized::Block*
output_block, bool* eos);
- virtual Status close(RuntimeState* state);
-
-private:
- template <class HashTableContext, bool is_intersected>
- friend struct HashTableProbe;
-};
-} // namespace vectorized
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index d3e9c436d6..800b9aae6c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -23,13 +23,12 @@ namespace doris {
namespace vectorized {
//build hash table for operation node, intersect/except node
-template <class HashTableContext>
+template <class HashTableContext, bool is_intersect>
struct HashTableBuild {
- HashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs&
build_raw_ptrs,
- VSetOperationNode* operation_node, uint8_t offset)
+ HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs,
+ VSetOperationNode<is_intersect>* operation_node, uint8_t
offset)
: _rows(rows),
_offset(offset),
- _acquired_block(acquired_block),
_build_raw_ptrs(build_raw_ptrs),
_operation_node(operation_node) {}
@@ -60,7 +59,6 @@ struct HashTableBuild {
if (emplace_result.is_inserted()) { //only inserted once as the
same key, others skip
new (&emplace_result.get_mapped()) Mapped({k, _offset});
- _operation_node->_valid_element_in_hash_tbl++;
}
}
return Status::OK();
@@ -69,35 +67,95 @@ struct HashTableBuild {
private:
const int _rows;
const uint8_t _offset;
- Block& _acquired_block;
ColumnRawPtrs& _build_raw_ptrs;
- VSetOperationNode* _operation_node;
+ VSetOperationNode<is_intersect>* _operation_node;
};
-VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
- const DescriptorTbl& descs)
+template <class HashTableContext, bool is_intersected>
+struct HashTableProbe {
+ HashTableProbe(VSetOperationNode<is_intersected>* operation_node, int
probe_rows)
+ : _operation_node(operation_node),
+ _probe_rows(probe_rows),
+ _probe_raw_ptrs(operation_node->_probe_columns),
+ _arena(new Arena) {}
+
+ Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
+ using KeyGetter = typename HashTableContext::State;
+ using Mapped = typename HashTableContext::Mapped;
+
+ KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz,
nullptr);
+ if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
+ if (_probe_keys.size() < _probe_rows) {
+ _probe_keys.resize(_probe_rows);
+ }
+ size_t keys_size = _probe_raw_ptrs.size();
+ for (size_t i = 0; i < _probe_rows; ++i) {
+ _probe_keys[i] =
+ serialize_keys_to_pool_contiguous(i, keys_size,
_probe_raw_ptrs, *_arena);
+ }
+ key_getter.set_serialized_keys(_probe_keys.data());
+ }
+
+ if constexpr (std::is_same_v<typename HashTableContext::Mapped,
RowRefListWithFlags>) {
+ for (int probe_index = 0; probe_index < _probe_rows;
probe_index++) {
+ auto find_result =
+ key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena);
+ if (find_result.is_found()) { //if found, marked visited
+ auto it = find_result.get_mapped().begin();
+ if (!(it->visited)) {
+ it->visited = true;
+ if constexpr (is_intersected) { //intersected
+ _operation_node->_valid_element_in_hash_tbl++;
+ } else {
+ _operation_node->_valid_element_in_hash_tbl--;
//except
+ }
+ }
+ }
+ }
+ } else {
+ LOG(FATAL) << "Invalid RowRefListType!";
+ }
+ return Status::OK();
+ }
+
+private:
+ VSetOperationNode<is_intersected>* _operation_node;
+ const size_t _probe_rows;
+ ColumnRawPtrs& _probe_raw_ptrs;
+ std::unique_ptr<Arena> _arena;
+ std::vector<StringRef> _probe_keys;
+};
+
+template <bool is_intersect>
+VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const
TPlanNode& tnode,
+ const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_valid_element_in_hash_tbl(0),
_mem_used(0),
- _probe_index(-1),
- _probe_rows(0) {
+ _build_block_index(0),
+ _build_finished(false) {
_hash_table_variants = std::make_unique<HashTableVariants>();
_arena = std::make_unique<Arena>();
}
-Status VSetOperationNode::close(RuntimeState* state) {
- if (is_closed()) {
- return Status::OK();
- }
- START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VSetOperationNode::close");
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::release_resource(RuntimeState* state) {
for (auto& exprs : _child_expr_lists) {
VExpr::close(exprs, state);
}
release_mem();
+}
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::close(RuntimeState* state) {
+ if (is_closed()) {
+ return Status::OK();
+ }
+ START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VSetOperationNode<is_intersect>::close");
return ExecNode::close(state);
}
-Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::init(const TPlanNode& tnode,
RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
std::vector<std::vector<::doris::TExpr>> result_texpr_lists;
@@ -119,20 +177,71 @@ Status VSetOperationNode::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status VSetOperationNode::open(RuntimeState* state) {
- START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open");
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
// open result expr lists.
for (const std::vector<VExprContext*>& exprs : _child_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
}
- RETURN_IF_ERROR(hash_table_build(state));
+ _probe_finished_children_index.assign(_child_expr_lists.size(), false);
+ _probe_columns.resize(_child_expr_lists[1].size());
return Status::OK();
}
-Status VSetOperationNode::prepare(RuntimeState* state) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::open(RuntimeState* state) {
+ START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VSetOperationNode<is_intersect>::open");
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ RETURN_IF_ERROR(ExecNode::open(state));
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+
+ // TODO: build the hash table in a thread to open other children
asynchronously.
+ RETURN_IF_ERROR(hash_table_build(state));
+ bool eos = false;
+ Status st = Status::OK();
+ for (int i = 1; i < _children.size(); ++i) {
+ RETURN_IF_ERROR(child(i)->open(state));
+ eos = false;
+ int probe_expr_ctxs_sz = _child_expr_lists[i].size();
+ _probe_columns.resize(probe_expr_ctxs_sz);
+
+ if constexpr (is_intersect) {
+ _valid_element_in_hash_tbl = 0;
+ } else {
+ std::visit(
+ [&](auto&& arg) {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ _valid_element_in_hash_tbl = arg.hash_table.size();
+ }
+ },
+ *_hash_table_variants);
+ }
+
+ while (!eos) {
+ release_block_memory(_probe_block, i);
+ RETURN_IF_CANCELLED(state);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(i)->get_next_after_projects(state, &_probe_block,
&eos),
+ child(i)->get_next_span(), eos);
+
+ RETURN_IF_ERROR(sink_probe(state, i, &_probe_block, eos));
+ }
+ finalize_probe(state, i);
+ }
+ return st;
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::get_next(RuntimeState* state, Block*
output_block,
+ bool* eos) {
+ INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VExceptNode::get_next");
+ SCOPED_TIMER(_probe_timer);
+ return pull(state, output_block, eos);
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
@@ -153,7 +262,8 @@ Status VSetOperationNode::prepare(RuntimeState* state) {
return Status::OK();
}
-void VSetOperationNode::hash_table_init() {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::hash_table_init() {
if (_child_expr_lists[0].size() == 1 && (!_build_not_ignore_null[0])) {
// Single column optimization
switch (_child_expr_lists[0][0]->root()->result_type()) {
@@ -244,14 +354,56 @@ void VSetOperationNode::hash_table_init() {
}
}
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool
eos) {
+ constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
+
+ if (block->rows() != 0) {
+ _mem_used += block->allocated_bytes();
+ _mutable_block.merge(*block);
+ }
+
+ if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
+ _build_blocks.emplace_back(_mutable_block.to_block());
+ RETURN_IF_ERROR(process_build_block(_build_blocks[_build_block_index],
_build_block_index));
+ _mutable_block.clear();
+ ++_build_block_index;
+
+ if (eos) {
+ _build_finished = true;
+ }
+ }
+ return Status::OK();
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block*
output_block, bool* eos) {
+ create_mutable_cols(output_block);
+ auto st = std::visit(
+ [&](auto&& arg) -> Status {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ return get_data_in_hashtable<HashTableCtxType>(arg,
output_block,
+
state->batch_size(), eos);
+ } else {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ }
+ },
+ *_hash_table_variants);
+ RETURN_IF_ERROR(st);
+ RETURN_IF_ERROR(
+ VExprContext::filter_block(_vconjunct_ctx_ptr, output_block,
output_block->columns()));
+ reached_limit(output_block, eos);
+ return Status::OK();
+}
+
//build a hash table from child(0)
-Status VSetOperationNode::hash_table_build(RuntimeState* state) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->open(state));
Block block;
MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
- uint8_t index = 0;
- int64_t last_mem_used = 0;
bool eos = false;
while (!eos) {
block.clear_column_data();
@@ -259,34 +411,17 @@ Status VSetOperationNode::hash_table_build(RuntimeState*
state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block,
&eos),
child(0)->get_next_span(), eos);
-
- size_t allocated_bytes = block.allocated_bytes();
- _mem_used += allocated_bytes;
-
- if (block.rows() != 0) {
- mutable_block.merge(block);
- }
-
- // make one block for each 4 gigabytes
- constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL *
1024UL;
- if (_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE) {
- _build_blocks.emplace_back(mutable_block.to_block());
- // TODO:: Rethink may we should do the proess after we recevie all
build blocks ?
- // which is better.
- RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
- mutable_block = MutableBlock();
- ++index;
- last_mem_used = _mem_used;
+ if (eos) {
+ child(0)->close(state);
}
+ sink(state, &block, eos);
}
- _build_blocks.emplace_back(mutable_block.to_block());
- child(0)->close(state);
- RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
return Status::OK();
}
-Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block,
uint8_t offset) {
size_t rows = block.rows();
if (rows == 0) {
return Status::OK();
@@ -300,8 +435,8 @@ Status VSetOperationNode::process_build_block(Block& block,
uint8_t offset) {
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- HashTableBuild<HashTableCtxType>
hash_table_build_process(rows, block, raw_ptrs,
-
this, offset);
+ HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(
+ rows, raw_ptrs, this, offset);
hash_table_build_process(arg);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
@@ -312,28 +447,71 @@ Status VSetOperationNode::process_build_block(Block&
block, uint8_t offset) {
return Status::OK();
}
-Status VSetOperationNode::process_probe_block(RuntimeState* state, int
child_id, bool* eos) {
- if (!_probe_column_inserted_id.empty()) {
- for (int j = 0; j < _probe_column_inserted_id.size(); ++j) {
- auto column_to_erase = _probe_column_inserted_id[j];
- _probe_block.erase(column_to_erase - j);
- }
- _probe_column_inserted_id.clear();
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags&
value,
+ int& block_size) {
+ auto it = value.begin();
+ for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end();
++idx) {
+ auto& column =
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
+ _mutable_cols[idx->second]->insert_from(column, it->row_num);
+ }
+ block_size++;
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* /*state*/,
int child_id,
+ Block* block, bool eos) {
+ CHECK(_build_finished) << "cannot sink probe data before build finished";
+ if (child_id > 1) {
+ CHECK(_probe_finished_children_index[child_id - 1])
+ << fmt::format("child with id: {} should be probed first",
child_id);
+ }
+ auto probe_rows = block->rows();
+
+ if (probe_rows == 0) {
+ return Status::OK();
}
- release_block_memory(_probe_block, child_id);
- _probe_index = 0;
- _probe_rows = 0;
-
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(child_id)->get_next_after_projects(state, &_probe_block,
eos),
- child(child_id)->get_next_span(), *eos);
- _probe_rows = _probe_block.rows();
- RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns,
child_id));
+
+ RETURN_IF_ERROR(extract_probe_column(*block, _probe_columns, child_id));
+
+ return std::visit(
+ [&](auto&& arg) -> Status {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ HashTableProbe<HashTableCtxType, is_intersect>
process_hashtable_ctx(
+ this, probe_rows);
+ return process_hashtable_ctx.mark_data_in_hashtable(arg);
+ } else {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ }
+ },
+ *_hash_table_variants);
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::finalize_probe(RuntimeState*
/*state*/, int child_id) {
+ if (child_id != (_children.size() - 1)) {
+ refresh_hash_table();
+ _probe_columns.resize(_child_expr_lists[child_id + 1].size());
+ } else {
+ _can_read = true;
+ }
+ _probe_finished_children_index[child_id] = true;
return Status::OK();
}
-Status VSetOperationNode::extract_build_column(Block& block, ColumnRawPtrs&
raw_ptrs) {
+template <bool is_intersect>
+bool VSetOperationNode<is_intersect>::is_child_finished(int child_id) const {
+ if (child_id == 0) {
+ return _build_finished;
+ }
+
+ return _probe_finished_children_index[child_id];
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::extract_build_column(Block& block,
+ ColumnRawPtrs&
raw_ptrs) {
for (size_t i = 0; i < _child_expr_lists[0].size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(_child_expr_lists[0][i]->execute(&block,
&result_col_id));
@@ -358,9 +536,10 @@ Status VSetOperationNode::extract_build_column(Block&
block, ColumnRawPtrs& raw_
return Status::OK();
}
-Status VSetOperationNode::extract_probe_column(Block& block, ColumnRawPtrs&
raw_ptrs,
- int child_id) {
- if (_probe_rows == 0) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::extract_probe_column(Block& block,
ColumnRawPtrs& raw_ptrs,
+ int child_id) {
+ if (block.rows() == 0) {
return Status::OK();
}
@@ -395,7 +574,8 @@ Status VSetOperationNode::extract_probe_column(Block&
block, ColumnRawPtrs& raw_
return Status::OK();
}
-void VSetOperationNode::create_mutable_cols(Block* output_block) {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::create_mutable_cols(Block* output_block)
{
_mutable_cols.resize(_left_table_data_types.size());
bool mem_reuse = output_block->mem_reuse();
@@ -408,7 +588,9 @@ void VSetOperationNode::create_mutable_cols(Block*
output_block) {
}
}
-void VSetOperationNode::debug_string(int indentation_level, std::stringstream*
out) const {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::debug_string(int indentation_level,
+ std::stringstream* out)
const {
*out << string(indentation_level * 2, ' ');
*out << " _child_expr_lists=[";
for (int i = 0; i < _child_expr_lists.size(); ++i) {
@@ -419,7 +601,8 @@ void VSetOperationNode::debug_string(int indentation_level,
std::stringstream* o
*out << ")" << std::endl;
}
-void VSetOperationNode::release_mem() {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::release_mem() {
_hash_table_variants = nullptr;
_arena = nullptr;
@@ -429,5 +612,113 @@ void VSetOperationNode::release_mem() {
_probe_block.clear();
}
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::refresh_hash_table() {
+ std::visit(
+ [&](auto&& arg) {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ if constexpr (std::is_same_v<typename
HashTableCtxType::Mapped,
+ RowRefListWithFlags>) {
+ HashTableCtxType tmp_hash_table;
+ bool is_need_shrink =
+
arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl);
+ if (is_need_shrink) {
+ tmp_hash_table.hash_table.init_buf_size(
+ _valid_element_in_hash_tbl /
arg.hash_table.get_factor() + 1);
+ }
+
+ arg.init_once();
+ auto& iter = arg.iter;
+ auto iter_end = arg.hash_table.end();
+ while (iter != iter_end) {
+ auto& mapped = iter->get_second();
+ auto it = mapped.begin();
+
+ if constexpr (is_intersect) { //intersected
+ if (it->visited) {
+ it->visited = false;
+ if (is_need_shrink) {
+
tmp_hash_table.hash_table.insert(iter->get_value());
+ }
+ ++iter;
+ } else {
+ if (!is_need_shrink) {
+
arg.hash_table.delete_zero_key(iter->get_first());
+ // the ++iter would check if the
current key is zero. if it does, the iterator will be moved to the container's
head.
+ // so we do ++iter before set_zero to
make the iterator move to next valid key correctly.
+ auto iter_prev = iter;
+ ++iter;
+ iter_prev->set_zero();
+ } else {
+ ++iter;
+ }
+ }
+ } else { //except
+ if (!it->visited && is_need_shrink) {
+
tmp_hash_table.hash_table.insert(iter->get_value());
+ }
+ ++iter;
+ }
+ }
+
+ arg.inited = false;
+ if (is_need_shrink) {
+ arg.hash_table =
std::move(tmp_hash_table.hash_table);
+ }
+ } else {
+ LOG(FATAL) << "FATAL: Invalid RowRefList";
+ }
+ } else {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ }
+ },
+ *_hash_table_variants);
+}
+
+template <bool is_intersected>
+template <typename HashTableContext>
+Status
VSetOperationNode<is_intersected>::get_data_in_hashtable(HashTableContext&
hash_table_ctx,
+ Block*
output_block,
+ const int
batch_size, bool* eos) {
+ hash_table_ctx.init_once();
+ int left_col_len = _left_table_data_types.size();
+ auto& iter = hash_table_ctx.iter;
+ auto block_size = 0;
+
+ if constexpr (std::is_same_v<typename HashTableContext::Mapped,
RowRefListWithFlags>) {
+ for (; iter != hash_table_ctx.hash_table.end() && block_size <
batch_size; ++iter) {
+ auto& value = iter->get_second();
+ auto it = value.begin();
+ if constexpr (is_intersected) {
+ if (it->visited) { //intersected: have done probe, so visited
values it's the result
+ add_result_columns(value, block_size);
+ }
+ } else {
+ if (!it->visited) { //except: haven't visited values it's the
needed result
+ add_result_columns(value, block_size);
+ }
+ }
+ }
+ } else {
+ LOG(FATAL) << "Invalid RowRefListType!";
+ }
+
+ *eos = iter == hash_table_ctx.hash_table.end();
+ if (!output_block->mem_reuse()) {
+ for (int i = 0; i < left_col_len; ++i) {
+
output_block->insert(ColumnWithTypeAndName(std::move(_mutable_cols[i]),
+
_left_table_data_types[i], ""));
+ }
+ } else {
+ _mutable_cols.clear();
+ }
+
+ return Status::OK();
+}
+
+template class VSetOperationNode<true>;
+template class VSetOperationNode<false>;
+
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/exec/vset_operation_node.h
b/be/src/vec/exec/vset_operation_node.h
index b93464ba87..1e8b5e1ebe 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -31,20 +31,35 @@ namespace doris {
namespace vectorized {
-class VSetOperationNode : public ExecNode {
+template <bool is_intersect>
+class VSetOperationNode final : public ExecNode {
public:
VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
- virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
- virtual Status prepare(RuntimeState* state);
- virtual Status open(RuntimeState* state);
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool*
eos) {
- return Status::NotSupported("Not Implemented get RowBatch in vecorized
execution.");
+ Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
+ Status prepare(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
+ Status get_next(RuntimeState* /*state*/, RowBatch* /*row_batch*/, bool*
/*eos*/) override {
+ return Status::NotSupported("Not implemented get RowBatch in
vectorized execution.");
}
- virtual Status close(RuntimeState* state);
- virtual void debug_string(int indentation_level, std::stringstream* out)
const;
-protected:
+ Status get_next(RuntimeState* state, Block* output_block, bool* eos)
override;
+
+ Status close(RuntimeState* state) override;
+ void debug_string(int indentation_level, std::stringstream* out) const
override;
+
+ Status alloc_resource(RuntimeState* state) override;
+ void release_resource(RuntimeState* state) override;
+
+ Status sink(RuntimeState* state, Block* block, bool eos) override;
+ Status pull(RuntimeState* state, Block* output_block, bool* eos) override;
+
+ Status sink_probe(RuntimeState* state, int child_id, Block* block, bool
eos);
+ Status finalize_probe(RuntimeState* state, int child_id);
+
+ bool is_child_finished(int child_id) const;
+
+private:
//Todo: In build process of hashtable, It's same as join node.
//It's time to abstract out the same methods and provide them directly to
others;
void hash_table_init();
@@ -52,13 +67,17 @@ protected:
Status process_build_block(Block& block, uint8_t offset);
Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int
child_id);
- template <bool keep_matched>
void refresh_hash_table();
- Status process_probe_block(RuntimeState* state, int child_id, bool* eos);
+
+ template <typename HashTableContext>
+ Status get_data_in_hashtable(HashTableContext& hash_table_ctx, Block*
output_block,
+ const int batch_size, bool* eos);
+
+ void add_result_columns(RowRefListWithFlags& value, int& block_size);
+
void create_mutable_cols(Block* output_block);
void release_mem();
-protected:
std::unique_ptr<HashTableVariants> _hash_table_variants;
std::vector<size_t> _probe_key_sz;
@@ -85,207 +104,21 @@ protected:
Block _probe_block;
ColumnRawPtrs _probe_columns;
std::vector<MutableColumnPtr> _mutable_cols;
- int _probe_index;
- size_t _probe_rows;
+ int _build_block_index;
+ bool _build_finished;
+ std::vector<bool> _probe_finished_children_index;
+ MutableBlock _mutable_block;
RuntimeProfile::Counter* _build_timer; // time to build hash table
RuntimeProfile::Counter* _probe_timer; // time to probe
- template <class HashTableContext>
+ template <class HashTableContext, bool is_intersected>
friend struct HashTableBuild;
template <class HashTableContext, bool is_intersected>
friend struct HashTableProbe;
};
-template <bool keep_matched>
-void VSetOperationNode::refresh_hash_table() {
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- if constexpr (std::is_same_v<typename
HashTableCtxType::Mapped,
- RowRefListWithFlags>) {
- HashTableCtxType tmp_hash_table;
- bool is_need_shrink =
-
arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl);
- if (is_need_shrink) {
- tmp_hash_table.hash_table.init_buf_size(
- _valid_element_in_hash_tbl /
arg.hash_table.get_factor() + 1);
- }
-
- arg.init_once();
- auto& iter = arg.iter;
- auto iter_end = arg.hash_table.end();
- while (iter != iter_end) {
- auto& mapped = iter->get_second();
- auto it = mapped.begin();
-
- if constexpr (keep_matched) { //intersected
- if (it->visited) {
- it->visited = false;
- if (is_need_shrink) {
-
tmp_hash_table.hash_table.insert(iter->get_value());
- }
- ++iter;
- } else {
- if (!is_need_shrink) {
-
arg.hash_table.delete_zero_key(iter->get_first());
- // the ++iter would check if the
current key is zero. if it does, the iterator will be moved to the container's
head.
- // so we do ++iter before set_zero to
make the iterator move to next valid key correctly.
- auto iter_prev = iter;
- ++iter;
- iter_prev->set_zero();
- } else {
- ++iter;
- }
- }
- } else { //except
- if (!it->visited && is_need_shrink) {
-
tmp_hash_table.hash_table.insert(iter->get_value());
- }
- ++iter;
- }
- }
-
- arg.inited = false;
- if (is_need_shrink) {
- arg.hash_table =
std::move(tmp_hash_table.hash_table);
- }
- } else {
- LOG(FATAL) << "FATAL: Invalid RowRefList";
- }
- } else {
- LOG(FATAL) << "FATAL: uninited hash table";
- }
- },
- *_hash_table_variants);
-}
-
-template <class HashTableContext, bool is_intersected>
-struct HashTableProbe {
- HashTableProbe(VSetOperationNode* operation_node, int batch_size, int
probe_rows)
- : _operation_node(operation_node),
- _left_table_data_types(operation_node->_left_table_data_types),
- _batch_size(batch_size),
- _probe_rows(probe_rows),
- _build_blocks(operation_node->_build_blocks),
- _probe_block(operation_node->_probe_block),
- _probe_index(operation_node->_probe_index),
- _num_rows_returned(operation_node->_num_rows_returned),
- _probe_raw_ptrs(operation_node->_probe_columns),
- _rows_returned_counter(operation_node->_rows_returned_counter),
- _build_col_idx(operation_node->_build_col_idx),
- _mutable_cols(operation_node->_mutable_cols) {}
-
- Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
- using KeyGetter = typename HashTableContext::State;
- using Mapped = typename HashTableContext::Mapped;
-
- KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz,
nullptr);
-
- if (_probe_index == 0) {
- _arena.reset(new Arena());
- if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
- if (_probe_keys.size() < _probe_rows) {
- _probe_keys.resize(_probe_rows);
- }
- size_t keys_size = _probe_raw_ptrs.size();
- for (size_t i = 0; i < _probe_rows; ++i) {
- _probe_keys[i] = serialize_keys_to_pool_contiguous(i,
keys_size,
-
_probe_raw_ptrs, *_arena);
- }
- }
- }
-
- if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
- key_getter.set_serialized_keys(_probe_keys.data());
- }
-
- if constexpr (std::is_same_v<typename HashTableContext::Mapped,
RowRefListWithFlags>) {
- for (; _probe_index < _probe_rows;) {
- auto find_result =
- key_getter.find_key(hash_table_ctx.hash_table,
_probe_index, *_arena);
- if (find_result.is_found()) { //if found, marked visited
- auto it = find_result.get_mapped().begin();
- if (!(it->visited)) {
- it->visited = true;
- if constexpr (is_intersected) //intersected
- _operation_node->_valid_element_in_hash_tbl++;
- else
- _operation_node->_valid_element_in_hash_tbl--;
//except
- }
- }
- _probe_index++;
- }
- } else {
- LOG(FATAL) << "Invalid RowRefListType!";
- }
- return Status::OK();
- }
-
- void add_result_columns(RowRefListWithFlags& value, int& block_size) {
- auto it = value.begin();
- for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end();
++idx) {
- auto& column =
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
- _mutable_cols[idx->second]->insert_from(column, it->row_num);
- }
- block_size++;
- }
-
- Status get_data_in_hashtable(HashTableContext& hash_table_ctx,
- std::vector<MutableColumnPtr>& mutable_cols,
Block* output_block,
- bool* eos) {
- hash_table_ctx.init_once();
- int left_col_len = _left_table_data_types.size();
- auto& iter = hash_table_ctx.iter;
- auto block_size = 0;
-
- if constexpr (std::is_same_v<typename HashTableContext::Mapped,
RowRefListWithFlags>) {
- for (; iter != hash_table_ctx.hash_table.end() && block_size <
_batch_size; ++iter) {
- auto& value = iter->get_second();
- auto it = value.begin();
- if constexpr (is_intersected) {
- if (it->visited) { //intersected: have done probe, so
visited values it's the result
- add_result_columns(value, block_size);
- }
- } else {
- if (!it->visited) { //except: haven't visited values it's
the needed result
- add_result_columns(value, block_size);
- }
- }
- }
- } else {
- LOG(FATAL) << "Invalid RowRefListType!";
- }
-
- *eos = iter == hash_table_ctx.hash_table.end();
- if (!output_block->mem_reuse()) {
- for (int i = 0; i < left_col_len; ++i) {
-
output_block->insert(ColumnWithTypeAndName(std::move(_mutable_cols[i]),
-
_left_table_data_types[i], ""));
- }
- } else {
- _mutable_cols.clear();
- }
-
- return Status::OK();
- }
-
-private:
- VSetOperationNode* _operation_node;
- const DataTypes& _left_table_data_types;
- const int _batch_size;
- const size_t _probe_rows;
- const std::vector<Block>& _build_blocks;
- const Block& _probe_block;
- int& _probe_index;
- int64_t& _num_rows_returned;
- ColumnRawPtrs& _probe_raw_ptrs;
- std::unique_ptr<Arena> _arena;
- std::vector<StringRef> _probe_keys;
- RuntimeProfile::Counter* _rows_returned_counter;
- std::unordered_map<int, int>& _build_col_idx;
- std::vector<MutableColumnPtr>& _mutable_cols;
-};
+using VIntersectNode = VSetOperationNode<true>;
+using VExceptNode = VSetOperationNode<false>;
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]