This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 27eed937b3 [pipelineX](es scan) Support ES scan operator (#24824)
27eed937b3 is described below
commit 27eed937b37f19e50f329a01e59759c620d4f056
Author: Gabriel <[email protected]>
AuthorDate: Sun Sep 24 00:32:38 2023 +0800
[pipelineX](es scan) Support ES scan operator (#24824)
Support ES scan operator
---
be/src/pipeline/exec/es_scan_operator.cpp | 159 +++++++++++++++++++++
be/src/pipeline/exec/es_scan_operator.h | 83 +++++++++++
be/src/pipeline/exec/scan_operator.cpp | 3 +
be/src/pipeline/pipeline_x/operator.cpp | 2 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 17 +--
be/src/vec/exec/runtime_filter_consumer.h | 3 +-
be/src/vec/exec/scan/new_es_scanner.cpp | 18 +++
be/src/vec/exec/scan/new_es_scanner.h | 5 +
.../external_table_p0/es/test_es_query.groovy | 1 -
.../es/test_es_query_nereids.groovy | 1 -
.../es/test_es_query_no_http_url.groovy | 1 -
11 files changed, 278 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
new file mode 100644
index 0000000000..e4b1c4956b
--- /dev/null
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/es_scan_operator.h"
+
+#include <memory>
+
+#include "exec/es/es_scan_reader.h"
+#include "exec/es/es_scroll_query.h"
+#include "vec/exec/scan/new_es_scanner.h"
+
+namespace doris::pipeline {
+
+// Builds the "host:port" endpoint string for one ES scan range.
+//
+// The entry of `es_hosts` whose hostname equals BackendOptions::get_localhost()
+// is preferred; when no entry matches, the first entry is used.
+// Returns an empty string when `es_hosts` is empty, so an empty host list no
+// longer causes an out-of-bounds read of es_hosts[0].
+static std::string get_host_and_port(const std::vector<doris::TNetworkAddress>& es_hosts) {
+    if (es_hosts.empty()) {
+        return {};
+    }
+
+    const std::string localhost = doris::BackendOptions::get_localhost();
+
+    // Track the chosen entry by pointer instead of copying a TNetworkAddress.
+    const doris::TNetworkAddress* chosen = &es_hosts.front();
+    for (const auto& es_host : es_hosts) {
+        if (es_host.hostname == localhost) {
+            chosen = &es_host;
+            break;
+        }
+    }
+
+    std::string host_port = chosen->hostname;
+    host_port += ":";
+    host_port += std::to_string(chosen->port);
+    return host_port;
+}
+
+// Initializes the base scan profile, then attaches an "EsIterator" child
+// profile to the scanner profile and registers the ES counters/timers on it.
+Status EsScanLocalState::_init_profile() {
+    RETURN_IF_ERROR(Base::_init_profile());
+
+    // make_unique instead of reset(new ...): no naked new in the block.
+    _es_profile = std::make_unique<RuntimeProfile>("EsIterator");
+    Base::_scanner_profile->add_child(_es_profile.get(), true, nullptr);
+
+    _rows_read_counter = ADD_COUNTER(_es_profile, "RowsRead", TUnit::UNIT);
+    _read_timer = ADD_TIMER(_es_profile, "TotalRawReadTime(*)");
+    _materialize_timer = ADD_TIMER(_es_profile, "MaterializeTupleTime(*)");
+    return Status::OK();
+}
+
+// Runs the base conjunct processing. Afterwards, when the eos dependency
+// reports a non-null read_blocked_by(), asserts that the operator properties
+// contain an entry for ESScanReader::KEY_QUERY_DSL.
+Status EsScanLocalState::_process_conjuncts() {
+    RETURN_IF_ERROR(Base::_process_conjuncts());
+
+    if (Base::_eos_dependency->read_blocked_by() != nullptr) {
+        const auto& op_properties = Base::_parent->cast<EsScanOperatorX>()._properties;
+        CHECK(op_properties.find(ESScanReader::KEY_QUERY_DSL) != op_properties.end());
+    }
+    return Status::OK();
+}
+
+// Creates one NewEsScanner per ES scan range held by this local state and
+// appends each prepared scanner to `scanners`. When there are no scan ranges,
+// the eos dependency is marked ready for read and no scanner is created.
+// Returns the first error reported by a scanner's prepare().
+Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
+    if (_scan_ranges.empty()) {
+        Base::_eos_dependency->set_ready_for_read();
+        return Status::OK();
+    }
+
+    auto& p = Base::_parent->cast<EsScanOperatorX>();
+    RuntimeState* state = vectorized::RuntimeFilterConsumer::_state;
+
+    for (auto& range : _scan_ranges) {
+        // Start from the operator-level properties and add the per-range settings.
+        std::map<std::string, std::string> props(p._properties);
+        props[ESScanReader::KEY_INDEX] = range->index;
+        if (range->__isset.type) {
+            props[ESScanReader::KEY_TYPE] = range->type;
+        }
+        props[ESScanReader::KEY_SHARD] = std::to_string(range->shard_id);
+        props[ESScanReader::KEY_BATCH_SIZE] = std::to_string(state->batch_size());
+        props[ESScanReader::KEY_HOST_PORT] = get_host_and_port(range->es_hosts);
+
+        // Push the limit down to Elasticsearch. If a predicate in the conjuncts
+        // cannot be processed by Elasticsearch, the limit cannot be pushed down.
+        if (p.limit() != -1 && p.limit() <= state->batch_size()) {
+            props[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(p.limit());
+        }
+
+        // build() reports through doc_value_mode; the flag is passed on to the scanner.
+        bool doc_value_mode = false;
+        props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(
+                props, p._column_names, p._docvalue_context, &doc_value_mode);
+
+        std::shared_ptr<vectorized::NewEsScanner> scanner =
+                vectorized::NewEsScanner::create_shared(state, this, p._limit_per_scanner,
+                                                        p._tuple_id, props, p._docvalue_context,
+                                                        doc_value_mode, state->runtime_profile());
+
+        RETURN_IF_ERROR(scanner->prepare(state, Base::_conjuncts));
+        scanners->push_back(scanner);
+    }
+
+    return Status::OK();
+}
+
+// Stores a private copy of the ES scan range carried by every entry of
+// `scan_ranges`; the copies are later consumed by _init_scanners().
+// Each entry is expected to have scan_range.es_scan_range set (DCHECK only).
+void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    _scan_ranges.reserve(_scan_ranges.size() + scan_ranges.size());
+    for (const auto& params : scan_ranges) {
+        DCHECK(params.scan_range.__isset.es_scan_range);
+        // make_unique keeps the allocation owned even if emplace_back throws
+        // while growing the vector (a raw `new` argument would leak).
+        _scan_ranges.emplace_back(
+                std::make_unique<TEsScanRange>(params.scan_range.es_scan_range));
+    }
+}
+
+// Constructs the operator from the plan node. The tuple id of the ES scan node
+// is recorded both as this operator's tuple id and as the base operator's
+// output tuple id; the tuple descriptor stays null until prepare().
+EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : ScanOperatorX<EsScanLocalState>(pool, tnode, descs),
+          _tuple_id(tnode.es_scan_node.tuple_id),
+          _tuple_desc(nullptr) {
+    // _tuple_id was initialized above from the same plan-node field.
+    ScanOperatorX<EsScanLocalState>::_output_tuple_id = _tuple_id;
+}
+
+// Initializes the base scan operator, then copies the ES-specific settings out
+// of the plan node's TEsScanNode: the properties map always, and the docvalue
+// and fields contexts only when the corresponding optional field is set.
+Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::init(tnode, state));
+
+    const auto& es_node = tnode.es_scan_node;
+    _properties = es_node.properties;
+
+    if (es_node.__isset.docvalue_context) {
+        _docvalue_context = es_node.docvalue_context;
+    }
+    if (es_node.__isset.fields_context) {
+        _fields_context = es_node.fields_context;
+    }
+
+    return Status::OK();
+}
+
+// Prepares the base scan operator, resolves the tuple descriptor for
+// `_tuple_id` from the state's descriptor table, and appends the column name
+// of every materialized slot to `_column_names` (later handed to
+// ESScrollQueryBuilder::build).
+// Returns InternalError when no tuple descriptor exists for `_tuple_id`.
+Status EsScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        // The placeholder previously read "=i{}", which printed a stray 'i'
+        // in front of the tuple id.
+        return Status::InternalError("Failed to get tuple descriptor, _tuple_id={}", _tuple_id);
+    }
+
+    // Set up the column name vector for ESScrollQueryBuilder.
+    for (const auto& slot_desc : _tuple_desc->slots()) {
+        if (slot_desc->is_materialized()) {
+            _column_names.push_back(slot_desc->col_name());
+        }
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/es_scan_operator.h
b/be/src/pipeline/exec/es_scan_operator.h
new file mode 100644
index 0000000000..96f53c02cd
--- /dev/null
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <string>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/scan_operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris {
+class ExecNode;
+
+namespace vectorized {
+class NewOlapScanner;
+}
+} // namespace doris
+
+namespace doris::pipeline {
+
+class EsScanOperatorX;
+
+// Local state of the pipelineX Elasticsearch scan operator. It keeps the ES
+// scan ranges assigned through set_scan_ranges() and turns each of them into a
+// NewEsScanner in _init_scanners().
+class EsScanLocalState final : public ScanLocalState<EsScanLocalState> {
+public:
+    using Parent = EsScanOperatorX;
+    using Base = ScanLocalState<EsScanLocalState>;
+    ENABLE_FACTORY_CREATOR(EsScanLocalState);
+    EsScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
+
+private:
+    // NOTE(review): the scanner created for this state is NewEsScanner, yet the
+    // friend declared here is NewOlapScanner — confirm whether this is intended.
+    friend class vectorized::NewOlapScanner;
+
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+    Status _init_profile() override;
+    Status _process_conjuncts() override;
+    Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
+
+    // Copies of the ES scan ranges received by set_scan_ranges().
+    std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges;
+    // "EsIterator" profile; created in _init_profile().
+    std::unique_ptr<RuntimeProfile> _es_profile;
+    // Counters registered on _es_profile by _init_profile(). They start as
+    // nullptr so they never hold an indeterminate value before that call.
+    RuntimeProfile::Counter* _rows_read_counter = nullptr;
+    RuntimeProfile::Counter* _read_timer = nullptr;
+    RuntimeProfile::Counter* _materialize_timer = nullptr;
+};
+
+// pipelineX scan operator for Elasticsearch. It holds the ES settings taken
+// from the plan node; EsScanLocalState (a friend) reads them when it builds
+// its scanners.
+class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
+public:
+ EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+
+ // Copies properties and the optional docvalue/fields contexts from
+ // tnode.es_scan_node.
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ // Resolves _tuple_desc and fills _column_names.
+ Status prepare(RuntimeState* state) override;
+
+private:
+ friend class EsScanLocalState;
+
+ // Tuple id taken from tnode.es_scan_node.tuple_id in the constructor.
+ TupleId _tuple_id;
+ // Descriptor for _tuple_id; null until prepare() resolves it.
+ TupleDescriptor* _tuple_desc;
+
+ // Copied from tnode.es_scan_node in init(); the two contexts are copied
+ // only when the plan node has them set.
+ std::map<std::string, std::string> _properties;
+ std::map<std::string, std::string> _fields_context;
+ std::map<std::string, std::string> _docvalue_context;
+
+ // Column names of the materialized slots of _tuple_desc, filled in prepare().
+ std::vector<std::string> _column_names;
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index b372e2dab8..d67228848f 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -21,6 +21,7 @@
#include <memory>
+#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/operator.h"
#include "vec/exec/runtime_filter_consumer.h"
@@ -1432,5 +1433,7 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
template class ScanOperatorX<OlapScanLocalState>;
template class ScanLocalState<OlapScanLocalState>;
+template class ScanOperatorX<EsScanLocalState>;
+template class ScanLocalState<EsScanLocalState>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 8d228623c5..2937312976 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -28,6 +28,7 @@
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/empty_set_operator.h"
+#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
@@ -393,6 +394,7 @@ DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinProbeLocalState)
DECLARE_OPERATOR_X(OlapScanLocalState)
+DECLARE_OPERATOR_X(EsScanLocalState)
DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)
DECLARE_OPERATOR_X(AggLocalState)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1f7289aced..c2becd1683 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -51,6 +51,7 @@
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
#include "pipeline/exec/empty_set_operator.h"
+#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
@@ -87,16 +88,6 @@
#include "util/telemetry/telemetry.h"
#include "util/uid_util.h"
#include "vec/common/assert_cast.h"
-#include "vec/exec/join/vhash_join_node.h"
-#include "vec/exec/scan/new_es_scan_node.h"
-#include "vec/exec/scan/new_file_scan_node.h"
-#include "vec/exec/scan/new_odbc_scan_node.h"
-#include "vec/exec/scan/new_olap_scan_node.h"
-#include "vec/exec/scan/vmeta_scan_node.h"
-#include "vec/exec/scan/vscan_node.h"
-#include "vec/exec/vaggregation_node.h"
-#include "vec/exec/vexchange_node.h"
-#include "vec/exec/vunion_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
@@ -568,6 +559,12 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->add_operator(op));
break;
}
+ case TPlanNodeType::ES_SCAN_NODE:
+ case TPlanNodeType::ES_HTTP_SCAN_NODE: {
+ op.reset(new EsScanOperatorX(pool, tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ break;
+ }
case TPlanNodeType::EXCHANGE_NODE: {
int num_senders = find_with_default(request.per_exch_num_senders,
tnode.node_id, 0);
DCHECK_GT(num_senders, 0);
diff --git a/be/src/vec/exec/runtime_filter_consumer.h
b/be/src/vec/exec/runtime_filter_consumer.h
index a6527fae62..ed7a097901 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -63,10 +63,9 @@ protected:
std::vector<bool> _runtime_filter_ready_flag;
doris::Mutex _rf_locks;
phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
-
-private:
RuntimeState* _state;
+private:
int32_t _filter_id;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp
b/be/src/vec/exec/scan/new_es_scanner.cpp
index 0bd492b79c..867bbb67cc 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -58,6 +58,24 @@ NewEsScanner::NewEsScanner(RuntimeState* state,
NewEsScanNode* parent, int64_t l
_docvalue_context(docvalue_context),
_doc_value_mode(doc_value_mode) {}
+// Overload that takes a pipeline::ScanLocalStateBase* instead of a
+// NewEsScanNode* parent. The local state is forwarded to the VScanner base;
+// the remaining members are initialized from the arguments, with the flags
+// set to false and the pointers set to nullptr.
+NewEsScanner::NewEsScanner(RuntimeState* state, pipeline::ScanLocalStateBase*
local_state,
+ int64_t limit, TupleId tuple_id,
+ const std::map<std::string, std::string>&
properties,
+ const std::map<std::string, std::string>&
docvalue_context,
+ bool doc_value_mode, RuntimeProfile* profile)
+ : VScanner(state, local_state, limit, profile),
+ _is_init(false),
+ _es_eof(false),
+ _properties(properties),
+ _line_eof(false),
+ _batch_eof(false),
+ _tuple_id(tuple_id),
+ _tuple_desc(nullptr),
+ _es_reader(nullptr),
+ _es_scroll_parser(nullptr),
+ _docvalue_context(docvalue_context),
+ _doc_value_mode(doc_value_mode) {}
+
Status NewEsScanner::prepare(RuntimeState* state, const VExprContextSPtrs&
conjuncts) {
VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts));
diff --git a/be/src/vec/exec/scan/new_es_scanner.h
b/be/src/vec/exec/scan/new_es_scanner.h
index 90b61344de..10ee1c438d 100644
--- a/be/src/vec/exec/scan/new_es_scanner.h
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -56,6 +56,11 @@ public:
const std::map<std::string, std::string>& docvalue_context,
bool doc_value_mode,
RuntimeProfile* profile);
+ NewEsScanner(RuntimeState* state, pipeline::ScanLocalStateBase*
local_state, int64_t limit,
+ TupleId tuple_id, const std::map<std::string, std::string>&
properties,
+ const std::map<std::string, std::string>& docvalue_context,
bool doc_value_mode,
+ RuntimeProfile* profile);
+
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
diff --git a/regression-test/suites/external_table_p0/es/test_es_query.groovy
b/regression-test/suites/external_table_p0/es/test_es_query.groovy
index a4e5d9705d..03b38cfc5f 100644
--- a/regression-test/suites/external_table_p0/es/test_es_query.groovy
+++ b/regression-test/suites/external_table_p0/es/test_es_query.groovy
@@ -16,7 +16,6 @@
// under the License.
suite("test_es_query", "p0,external,es,external_docker,external_docker_es") {
-
String enabled = context.config.otherConfigs.get("enableEsTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
diff --git
a/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
b/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
index a292701200..70f7678d1d 100644
--- a/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
+++ b/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
@@ -16,7 +16,6 @@
// under the License.
suite("test_es_query_nereids",
"p0,external,es,external_docker,external_docker_es") {
-
String enabled = context.config.otherConfigs.get("enableEsTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
diff --git
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
index 7aad2f4fba..3f592e10c6 100644
---
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
+++
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
@@ -16,7 +16,6 @@
// under the License.
suite("test_es_query_no_http_url",
"p0,external,es,external_docker,external_docker_es") {
-
String enabled = context.config.otherConfigs.get("enableEsTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]