This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b25376cf8 [feature](torc) support insert only transactional hive
table on be side (#19518)
0b25376cf8 is described below
commit 0b25376cf8c87381bf6ddc7f795e2cf11db5ee9a
Author: Qi Chen <[email protected]>
AuthorDate: Thu May 11 14:15:09 2023 +0800
[feature](torc) support insert only transactional hive table on be side
(#19518)
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 105 +++++++++++++++++++++++++----
be/src/vec/exec/format/orc/vorc_reader.h | 9 +++
2 files changed, 100 insertions(+), 14 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 6a88360b49..eb944bb84c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -86,6 +86,22 @@ enum class FileCachePolicy : uint8_t;
 namespace doris::vectorized {
 
+// Top-level field names of a Hive ACID (transactional) ORC file, in file order.
+// The user columns are nested inside the last field, "row".
+static constexpr const char* ACID_EVENT_FIELD_NAMES[] = {
+        "operation", "originalTransaction", "bucket", "rowId", "currentTransaction", "row"};
+
+// The same names, lower-cased, used for case-insensitive schema matching.
+static constexpr const char* ACID_EVENT_FIELD_NAMES_LOWER_CASE[] = {
+        "operation", "originaltransaction", "bucket", "rowid", "currenttransaction", "row"};
+
+// Index of the "row" field inside the ACID event struct.
+static constexpr int ACID_ROW_OFFSET = 5;
+
+// The two name tables are indexed in parallel; keep them (and the row offset) consistent.
+static_assert(std::size(ACID_EVENT_FIELD_NAMES) == std::size(ACID_EVENT_FIELD_NAMES_LOWER_CASE));
+static_assert(static_cast<size_t>(ACID_ROW_OFFSET) < std::size(ACID_EVENT_FIELD_NAMES));
+
 #define FOR_FLAT_ORC_COLUMNS(M) \
     M(TypeIndex::Int8, Int8, orc::LongVectorBatch) \
     M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch) \
@@ -245,7 +253,8 @@ Status OrcReader::init_reader(
 Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
     RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _reader->getType();
+    // Report only the user columns: for a Hive ACID file they live in the "row" struct.
+    const orc::Type& root_type = _remove_acid(_reader->getType());
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
         col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
@@ -257,23 +265,41 @@ Status OrcReader::_init_read_columns() {
     auto& root_type = _reader->getType();
     std::vector<std::string> orc_cols;
     std::vector<std::string> orc_cols_lower_case;
-    for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        orc_cols.emplace_back(root_type.getFieldName(i));
-        orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i));
-    }
+    _init_orc_cols(root_type, orc_cols, orc_cols_lower_case);
+
+    const bool is_acid = _check_acid_schema(root_type);
     for (auto& col_name : _column_names) {
         if (_is_hive) {
             auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
             DCHECK(iter != _scan_params.slot_name_to_schema_pos.end());
             int pos = iter->second;
-            orc_cols_lower_case[pos] = iter->first;
+            if (is_acid) {
+                // User columns of an ACID file start right after the six event fields.
+                orc_cols_lower_case[ACID_ROW_OFFSET + 1 + pos] = iter->first;
+            } else {
+                orc_cols_lower_case[pos] = iter->first;
+            }
         }
-        auto iter = std::find(orc_cols_lower_case.begin(), orc_cols_lower_case.end(), col_name);
+        // For an ACID file only the entries after the six event fields are user columns;
+        // skip the event fields so that a user column named e.g. "bucket" or "row" cannot
+        // be resolved to one of them.
+        auto search_begin = orc_cols_lower_case.begin();
+        if (is_acid) {
+            search_begin += ACID_ROW_OFFSET + 1;
+        }
+        auto iter = std::find(search_begin, orc_cols_lower_case.end(), col_name);
         if (iter == orc_cols_lower_case.end()) {
             _missing_cols.emplace_back(col_name);
         } else {
             int pos = std::distance(orc_cols_lower_case.begin(), iter);
-            _read_cols.emplace_back(orc_cols[pos]);
+            if (is_acid) {
+                // The column is nested under the ACID "row" struct: read it as "row.<column>".
+                auto read_col = fmt::format("{}.{}", ACID_EVENT_FIELD_NAMES[ACID_ROW_OFFSET],
+                                            orc_cols[pos]);
+                _read_cols.emplace_back(read_col);
+            } else {
+                _read_cols.emplace_back(orc_cols[pos]);
+            }
             _read_cols_lower_case.emplace_back(col_name);
             // For hive engine, store the orc column name to schema column name map.
             // This is for Hive 1.x orc file with internal column name _col0, _col1...
@@ -286,6 +303,57 @@ Status OrcReader::_init_read_columns() {
     return Status::OK();
 }
 
+// Appends the field names of `type` to `orc_cols` and their lower-cased forms to
+// `orc_cols_lower_case`, keeping the two vectors index-aligned.
+//
+// If `type` is a Hive ACID event struct, the fields of its "row" struct are appended
+// right after the six event fields, i.e. starting at index ACID_ROW_OFFSET + 1, which
+// is what _init_read_columns() expects. No other STRUCT is flattened: a user STRUCT column
+// must occupy exactly one slot, or slot_name_to_schema_pos positions stop lining up.
+void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
+                               std::vector<std::string>& orc_cols_lower_case) {
+    auto append_fields = [&](const orc::Type& struct_type) {
+        for (int i = 0; i < struct_type.getSubtypeCount(); ++i) {
+            orc_cols.emplace_back(struct_type.getFieldName(i));
+            orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&struct_type, i));
+        }
+    };
+    append_fields(type);
+    if (_check_acid_schema(type)) {
+        append_fields(*type.getSubtype(ACID_ROW_OFFSET));
+    }
+}
+
+// Returns true if `type` is a STRUCT whose fields are exactly ACID_EVENT_FIELD_NAMES
+// (compared case-insensitively, in order) and whose "row" field is itself a STRUCT.
+// Any other type, including a non-STRUCT one, yields false.
+bool OrcReader::_check_acid_schema(const orc::Type& type) {
+    if (type.getKind() != orc::TypeKind::STRUCT ||
+        type.getSubtypeCount() != std::size(ACID_EVENT_FIELD_NAMES)) {
+        return false;
+    }
+    for (uint64_t i = 0; i < type.getSubtypeCount(); ++i) {
+        const std::string& field_name = type.getFieldName(i);
+        std::string field_name_lower_case = field_name;
+        std::transform(field_name.begin(), field_name.end(), field_name_lower_case.begin(),
+                       [](unsigned char c) { return std::tolower(c); });
+        if (field_name_lower_case != ACID_EVENT_FIELD_NAMES_LOWER_CASE[i]) {
+            return false;
+        }
+    }
+    // Callers descend into "row" (see _remove_acid()), so it must be a STRUCT.
+    return type.getSubtype(ACID_ROW_OFFSET)->getKind() == orc::TypeKind::STRUCT;
+}
+
+// Returns the user-visible schema: the "row" struct of a Hive ACID file, else `type`.
+const orc::Type& OrcReader::_remove_acid(const orc::Type& type) {
+    if (_check_acid_schema(type)) {
+        return *(type.getSubtype(ACID_ROW_OFFSET));
+    } else {
+        return type;
+    }
+}
+
 // orc only support LONG, FLOAT, STRING, DATE, DECIMAL, TIMESTAMP, BOOLEAN to push down predicates
 static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PREDICATE_TYPE = {
         {orc::TypeKind::BYTE, orc::PredicateDataType::LONG},
@@ -678,18 +733,39 @@ Status OrcReader::set_fill_columns(
         return Status::InternalError("Failed to create orc row reader. reason = {}", e.what());
     }
     auto& selected_type = _row_reader->getSelectedType();
-    _col_orc_type.resize(selected_type.getSubtypeCount());
-    for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
+    // _init_select_types() only appends, so start from an empty table: index k of
+    // _col_orc_type has to describe the column that _colname_to_idx maps to k.
+    _col_orc_type.clear();
+    int idx = 0;
+    RETURN_IF_ERROR(_init_select_types(selected_type, idx));
+    return Status::OK();
+}
+
+// Registers every field of `type` in depth-first pre-order, numbering them from `idx`:
+// the field's name is mapped to its number in _colname_to_idx and its type is appended to
+// _col_orc_type. Fields of a nested STRUCT are registered right after the STRUCT itself.
+// NOTE(review): top-level and nested fields share one name map, so a nested field whose
+// name matches another column's name overwrites, or is overwritten by, that entry.
+Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
+    for (int i = 0; i < type.getSubtypeCount(); ++i) {
         std::string name;
         // For hive engine, translate the column name in orc file to schema column name.
         // This is for Hive 1.x which use internal column name _col0, _col1...
         if (_is_hive) {
-            name = _file_col_to_schema_col[selected_type.getFieldName(i)];
+            name = _file_col_to_schema_col[type.getFieldName(i)];
         } else {
-            name = _get_field_name_lower_case(&selected_type, i);
+            name = _get_field_name_lower_case(&type, i);
+        }
+        _colname_to_idx[name] = idx++;
+        const orc::Type* sub_type = type.getSubtype(i);
+        _col_orc_type.push_back(sub_type);
+        if (sub_type->getKind() == orc::TypeKind::STRUCT) {
+            // `idx` is passed by value, so advance it here past every descendant the
+            // recursive call registered; otherwise later siblings would reuse their indices.
+            const size_t registered_before = _col_orc_type.size();
+            RETURN_IF_ERROR(_init_select_types(*sub_type, idx));
+            idx += static_cast<int>(_col_orc_type.size() - registered_before);
         }
-        _colname_to_idx[name] = i;
-        _col_orc_type[i] = selected_type.getSubtype(i);
     }
     return Status::OK();
 }
