This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new acd5d67355 [feature-wip](new-scan)Add new odbc scanner and new odbc
scan node (#12899)
acd5d67355 is described below
commit acd5d67355d71eac97dfcf0362f959b6c5c043dc
Author: Tiewei Fang <[email protected]>
AuthorDate: Mon Sep 26 09:24:25 2022 +0800
[feature-wip](new-scan)Add new odbc scanner and new odbc scan node (#12899)
---
be/src/exec/exec_node.cpp | 10 +-
be/src/runtime/plan_fragment_executor.cpp | 4 +-
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/exec/scan/new_odbc_scan_node.cpp | 62 ++++++++
be/src/vec/exec/scan/new_odbc_scan_node.h | 42 ++++++
be/src/vec/exec/scan/new_odbc_scanner.cpp | 213 ++++++++++++++++++++++++++++
be/src/vec/exec/scan/new_odbc_scanner.h | 65 +++++++++
7 files changed, 395 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index be0b8690f2..4a9d648d1c 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -62,6 +62,7 @@
#include "vec/exec/file_scan_node.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vanalytic_eval_node.h"
@@ -448,7 +449,11 @@ Status ExecNode::create_node(RuntimeState* state,
ObjectPool* pool, const TPlanN
#endif
case TPlanNodeType::ODBC_SCAN_NODE:
if (state->enable_vectorized_exec()) {
- *node = pool->add(new vectorized::VOdbcScanNode(pool, tnode,
descs));
+ if (config::enable_new_scan_node) {
+ *node = pool->add(new vectorized::NewOdbcScanNode(pool, tnode,
descs));
+ } else {
+ *node = pool->add(new vectorized::VOdbcScanNode(pool, tnode,
descs));
+ }
} else {
*node = pool->add(new OdbcScanNode(pool, tnode, descs));
}
@@ -725,7 +730,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
// TODO(cmy): should be removed when NewOlapScanNode is ready
ExecNode* child0 = agg_node[0]->_children[0];
if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
- typeid(*child0) == typeid(vectorized::NewFileScanNode)) {
+ typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
+ typeid(*child0) == typeid(vectorized::NewOdbcScanNode)) {
vectorized::VScanNode* scan_node =
static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
scan_node->set_no_agg_finalize();
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index ae5ee8f75a..2fe491a374 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -47,6 +47,7 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -168,7 +169,8 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request,
// TODO(cmy): this "if...else" should be removed once all ScanNode are
derived from VScanNode.
ExecNode* node = scan_nodes[i];
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
- typeid(*node) == typeid(vectorized::NewFileScanNode)) {
+ typeid(*node) == typeid(vectorized::NewFileScanNode) ||
+ typeid(*node) == typeid(vectorized::NewOdbcScanNode)) {
vectorized::VScanNode* scan_node =
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges =
find_with_default(params.per_node_scan_ranges,
scan_node->id(), no_scan_ranges);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 06ce19c014..16eabe1e45 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -253,6 +253,8 @@ set(VEC_FILES
exec/scan/new_file_scanner.cpp
exec/scan/new_file_text_scanner.cpp
exec/scan/vfile_scanner.cpp
+ exec/scan/new_odbc_scanner.cpp
+ exec/scan/new_odbc_scan_node.cpp
)
add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
new file mode 100644
index 0000000000..48043dd22f
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scan_node.h"
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+// Type tag prefixed to the VLOG_CRITICAL trace lines emitted by NewOdbcScanNode.
+static const std::string NEW_SCAN_NODE_TYPE {"NewOdbcScanNode"};
+
+namespace doris::vectorized {
+
+// Builds the vectorized ODBC scan node from its thrift plan node.
+// Every ODBC-specific setting (table name, scan description, output tuple id)
+// is taken from `tnode.odbc_scan_node`.
+NewOdbcScanNode::NewOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          // Read from odbc_scan_node: jdbc_scan_node describes JDBC scans, not this
+          // node, so reading it here left get_name() with the wrong table name.
+          _table_name(tnode.odbc_scan_node.table_name),
+          _odbc_scan_node(tnode.odbc_scan_node) {
+    _output_tuple_id = tnode.odbc_scan_node.tuple_id;
+}
+
+// Display name of this node, of the form "VNewOdbcScanNode(<table name>)".
+std::string NewOdbcScanNode::get_name() {
+    std::string name = "VNewOdbcScanNode(";
+    name += _table_name;
+    name += ')';
+    return name;
+}
+
+// Prepares the generic VScanNode state, then creates the memory tracker that is
+// handed to every NewOdbcScanner built in _init_scanners().
+Status NewOdbcScanNode::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+
+    // Scope the allocations below to this node's mem tracker.
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    auto scanner_tracker = std::make_unique<MemTracker>("NewOdbcScanner");
+    _scanner_mem_tracker = std::move(scanner_tracker);
+    return Status::OK();
+}
+
+// No ODBC-specific profile counters; the base-class profile setup is used as-is
+// and its status is returned unchanged.
+Status NewOdbcScanNode::_init_profile() {
+    return VScanNode::_init_profile();
+}
+
+// Creates the scanners for this node. Scan ranges are ignored by this node
+// (set_scan_ranges is a no-op), so a single NewOdbcScanner is built, or none
+// when the node is already at eos. The scanner is registered in _scanner_pool,
+// which keeps it alive for the node's lifetime.
+Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_eos) {
+        return Status::OK();
+    }
+    auto* odbc_scanner = _scanner_pool.add(new NewOdbcScanner(
+            _state, this, _limit_per_scanner, _scanner_mem_tracker.get(), _odbc_scan_node));
+    RETURN_IF_ERROR(odbc_scanner->prepare(_state));
+    scanners->push_back(odbc_scanner);
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h
b/be/src/vec/exec/scan/new_odbc_scan_node.h
new file mode 100644
index 0000000000..40d2bdd4bd
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exec/scan/vscan_node.h"
+namespace doris::vectorized {
+// Vectorized scan node that reads an external table through ODBC on top of the
+// VScanNode/VScanner framework. It is created instead of VOdbcScanNode when
+// config::enable_new_scan_node is set.
+class NewOdbcScanNode : public VScanNode {
+public:
+    NewOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    Status prepare(RuntimeState* state) override;
+
+    std::string get_name() override;
+
+    // ODBC scans do not use scan ranges; the ranges are intentionally ignored.
+    void set_scan_ranges(const std::vector<TScanRangeParams>& /*scan_ranges*/) override {}
+
+protected:
+    Status _init_profile() override;
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    // External table name; shown by get_name().
+    std::string _table_name;
+    // Copy of the thrift ODBC scan description, passed to each scanner.
+    TOdbcScanNode _odbc_scan_node;
+    // Memory tracker shared by this node's scanners; created in prepare().
+    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp
b/be/src/vec/exec/scan/new_odbc_scanner.cpp
new file mode 100644
index 0000000000..11943d8f04
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+#include "common/status.h"
+#include "exec/text_converter.hpp"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner";
+
+namespace doris::vectorized {
+NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent,
int64_t limit,
+ MemTracker* mem_tracker, const TOdbcScanNode&
odbc_scan_node)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+ _is_init(false),
+ _odbc_eof(false),
+ _table_name(odbc_scan_node.table_name),
+ _connect_string(odbc_scan_node.connect_string),
+ _query_string(odbc_scan_node.query_string),
+ _tuple_id(odbc_scan_node.tuple_id),
+ _tuple_desc(nullptr) {}
+
+// One-time setup: resolves the output tuple descriptor and builds the
+// ODBCConnector and TextConverter. Calling it again after success is a no-op.
+Status NewOdbcScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) {
+        return Status::OK();
+    }
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    // Layout of the rows this scanner produces.
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    // The connection and query strings are handed over to the connector
+    // parameters; this class does not read them again afterwards.
+    _odbc_param.connect_string = std::move(_connect_string);
+    _odbc_param.query_string = std::move(_query_string);
+    _odbc_param.tuple_desc = _tuple_desc;
+
+    // nothrow allocations: an allocation failure becomes a Status, not an exception.
+    auto* connector = new (std::nothrow) ODBCConnector(_odbc_param);
+    _odbc_connector.reset(connector);
+    if (!_odbc_connector) {
+        return Status::InternalError("new a odbc scanner failed.");
+    }
+
+    auto* converter = new (std::nothrow) TextConverter('\\');
+    _text_converter.reset(converter);
+    if (!_text_converter) {
+        return Status::InternalError("new a text convertor failed.");
+    }
+
+    _is_init = true;
+    return Status::OK();
+}
+
+// Opens the ODBC connection and submits the query; rows are then pulled by
+// _get_block_impl(). Requires a successful prepare().
+Status NewOdbcScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is null.");
+    }
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_odbc_connector->open());
+    RETURN_IF_ERROR(_odbc_connector->query());
+    // TODO: check the number of materialized slots (not implemented).
+    return Status::OK();
+}
+
+// Reads up to state->batch_size() rows from the ODBC connector into `block`.
+//
+// Each materialized slot of the output tuple is filled by converting the raw
+// value fetched by ODBCConnector with TextConverter. `*eof` is set only when the
+// connector is exhausted and this call produced no row, so rows are never
+// returned together with eof; the next call sees `_odbc_eof` and reports eof
+// with an empty block.
+//
+// Returns InternalError on null arguments, use before prepare(), a NULL value in
+// a non-nullable column, an unusable length indicator, a value longer than its
+// buffer, or a value that cannot be converted to the slot type.
+Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+    RETURN_IF_CANCELLED(state);
+
+    if (_odbc_eof) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const size_t column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (size_t i = 0; i < column_size; ++i) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        // NOTE(review): assumes the tuple has at least one slot; columns[0] is
+        // indexed unconditionally below.
+        while (true) {
+            // block is full, break
+            if (state->batch_size() <= columns[0]->size()) {
+                break;
+            }
+
+            RETURN_IF_ERROR(_odbc_connector->get_next_row(&_odbc_eof));
+
+            if (_odbc_eof) {
+                // Report eof only if this call produced no rows. block->rows() cannot
+                // be used here: without mem_reuse the columns are inserted into the
+                // block only after this loop, so it would always read 0.
+                if (columns[0]->size() == 0) {
+                    *eof = true;
+                }
+                break;
+            }
+
+            // Read one row from reader
+            int materialized_column_index = 0;
+            for (size_t column_index = 0; column_index < column_size; ++column_index) {
+                auto slot_desc = _tuple_desc->slots()[column_index];
+                // because the fe planner filter the non_materialize column
+                if (!slot_desc->is_materialized()) {
+                    continue;
+                }
+                const auto& column_data =
+                        _odbc_connector->get_column_data(materialized_column_index);
+
+                char* value_data = static_cast<char*>(column_data.target_value_ptr);
+                int value_len = column_data.strlen_or_ind;
+
+                if (value_len == SQL_NULL_DATA) {
+                    if (slot_desc->is_nullable()) {
+                        columns[column_index]->insert_default();
+                    } else {
+                        return Status::InternalError(
+                                "nonnull column contains nullptr. table={}, column={}", _table_name,
+                                slot_desc->col_name());
+                    }
+                } else if (value_len < 0) {
+                    // Any other negative indicator (e.g. SQL_NO_TOTAL) is not a byte
+                    // count and must not be used as a value length.
+                    return Status::InternalError(
+                            "unexpected odbc length indicator {}. table={}, column={}",
+                            value_len, _table_name, slot_desc->col_name());
+                } else if (value_len > column_data.buffer_length) {
+                    return Status::InternalError(
+                            "column value length longer than buffer length. "
+                            "table={}, column={}, buffer_length={}",
+                            _table_name, slot_desc->col_name(), column_data.buffer_length);
+                } else {
+                    if (!_text_converter->write_column(slot_desc, &columns[column_index],
+                                                       value_data, value_len, true, false)) {
+                        std::stringstream ss;
+                        // Print exactly value_len bytes; the buffer is not assumed to
+                        // be NUL-terminated.
+                        ss << "Fail to convert odbc value:'" << std::string(value_data, value_len)
+                           << "' to " << slot_desc->type() << " on column:`"
+                           << slot_desc->col_name() + "`";
+                        return Status::InternalError(ss.str());
+                    }
+                }
+                materialized_column_index++;
+            }
+        }
+
+        // Drop our extra references to the columns before the block is used.
+        // Without mem_reuse the filled columns are moved into the block; with
+        // mem_reuse they were taken from the block, so clearing `columns` leaves
+        // only the block's own references.
+        if (!mem_reuse) {
+            size_t column_index = 0;
+            for (const auto slot_desc : _tuple_desc->slots()) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << NEW_SCANNER_TYPE << " output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+
+    return Status::OK();
+}
+
+// Performs only the base-class close and returns its status unchanged;
+// _odbc_connector is not closed explicitly here.
+Status NewOdbcScanner::close(RuntimeState* state) {
+    return VScanner::close(state);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h
b/be/src/vec/exec/scan/new_odbc_scanner.h
new file mode 100644
index 0000000000..34cedb8095
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scanner.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/odbc_connector.h"
+#include "exec/text_converter.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+// Vectorized ODBC scanner: pulls rows through ODBCConnector and converts each
+// fetched value into the output block's columns with TextConverter.
+class NewOdbcScanner : public VScanner {
+public:
+    NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
+                   MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node);
+
+    // Resolves the tuple descriptor and builds the connector; must precede open().
+    Status prepare(RuntimeState* state);
+
+    Status open(RuntimeState* state) override;
+
+    // Close the odbc_scanner, and report errors.
+    Status close(RuntimeState* state) override;
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
+
+private:
+    // Set once prepare() has completed successfully.
+    bool _is_init;
+
+    // Set by _odbc_connector->get_next_row() when no rows remain.
+    bool _odbc_eof;
+
+    // External table name, used in error messages.
+    std::string _table_name;
+
+    // ODBC connection string; moved into _odbc_param by prepare().
+    std::string _connect_string;
+
+    // Query sent to the ODBC source; moved into _odbc_param by prepare().
+    std::string _query_string;
+
+    // Tuple id resolved in prepare() to set _tuple_desc.
+    TupleId _tuple_id;
+
+    // Descriptor of tuples read from the ODBC table.
+    const TupleDescriptor* _tuple_desc;
+
+    // Connector that executes the query and fetches rows.
+    std::unique_ptr<ODBCConnector> _odbc_connector;
+    // Parameters the connector is built from.
+    ODBCConnectorParam _odbc_param;
+    // Helper that converts fetched text values into column values.
+    std::unique_ptr<TextConverter> _text_converter;
+};
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]