This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new 9810266ad1 [enhancement](load) enhance load from orc file (#12901)
9810266ad1 is described below
commit 9810266ad1a8af581230c8f08b280d7724ca1069
Author: wxy <[email protected]>
AuthorDate: Mon Sep 26 09:28:25 2022 +0800
[enhancement](load) enhance load from orc file (#12901)
This is part of PR 11742
Only ORC files are supported.
On the master branch, this will be supported in the new file scan framework.
---
be/src/exec/orc_scanner.cpp | 76 ++++++++++++++++++++++++++++++++++++---------
be/src/exec/orc_scanner.h | 7 +++++
2 files changed, 69 insertions(+), 14 deletions(-)
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 6b3384f042..b8d2e91c4d 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -139,18 +139,6 @@ ORCScanner::~ORCScanner() {
Status ORCScanner::open() {
RETURN_IF_ERROR(BaseScanner::open());
- if (!_ranges.empty()) {
- std::list<std::string> include_cols;
- TBrokerRangeDesc range = _ranges[0];
- _num_of_columns_from_file = range.__isset.num_of_columns_from_file
- ? range.num_of_columns_from_file
- : _src_slot_descs.size();
- for (int i = 0; i < _num_of_columns_from_file; i++) {
- auto slot_desc = _src_slot_descs.at(i);
- include_cols.push_back(slot_desc->col_name());
- }
- _row_reader_options.include(include_cols);
- }
return Status::OK();
}
@@ -186,8 +174,13 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool*
tuple_pool, bool* eof, bool*
((orc::StructVectorBatch*)_batch.get())->fields;
for (int column_ipos = 0; column_ipos < _num_of_columns_from_file;
++column_ipos) {
auto slot_desc = _src_slot_descs[column_ipos];
- orc::ColumnVectorBatch* cvb =
batch_vec[_position_in_orc_original[column_ipos]];
+ if (_map_column_to_id.find(slot_desc->col_name()) ==
_map_column_to_id.end()) {
+ // if slot not exist in file, set to null
+ _src_tuple->set_null(slot_desc->null_indicator_offset());
+ continue;
+ }
+ orc::ColumnVectorBatch* cvb =
batch_vec[_position_in_orc_original[column_ipos]];
if (cvb->hasNulls && !cvb->notNull[_current_line_of_group]) {
if (!slot_desc->is_nullable()) {
std::stringstream str_error;
@@ -446,6 +439,24 @@ Status ORCScanner::open_next_reader() {
if (_reader->getNumberOfRows() == 0) {
continue;
}
+ // build map from column name to type id
+ build_name_id_map();
+ // set include names into read options
+ std::map<int, int> _include_cols_in_src_slots;
+ std::list<std::string> cols;
+ _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+ ? range.num_of_columns_from_file
+ : _src_slot_descs.size();
+ for (int i = 0; i < _num_of_columns_from_file; i++) {
+ auto slot_desc = _src_slot_descs.at(i);
+
+ // get only columns exist orc file
+ if (_map_column_to_id.find(slot_desc->col_name()) !=
_map_column_to_id.end()) {
+ _include_cols_in_src_slots[cols.size()] = i;
+ cols.push_back(slot_desc->col_name());
+ }
+ }
+ _row_reader_options.include(cols);
_total_groups = _reader->getNumberOfStripes();
_current_group = 0;
@@ -462,7 +473,9 @@ Status ORCScanner::open_next_reader() {
//include columns must in reader field, otherwise createRowReader
will throw exception
auto pos = std::find(include_cols.begin(), include_cols.end(),
_row_reader->getSelectedType().getFieldName(i));
- _position_in_orc_original.at(std::distance(include_cols.begin(),
pos)) = orc_index++;
+ _position_in_orc_original.at(
+
_include_cols_in_src_slots[std::distance(include_cols.begin(), pos)]) =
+ orc_index++;
}
return Status::OK();
}
@@ -475,4 +488,39 @@ void ORCScanner::close() {
_row_reader.reset(nullptr);
}
+// Rebuild _map_column_to_id from the schema of the ORC file held by _reader.
+// Keys are field paths produced by dot_column_path (names joined with '.');
+// values are the ids returned by orc::Type::getColumnId() for each field.
+// Any mapping left over from a previously opened file is discarded first.
+void ORCScanner::build_name_id_map() {
+    _map_column_to_id.clear();
+    const orc::Type& root_type = _reader->getType();
+    // Path of field names from the root to the type being visited; starts empty.
+    std::vector<std::string> path;
+    build_name_id_map_impl(path, &root_type);
+}
+
+// Recursively walk `type` and record every struct field beneath it in
+// _map_column_to_id, keyed by the dotted path of field names.
+// `columns` holds the field-name path from the root down to `type`. Each field
+// name is pushed before recursing and popped afterwards, so `columns` is
+// unchanged when this returns.
+// For non-struct kinds, children are visited without adding a path segment.
+// NOTE(review): because non-struct children reuse the parent's path, two
+// structs under the same non-struct parent (e.g. a map's key and value) can
+// yield identical paths, and the later id silently overwrites the earlier one.
+// Confirm this is acceptable.
+void ORCScanner::build_name_id_map_impl(std::vector<std::string>& columns, const orc::Type* type) {
+    const size_t child_count = type->getSubtypeCount();
+    if (type->getKind() != orc::STRUCT) {
+        // Other (non-struct) kinds: descend into each child with the path unchanged.
+        for (size_t idx = 0; idx < child_count; ++idx) {
+            build_name_id_map_impl(columns, type->getSubtype(idx));
+        }
+        return;
+    }
+    for (size_t idx = 0; idx < child_count; ++idx) {
+        const orc::Type* field_type = type->getSubtype(idx);
+        columns.push_back(type->getFieldName(idx));
+        _map_column_to_id[dot_column_path(columns)] = field_type->getColumnId();
+        build_name_id_map_impl(columns, field_type);
+        columns.pop_back();
+    }
+}
+
+// Join the field names in `columns` with '.' separators, e.g. {"a", "b"} -> "a.b".
+// Returns an empty string when `columns` is empty.
+// Implemented with plain string appends, so it needs only <string>/<vector>.
+// No stream or iterator headers are required, and no trailing separator is
+// written and then trimmed off with an extra copy.
+std::string ORCScanner::dot_column_path(const std::vector<std::string>& columns) {
+    std::string path;
+    for (size_t i = 0; i < columns.size(); ++i) {
+        if (i != 0) {
+            path += '.';
+        }
+        path += columns[i];
+    }
+    return path;
+}
+
} // namespace doris
diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h
index 86b73b6b99..ee2cd3b36d 100644
--- a/be/src/exec/orc_scanner.h
+++ b/be/src/exec/orc_scanner.h
@@ -19,6 +19,7 @@
#define ORC_SCANNER_H
#include <orc/OrcFile.hh>
+#include <orc/Type.hh>
#include "exec/base_scanner.h"
@@ -47,6 +48,11 @@ public:
private:
// Read next buffer from reader
Status open_next_reader();
+ // Generate column path
+ std::string dot_column_path(const std::vector<std::string>& columns);
+ // Build map from column name to type id
+ void build_name_id_map();
+ void build_name_id_map_impl(std::vector<std::string>& columns, const
orc::Type* type);
private:
const std::vector<TBrokerRangeDesc>& _ranges;
@@ -62,6 +68,7 @@ private:
std::shared_ptr<orc::ColumnVectorBatch> _batch;
std::unique_ptr<orc::Reader> _reader;
std::unique_ptr<orc::RowReader> _row_reader;
+ std::map<std::string, int> _map_column_to_id;
// The batch after reading from orc data is arranged in the original order,
// so we need to record the index in the original order to correspond the
column names to the order
std::vector<int> _position_in_orc_original;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]