@@ -1151,7 +1215,13 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
                 return Status::OK();
             }
         }
-        const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields;
+
+        // Flatten the batch (nested STRUCT children included) in the same order as
+        // _col_orc_type, so that _colname_to_idx indices can be used on it directly.
+        std::vector<orc::ColumnVectorBatch*> batch_vec;
+        batch_vec.reserve(_col_orc_type.size());
+        _fill_batch_vec(batch_vec, _batch.get(), 0);
+
         for (auto& col_name : _lazy_read_ctx.all_read_columns) {
             auto& column_with_type_and_name = block->get_by_name(col_name);
             auto& column_ptr = column_with_type_and_name.column;
@@ -1186,6 +1253,24 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     return Status::OK();
 }
 
+// Appends the child batches of the STRUCT batch `batch` to `result` in depth-first
+// pre-order, descending into children whose type (looked up in _col_orc_type from index
+// `idx` on) is a STRUCT. This is the same order in which _init_select_types() fills
+// _col_orc_type.
+void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
+                                orc::ColumnVectorBatch* batch, int idx) {
+    for (auto* field : down_cast<orc::StructVectorBatch*>(batch)->fields) {
+        result.push_back(field);
+        DCHECK_LT(static_cast<size_t>(idx), _col_orc_type.size());
+        if (_col_orc_type[idx++]->getKind() == orc::TypeKind::STRUCT) {
+            // `idx` is passed by value: skip past every descendant the recursive call
+            // appended so the next sibling is checked against its own _col_orc_type entry.
+            const size_t appended_before = result.size();
+            _fill_batch_vec(result, field, idx);
+            idx += static_cast<int>(result.size() - appended_before);
+        }
+    }
+}
+
 Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
     Block* block = (Block*)arg;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 690a8932d0..45e2034b07 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -149,6 +149,11 @@ public:
                     partition_columns,
             const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
 
