This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0d9714b179 [Fix](multi catalog)Support read hive1.x orc file. (#16677)
0d9714b179 is described below
commit 0d9714b1797f9741999898851f3c3e722d42da4b
Author: Jibing-Li <[email protected]>
AuthorDate: Tue Feb 14 14:32:27 2023 +0800
[Fix](multi catalog)Support read hive1.x orc file. (#16677)
Hive 1.x may write ORC files with internal column names (_col0, _col1,
_col2...).
This will cause query results to be NULL because the column names in the ORC
file don't match
the column names in the Doris table schema. This PR adds support for querying
Hive ORC files that use internal column names.
For now, we haven't seen any such problem with Parquet files; we will send a
new PR to fix Parquet if any problem shows up in the future.
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 21 +++++++++++++++-
be/src/vec/exec/format/orc/vorc_reader.h | 6 +++++
.../planner/external/ExternalFileScanNode.java | 28 ++++++++++++++++++++++
gensrc/thrift/PlanNodes.thrift | 2 ++
4 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index e5e43906d1..cf1d308242 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -81,6 +81,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, const
TFileScanRangeParams& params
_range_size(range.size),
_ctz(ctz),
_column_names(column_names),
+ _is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx) {
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
_init_profile();
@@ -94,6 +95,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
_scan_range(range),
_ctz(ctz),
_column_names(column_names),
+ _is_hive(params.__isset.slot_name_to_schema_pos),
_file_system(nullptr),
_io_ctx(io_ctx) {}
@@ -193,7 +195,13 @@ Status OrcReader::init_reader(
auto& selected_type = _row_reader->getSelectedType();
_col_orc_type.resize(selected_type.getSubtypeCount());
for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
- _colname_to_idx[_get_field_name_lower_case(&selected_type, i)] = i;
+ auto name = _get_field_name_lower_case(&selected_type, i);
+ // For hive engine, translate the column name in orc file to schema
column name.
+ // This is for Hive 1.x which use internal column name _col0, _col1...
+ if (_is_hive) {
+ name = _file_col_to_schema_col[name];
+ }
+ _colname_to_idx[name] = i;
_col_orc_type[i] = selected_type.getSubtype(i);
}
return Status::OK();
@@ -253,6 +261,12 @@ Status OrcReader::_init_read_columns() {
orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i));
}
for (auto& col_name : _column_names) {
+ if (_is_hive) {
+ auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
+ DCHECK(iter != _scan_params.slot_name_to_schema_pos.end());
+ int pos = iter->second;
+ orc_cols_lower_case[pos] = iter->first;
+ }
auto iter = std::find(orc_cols_lower_case.begin(),
orc_cols_lower_case.end(), col_name);
if (iter == orc_cols_lower_case.end()) {
_missing_cols.emplace_back(col_name);
@@ -260,6 +274,11 @@ Status OrcReader::_init_read_columns() {
int pos = std::distance(orc_cols_lower_case.begin(), iter);
_read_cols.emplace_back(orc_cols[pos]);
_read_cols_lower_case.emplace_back(col_name);
+ // For hive engine, store the orc column name to schema column
name map.
+ // This is for Hive 1.x orc file with internal column name _col0,
_col1...
+ if (_is_hive) {
+ _file_col_to_schema_col[orc_cols[pos]] = col_name;
+ }
}
}
return Status::OK();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 36e66b4295..2b07d48956 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -270,6 +270,12 @@ private:
std::list<std::string> _read_cols_lower_case;
std::list<std::string> _missing_cols;
std::unordered_map<std::string, int> _colname_to_idx;
+ // Column name in Orc file to column name to schema.
+ // This is used for Hive 1.x which use internal column name in Orc file.
+ // _col0, _col1...
+ std::unordered_map<std::string, std::string> _file_col_to_schema_col;
+ // Flag for hive engine. True if the external table engine is Hive.
+ bool _is_hive = false;
std::vector<const orc::Type*> _col_orc_type;
ORCFileInputStream* _file_reader = nullptr;
Statistics _statistics;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 745da3d63e..0ee81dd634 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -401,6 +401,12 @@ public class ExternalFileScanNode extends ExternalScanNode
{
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
+ TableIf table = desc.getTable();
+ if (table instanceof HMSExternalTable) {
+ if (((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
+ genSlotToSchemaIdMap(context);
+ }
+ }
if (scanProvider instanceof HiveScanProvider) {
this.totalPartitionNum = ((HiveScanProvider)
scanProvider).getTotalPartitionNum();
this.readPartitionNum = ((HiveScanProvider)
scanProvider).getReadPartitionNum();
@@ -420,6 +426,12 @@ public class ExternalFileScanNode extends ExternalScanNode
{
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
+ TableIf table = desc.getTable();
+ if (table instanceof HMSExternalTable) {
+ if (((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
+ genSlotToSchemaIdMap(context);
+ }
+ }
if (scanProvider instanceof HiveScanProvider) {
this.totalPartitionNum = ((HiveScanProvider)
scanProvider).getTotalPartitionNum();
this.readPartitionNum = ((HiveScanProvider)
scanProvider).getReadPartitionNum();
@@ -634,6 +646,22 @@ public class ExternalFileScanNode extends ExternalScanNode
{
scanProvider.createScanRangeLocations(context, backendPolicy,
scanRangeLocations);
}
+ private void genSlotToSchemaIdMap(ParamCreateContext context) {
+ List<Column> baseSchema = desc.getTable().getBaseSchema();
+ Map<String, Integer> columnNameToPosition = Maps.newHashMap();
+ for (SlotDescriptor slot : desc.getSlots()) {
+ int idx = 0;
+ for (Column col : baseSchema) {
+ if (col.getName().equals(slot.getColumn().getName())) {
+ columnNameToPosition.put(col.getName(), idx);
+ break;
+ }
+ idx += 1;
+ }
+ }
+ context.params.setSlotNameToSchemaPos(columnNameToPosition);
+ }
+
@Override
public int getNumInstances() {
return scanRangeLocations.size();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1eb2372977..6eb6a657d3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -325,6 +325,8 @@ struct TFileScanRangeParams {
17: optional TTableFormatFileDesc table_format_params
// For csv query task, same the column index in file, order by dest_tuple
18: optional list<i32> column_idxs
+ // Map of slot to its position in table schema. Only for Hive external
table.
+ 19: optional map<string, i32> slot_name_to_schema_pos
}
struct TFileRangeDesc {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]