+    // Walks `type` depth-first and registers each field, including fields of nested
+    // STRUCTs, in _colname_to_idx and _col_orc_type, numbering them from `idx`.
+    // NOTE(review): called from set_fill_columns(); could likely be private.
+    Status _init_select_types(const orc::Type& type, int idx);
+
     Status _fill_partition_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -159,6 +161,12 @@ public:
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
+    // Collects the child batches of the STRUCT batch `batch` into `result`, descending into
+    // children whose _col_orc_type entry (read from index `idx` on) is a STRUCT.
+    // NOTE(review): called from get_next_block(); could likely be private.
+    void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
+                         orc::ColumnVectorBatch* batch, int idx);
+
     void close();
 
     int64_t size() const;
@@ -204,6 +209,20 @@ private:
     void _init_profile();
     Status _init_read_columns();
+
+    // Appends the field names of `type` to `orc_cols` and their lower-cased forms to
+    // `orc_cols_lower_case`, keeping the two vectors index-aligned. Fields of a nested
+    // struct may be appended as well; see the definition for which ones.
+    void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
+                        std::vector<std::string>& orc_cols_lower_case);
+
+    // Returns whether `type` is recognized as a Hive ACID event struct (the fields named in
+    // ACID_EVENT_FIELD_NAMES, in order, compared case-insensitively); see the definition.
+    static bool _check_acid_schema(const orc::Type& type);
+
+    // Returns the "row" subtype when _check_acid_schema(type) is true, otherwise `type`.
+    static const orc::Type& _remove_acid(const orc::Type& type);
+
     TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
     bool _init_search_argument(
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]