This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d80b7b9689 [feature-wip](new-scan) support more load situation (#12953)
d80b7b9689 is described below
commit d80b7b9689da6da4042ed25333a02386c9238f9b
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Sep 27 21:48:32 2022 +0800
[feature-wip](new-scan) support more load situation (#12953)
---
be/src/exec/arrow/arrow_reader.cpp | 15 +-
be/src/exec/arrow/arrow_reader.h | 2 +
be/src/exec/arrow/orc_reader.cpp | 25 +-
be/src/exec/arrow/orc_reader.h | 5 +
be/src/vec/CMakeLists.txt | 1 -
be/src/vec/columns/column_const.h | 2 +-
be/src/vec/exec/file_hdfs_scanner.cpp | 98 -------
be/src/vec/exec/file_hdfs_scanner.h | 57 ----
be/src/vec/exec/file_scan_node.cpp | 10 +-
be/src/vec/exec/format/generic_reader.h | 4 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 57 ++--
be/src/vec/exec/format/parquet/vparquet_reader.h | 16 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 300 +++++++++++++++++----
be/src/vec/exec/scan/vfile_scanner.h | 61 +++--
be/src/vec/exec/scan/vscan_node.h | 1 +
be/src/vec/exec/scan/vscanner.h | 4 -
be/src/vec/exprs/vexpr_context.cpp | 2 +-
be/src/vec/exprs/vliteral.cpp | 3 +-
be/src/vec/utils/arrow_column_to_doris_column.cpp | 56 ++++
be/src/vec/utils/arrow_column_to_doris_column.h | 4 +-
be/test/vec/exec/parquet/parquet_reader_test.cpp | 120 +--------
.../planner/external/ExternalFileScanNode.java | 74 ++++-
.../doris/planner/external/FileScanProviderIf.java | 3 +
.../doris/planner/external/HiveScanProvider.java | 6 +
.../doris/planner/external/LoadScanProvider.java | 18 +-
gensrc/thrift/PlanNodes.thrift | 11 +-
26 files changed, 535 insertions(+), 420 deletions(-)
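Summary of the change: GenericReader gains a get_columns() interface that reports, per file, which requested columns exist (with the types found in the file) and which are missing; VFileScanner uses this to build the src block, cast file types, fill partition columns from the path, fill missing columns from default value exprs (or NULL), and only then convert to the output block for load jobs. The old ParquetFileHdfsScanner path is removed, and ParquetReader now owns the FileReader handed to it. A minimal sketch of the new reader contract, assuming some reader implementing GenericReader (local names below are illustrative, not from the patch):

    std::unordered_map<std::string, TypeDescriptor> name_to_type;
    std::unordered_set<std::string> missing_cols;
    RETURN_IF_ERROR(reader->get_columns(&name_to_type, &missing_cols));
    // Columns in name_to_type keep the type found in the file; columns in
    // missing_cols are later filled with their default value expr, or NULL.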
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index d26efd32aa..72d4960a43 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -79,10 +79,7 @@ Status ArrowReaderWrap::column_indices() {
if (iter != _map_column.end()) {
_include_column_ids.emplace_back(iter->second);
} else {
- std::stringstream str_error;
- str_error << "Invalid Column Name:" << slot_desc->col_name();
- LOG(WARNING) << str_error.str();
- return Status::InvalidArgument(str_error.str());
+ _missing_cols.push_back(slot_desc->col_name());
}
}
return Status::OK();
@@ -103,10 +100,13 @@ int ArrowReaderWrap::get_column_index(std::string column_name) {
Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
size_t rows = 0;
+ bool tmp_eof = false;
do {
if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
- RETURN_IF_ERROR(next_batch(&_batch, eof));
- if (*eof) {
+ RETURN_IF_ERROR(next_batch(&_batch, &tmp_eof));
+ // We need to make sure the eof is set to true iff block is empty.
+ if (tmp_eof) {
+ *eof = (rows == 0);
return Status::OK();
}
}
@@ -128,7 +128,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
}
rows += num_elements;
_arrow_batch_cur_idx += num_elements;
- } while (!(*eof) && rows < _state->batch_size());
+ } while (!tmp_eof && rows < _state->batch_size());
return Status::OK();
}
@@ -138,7 +138,6 @@ Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, b
if (_batch_eof) {
_include_column_ids.clear();
*eof = true;
- _batch_eof = false;
return Status::OK();
}
_queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 35703e4bbd..2d83a1be01 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -137,6 +137,8 @@ protected:
// The following fields are only valid when using "get_block()" interface.
std::shared_ptr<arrow::RecordBatch> _batch;
size_t _arrow_batch_cur_idx = 0;
+ // Save col names which need to be read but do not exist in the file
+ std::vector<std::string> _missing_cols;
};
} // namespace doris
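The get_next_block() change above tightens the eof contract: *eof is now set only when the returned block is empty, so callers never see a non-empty block that also reports eof. An illustrative caller loop (a sketch, not code from this patch):

    bool eof = false;
    while (!eof) {
        // block is assumed to be a freshly initialized vectorized::Block
        RETURN_IF_ERROR(reader->get_next_block(&block, &eof));
        if (block.rows() > 0) {
            // consume the rows; eof == true always implies block.rows() == 0
        }
    }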
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 65a67909ba..8f46a9bf21 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -26,6 +26,7 @@
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "util/string_util.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
namespace doris {
@@ -67,12 +68,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status();
return Status::InternalError("Failed to create orc file reader");
}
- std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
- for (size_t i = 0; i < schema->num_fields(); ++i) {
+ _schema = maybe_schema.ValueOrDie();
+ for (size_t i = 0; i < _schema->num_fields(); ++i) {
std::string schemaName =
- _case_sensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name());
+ _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name());
// orc index started from 1.
-
_map_column.emplace(schemaName, i + 1);
}
RETURN_IF_ERROR(column_indices());
@@ -82,6 +82,23 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
return Status::OK();
}
+Status ORCReaderWrap::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) {
+ for (size_t i = 0; i < _schema->num_fields(); ++i) {
+ std::string schema_name =
+ _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name());
+ TypeDescriptor type;
+ RETURN_IF_ERROR(
+ vectorized::arrow_type_to_doris_type(_schema->field(i)->type()->id(), &type));
+ name_to_type->emplace(schema_name, type);
+ }
+
+ for (auto& col : _missing_cols) {
+ missing_cols->insert(col);
+ }
+ return Status::OK();
+}
+
Status ORCReaderWrap::_seek_start_stripe() {
// If file was from Hms table, _range_start_offset is started from 3(magic
word).
// And if file was from load, _range_start_offset is always set to zero.
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index a6455e8400..2d394ccf7d 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "exec/arrow/arrow_reader.h"
+
namespace doris {
// Reader of ORC file
@@ -41,6 +42,9 @@ public:
const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
private:
Status _next_stripe_reader(bool* eof);
Status _seek_start_stripe();
@@ -50,6 +54,7 @@ private:
private:
// orc file reader object
std::unique_ptr<arrow::adapters::orc::ORCFileReader> _reader;
+ std::shared_ptr<arrow::Schema> _schema;
bool _cur_file_eof; // is read over?
int64_t _range_start_offset;
int64_t _range_size;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 16eabe1e45..9632fa13d3 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -231,7 +231,6 @@ set(VEC_FILES
exec/file_scanner.cpp
exec/file_scan_node.cpp
exec/file_text_scanner.cpp
- exec/file_hdfs_scanner.cpp
exec/format/parquet/vparquet_column_chunk_reader.cpp
exec/format/parquet/vparquet_group_reader.cpp
exec/format/parquet/vparquet_page_index.cpp
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index f001150bee..75561ae3e8 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -184,7 +184,7 @@ public:
return false;
}
- // bool is_nullable() const override { return is_column_nullable(*data); }
+ // bool is_nullable() const override { return is_column_nullable(*data); }
bool only_null() const override { return data->is_null_at(0); }
bool is_numeric() const override { return data->is_numeric(); }
bool is_fixed_and_contiguous() const override { return data->is_fixed_and_contiguous(); }
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp
deleted file mode 100644
index ec891730c8..0000000000
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "file_hdfs_scanner.h"
-
-#include "io/file_factory.h"
-
-namespace doris::vectorized {
-
-ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state,
RuntimeProfile* profile,
- const TFileScanRangeParams&
params,
- const
std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>&
pre_filter_texprs,
- ScannerCounter* counter)
- : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs,
counter) {}
-
-ParquetFileHdfsScanner::~ParquetFileHdfsScanner() {
- ParquetFileHdfsScanner::close();
-}
-
-Status ParquetFileHdfsScanner::open() {
- RETURN_IF_ERROR(FileScanner::open());
- if (_ranges.empty()) {
- return Status::OK();
- }
- RETURN_IF_ERROR(_get_next_reader());
- return Status::OK();
-}
-
-void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {}
-
-Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
- if (_scanner_eof) {
- *eof = true;
- return Status::OK();
- }
- RETURN_IF_ERROR(init_block(block));
- bool range_eof = false;
- RETURN_IF_ERROR(_reader->get_next_block(block, &range_eof));
- if (block->rows() > 0) {
- _fill_columns_from_path(block, block->rows());
- }
- if (range_eof) {
- RETURN_IF_ERROR(_get_next_reader());
- *eof = _scanner_eof;
- }
- return Status::OK();
-}
-
-Status ParquetFileHdfsScanner::_get_next_reader() {
- if (_next_range >= _ranges.size()) {
- _scanner_eof = true;
- return Status::OK();
- }
- const TFileRangeDesc& range = _ranges[_next_range++];
- std::unique_ptr<FileReader> file_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(),
_profile, _params, range,
- file_reader));
- auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
- if (tuple_desc->slots().empty()) {
- return Status::EndOfFile("No Parquet column need load");
- }
- std::vector<std::string> column_names;
- for (int i = 0; i < _file_slot_descs.size(); i++) {
- column_names.push_back(_file_slot_descs[i]->col_name());
- }
- _reader.reset(new ParquetReader(_profile, _params, range, column_names,
- _state->query_options().batch_size,
-
const_cast<cctz::time_zone*>(&_state->timezone_obj())));
- Status status = _reader->init_reader(_conjunct_ctxs);
- if (!status.ok()) {
- if (status.is_end_of_file()) {
- return _get_next_reader();
- }
- return status;
- }
- return Status::OK();
-}
-
-void ParquetFileHdfsScanner::close() {
- FileScanner::close();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h
deleted file mode 100644
index b9883b88b5..0000000000
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "file_scanner.h"
-#include "vec/core/block.h"
-#include "vec/exec/format/parquet/vparquet_reader.h"
-
-namespace doris::vectorized {
-
-class HdfsFileScanner : public FileScanner {
-public:
- HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params, const
std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter)
- : FileScanner(state, profile, params, ranges, pre_filter_texprs,
counter) {};
-};
-
-class ParquetFileHdfsScanner : public HdfsFileScanner {
-public:
- ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter);
- ~ParquetFileHdfsScanner();
- Status open() override;
-
- Status get_next(vectorized::Block* block, bool* eof) override;
- void close() override;
-
-protected:
- void _init_profiles(RuntimeProfile* profile) override;
-
-private:
- Status _get_next_reader();
-
-private:
- std::shared_ptr<ParquetReader> _reader;
-};
-
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index dc164f8927..3a3f9634e9 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -30,7 +30,6 @@
#include "util/thread.h"
#include "util/types.h"
#include "vec/exec/file_arrow_scanner.h"
-#include "vec/exec/file_hdfs_scanner.h"
#include "vec/exec/file_text_scanner.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vexpr.h"
@@ -459,13 +458,8 @@ std::unique_ptr<FileScanner>
FileScanNode::create_scanner(const TFileScanRange&
FileScanner* scan = nullptr;
switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
- if (config::parquet_reader_using_internal) {
- scan = new ParquetFileHdfsScanner(_runtime_state,
runtime_profile(), scan_range.params,
- scan_range.ranges,
_pre_filter_texprs, counter);
- } else {
- scan = new VFileParquetScanner(_runtime_state, runtime_profile(),
scan_range.params,
- scan_range.ranges,
_pre_filter_texprs, counter);
- }
+ scan = new VFileParquetScanner(_runtime_state, runtime_profile(),
scan_range.params,
+ scan_range.ranges, _pre_filter_texprs,
counter);
break;
case TFileFormatType::FORMAT_ORC:
scan = new VFileORCScanner(_runtime_state, runtime_profile(),
scan_range.params,
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index a98d678fde..d838f4dac1 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -33,6 +33,10 @@ public:
std::unordered_map<std::string, TypeDescriptor> map;
return map;
}
+ virtual Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) {
+ return Status::NotSupported("get_columns is not implemented");
+ }
virtual ~GenericReader() {}
};
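GenericReader::get_columns() defaults to NotSupported, so only readers that can describe their file schema need to override it. A minimal sketch of an override for a hypothetical reader (class and column names are illustrative only):

    class MyFormatReader : public GenericReader {
    public:
        Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                           std::unordered_set<std::string>* missing_cols) override {
            // Report every column found in the file schema with its type ...
            name_to_type->emplace("k1", TypeDescriptor(TYPE_INT));
            // ... and every requested column the file does not contain.
            missing_cols->insert("tmp1");
            return Status::OK();
        }
    };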
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6d5718d181..5f595fec75 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -23,11 +23,12 @@
#include "parquet_thrift_util.h"
namespace doris::vectorized {
-ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
- const TFileRangeDesc& range,
+ParquetReader::ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
+ const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<std::string>& column_names,
size_t batch_size,
cctz::time_zone* ctz)
: _profile(profile),
+ _file_reader(file_reader),
_scan_params(params),
_scan_range(range),
_batch_size(batch_size),
@@ -47,14 +48,15 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
ParquetReader::~ParquetReader() {
close();
- if (_group_file_reader != _file_reader.get()) {
- delete _group_file_reader;
- _group_file_reader = nullptr;
- }
}
void ParquetReader::close() {
if (!_closed) {
+ if (_file_reader != nullptr) {
+ _file_reader->close();
+ delete _file_reader;
+ }
+
if (_profile != nullptr) {
COUNTER_UPDATE(_filtered_row_groups,
_statistics.filtered_row_groups);
COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups);
@@ -68,26 +70,8 @@ void ParquetReader::close() {
}
Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
- if (_file_reader == nullptr) {
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range,
- _file_reader, 2048));
- // RowGroupReader has its own underlying buffer, so we should return
file reader directly
- // If RowGroupReaders use the same file reader with ParquetReader, the
file position will change
- // when ParquetReader try to read ColumnIndex meta, which causes
performance cost
- std::unique_ptr<FileReader> group_file_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range,
- group_file_reader, 0));
- _group_file_reader = group_file_reader.release();
- RETURN_IF_ERROR(_group_file_reader->open());
- } else {
- // test only
- _group_file_reader = _file_reader.get();
- }
- RETURN_IF_ERROR(_file_reader->open());
- if (_file_reader->size() == 0) {
- return Status::EndOfFile("Empty Parquet File");
- }
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
+ CHECK(_file_reader != nullptr);
+ RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
@@ -109,6 +93,8 @@ Status ParquetReader::_init_read_columns() {
auto iter = _map_column.find(file_col_name);
if (iter != _map_column.end()) {
_include_column_ids.emplace_back(iter->second);
+ } else {
+ _missing_cols.push_back(file_col_name);
}
}
// The same order as physical columns
@@ -133,6 +119,21 @@ std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type(
return map;
}
+Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) {
+ auto schema_desc = _file_metadata->schema();
+ std::unordered_set<std::string> column_names;
+ schema_desc.get_column_names(&column_names);
+ for (auto name : column_names) {
+ auto field = schema_desc.get_column(name);
+ name_to_type->emplace(name, field->type);
+ }
+ for (auto& col : _missing_cols) {
+ missing_cols->insert(col);
+ }
+ return Status::OK();
+}
+
Status ParquetReader::get_next_block(Block* block, bool* eof) {
int32_t num_of_readers = _row_group_readers.size();
DCHECK(num_of_readers <= _read_row_groups.size());
@@ -166,8 +167,8 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
for (auto row_group_id : _read_row_groups) {
auto& row_group = _t_metadata->row_groups[row_group_id];
std::shared_ptr<RowGroupReader> row_group_reader;
- row_group_reader.reset(new RowGroupReader(_group_file_reader,
_read_columns, row_group_id,
- row_group, _ctz));
+ row_group_reader.reset(
+ new RowGroupReader(_file_reader, _read_columns, row_group_id,
row_group, _ctz));
std::vector<RowRange> candidate_row_ranges;
RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
if (candidate_row_ranges.empty()) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c91bc08059..73848ccd48 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -70,13 +70,14 @@ private:
class ParquetReader : public GenericReader {
public:
- ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, const std::vector<std::string>& column_names,
- size_t batch_size, cctz::time_zone* ctz);
+ ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
+ const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ const std::vector<std::string>& column_names, size_t batch_size,
+ cctz::time_zone* ctz);
virtual ~ParquetReader();
// for test
- void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); }
+ void set_file_reader(FileReader* file_reader) { _file_reader = file_reader; }
Status init_reader(std::vector<ExprContext*>& conjunct_ctxs);
@@ -87,6 +88,8 @@ public:
int64_t size() const { return _file_reader->size(); }
std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override;
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
ParquetStatistics& statistics() { return _statistics; }
@@ -120,10 +123,10 @@ private:
private:
RuntimeProfile* _profile;
+ // The file reader is passed from the file scanner and is owned by this parquet reader.
+ FileReader* _file_reader = nullptr;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
- std::unique_ptr<FileReader> _file_reader = nullptr;
- FileReader* _group_file_reader = nullptr;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
@@ -144,6 +147,7 @@ private:
std::unordered_map<int, tparquet::OffsetIndex> _col_offsets;
const std::vector<std::string> _column_names;
+ std::vector<std::string> _missing_cols;
ParquetStatistics _statistics;
bool _closed = false;
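With the fields above, ParquetReader no longer creates its own FileReader: the caller creates and opens one, hands over the raw pointer, and ParquetReader closes and deletes it in close(). A hedged sketch of the hand-over, modeled on the VFileScanner changes below (local names are illustrative):

    std::unique_ptr<FileReader> file_reader;
    RETURN_IF_ERROR(FileFactory::create_file_reader(exec_env, profile, params, range, file_reader));
    RETURN_IF_ERROR(file_reader->open());
    // ownership transfers here; ParquetReader deletes the reader in close()
    auto reader = std::make_unique<ParquetReader>(profile, file_reader.release(), params, range,
                                                  column_names, batch_size, &ctz);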
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 985676eb48..ffc44775e5 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -18,11 +18,13 @@
#include "vec/exec/scan/vfile_scanner.h"
#include <fmt/format.h>
+#include <thrift/protocol/TDebugProtocol.h>
#include <vec/data_types/data_type_factory.hpp>
#include "common/logging.h"
#include "common/utils.h"
+#include "exec/arrow/orc_reader.h"
#include "exec/text_converter.hpp"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
@@ -49,6 +51,17 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+ _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
+ _cast_to_input_block_timer =
+ ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime");
+ _fill_path_columns_timer =
+ ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime");
+ _fill_missing_columns_timer =
+ ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime");
+ _pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer");
+ _convert_to_output_block_timer =
+ ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
+
if (vconjunct_ctx_ptr != nullptr) {
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
@@ -64,12 +77,15 @@ Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
_pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _params.pre_filter_exprs,
_pre_conjunct_ctx_ptr.get()));
-
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state));
}
}
+ _default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
+ std::vector<TupleId>({_real_tuple_desc->id()}),
+ std::vector<bool>({false})));
+
return Status::OK();
}
@@ -79,6 +95,25 @@ Status VFileScanner::open(RuntimeState* state) {
return Status::OK();
}
+// For query:
+//                             [exist cols]       [non-exist cols]  [col from path]  input  output
+//                             A       B       C  D                 E
+// _init_src_block             x       x       x  x                 x                -      x
+// get_next_block              x       x       x  -                 -                -      x
+// _cast_to_input_block        -       -       -  -                 -                -      -
+// _fill_columns_from_path     -       -       -  -                 x                -      x
+// _fill_missing_columns       -       -       -  x                 -                -      x
+// _convert_to_output_block    -       -       -  -                 -                -      -
+//
+// For load:
+//                             [exist cols]       [non-exist cols]  [col from path]  input  output
+//                             A       B       C  D                 E
+// _init_src_block             x       x       x  x                 x                x      -
+// get_next_block              x       x       x  -                 -                x      -
+// _cast_to_input_block        x       x       x  -                 -                x      -
+// _fill_columns_from_path     -       -       -  -                 x                x      -
+// _fill_missing_columns       -       -       -  x                 -                x      -
+// _convert_to_output_block    -       -       -  -                 -                -      x
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
do {
if (_cur_reader == nullptr || _cur_reader_eof) {
@@ -93,14 +128,20 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
// Init src block for load job based on the data file schema (e.g. parquet)
// For query job, simply set _src_block_ptr to block.
RETURN_IF_ERROR(_init_src_block(block));
- // Read next block.
- RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+ {
+ SCOPED_TIMER(_get_block_timer);
+ // Read next block.
+ // Some columns in the block may not be filled (they do not exist in the file)
+ RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+ }
if (_src_block_ptr->rows() > 0) {
- // Convert the src block columns type to string in place.
+ // Convert the src block columns type to string in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
RETURN_IF_ERROR(_fill_columns_from_path());
+ // Fill columns that do not exist in the file with null or default values
+ RETURN_IF_ERROR(_fill_missing_columns());
// Apply _pre_conjunct_ctx_ptr to filter src block.
RETURN_IF_ERROR(_pre_filter_src_block());
// Convert src block to output block (dest block), string to dest data type and apply filters.
@@ -125,13 +166,29 @@ Status VFileScanner::_init_src_block(Block* block) {
return Status::OK();
}
- _src_block.clear();
+ // if (_src_block_init) {
+ // _src_block.clear_column_data();
+ // _src_block_ptr = &_src_block;
+ // return Status::OK();
+ // }
- std::unordered_map<std::string, TypeDescriptor> name_to_type =
_cur_reader->get_name_to_type();
+ _src_block.clear();
size_t idx = 0;
+ // Slots in _input_tuple_desc contain all slots described in the load statement, eg:
+ // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
+ // _input_tuple_desc will contain: k1, k2, tmp1
+ // Some of them come from the file, such as k1 and k2, and some may not exist in the file, such as tmp1.
+ // _input_tuple_desc also contains columns from path
for (auto& slot : _input_tuple_desc->slots()) {
- DataTypePtr data_type =
- DataTypeFactory::instance().create_data_type(name_to_type[slot->col_name()], true);
+ DataTypePtr data_type;
+ auto it = _name_to_col_type.find(slot->col_name());
+ if (it == _name_to_col_type.end()) {
+ // the column does not exist in the file, use the type from _input_tuple_desc
+ data_type =
+ DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable());
+ } else {
+ data_type = DataTypeFactory::instance().create_data_type(it->second, true);
+ }
if (data_type == nullptr) {
return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name()));
}
@@ -141,18 +198,20 @@ Status VFileScanner::_init_src_block(Block* block) {
_src_block_name_to_idx.emplace(slot->col_name(), idx++);
}
_src_block_ptr = &_src_block;
+ _src_block_init = true;
return Status::OK();
}
Status VFileScanner::_cast_to_input_block(Block* block) {
- if (_src_block_ptr == block) {
+ if (!_is_load) {
return Status::OK();
}
+ SCOPED_TIMER(_cast_to_input_block_timer);
// cast primitive type(PT0) to primitive type(PT1)
size_t idx = 0;
- for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
- SlotDescriptor* slot_desc = _file_slot_descs[i];
- if (slot_desc == nullptr) {
+ for (auto& slot_desc : _input_tuple_desc->slots()) {
+ if (_name_to_col_type.find(slot_desc->col_name()) == _name_to_col_type.end()) {
+ // skip columns which do not exist in the file
continue;
}
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
@@ -177,6 +236,7 @@ Status VFileScanner::_fill_columns_from_path() {
size_t rows = _src_block_ptr->rows();
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
+ SCOPED_TIMER(_fill_path_columns_timer);
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc == nullptr) continue;
auto it = _partition_slot_index_map.find(slot_desc->id());
@@ -200,11 +260,82 @@ Status VFileScanner::_fill_columns_from_path() {
return Status::OK();
}
+Status VFileScanner::_fill_missing_columns() {
+ if (_missing_cols.empty()) {
+ return Status::OK();
+ }
+
+ SCOPED_TIMER(_fill_missing_columns_timer);
+ int rows = _src_block_ptr->rows();
+ for (auto slot_desc : _real_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
+ continue;
+ }
+
+ auto it = _col_default_value_ctx.find(slot_desc->col_name());
+ if (it == _col_default_value_ctx.end()) {
+ return Status::InternalError("failed to find default value expr
for slot: {}",
+ slot_desc->col_name());
+ }
+ if (it->second == nullptr) {
+ // no default column, fill with null
+ auto nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(
+
(*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column))
+ .mutate()
+ .get());
+ nullable_column->insert_many_defaults(rows);
+ } else {
+ // fill with default value
+ auto* ctx = it->second;
+ auto origin_column_num = _src_block_ptr->columns();
+ int result_column_id = -1;
+ // PT1 => dest primitive type
+ RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
+ bool is_origin_column = result_column_id < origin_column_num;
+ if (!is_origin_column) {
+ auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
+ // result_column_ptr may be a ColumnConst, convert it to a normal column
+ result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
+ auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type;
+ bool is_nullable = origin_column_type->is_nullable();
+ _src_block_ptr->replace_by_position(
+ _src_block_ptr->get_position_by_name(slot_desc->col_name()),
+ is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
+ _src_block_ptr->erase(result_column_id);
+ }
+ }
+ }
+ return Status::OK();
+}
+
+Status VFileScanner::_pre_filter_src_block() {
+ if (!_is_load) {
+ return Status::OK();
+ }
+ if (_pre_conjunct_ctx_ptr) {
+ SCOPED_TIMER(_pre_filter_timer);
+ auto origin_column_num = _src_block_ptr->columns();
+ auto old_rows = _src_block_ptr->rows();
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
+ _src_block_ptr, origin_column_num));
+ _counter.num_rows_unselected += old_rows - _src_block.rows();
+ }
+ return Status::OK();
+}
+
Status VFileScanner::_convert_to_output_block(Block* block) {
- if (_src_block_ptr == block) {
+ if (!_is_load) {
return Status::OK();
}
+ SCOPED_TIMER(_convert_to_output_block_timer);
+ // The block is passed from the scanner context's free blocks,
+ // which are initialized with src columns.
+ // But for a load job, the block should be filled with dest columns,
+ // so we need to clear it first.
block->clear();
int ctx_idx = 0;
@@ -217,7 +348,6 @@ Status VFileScanner::_convert_to_output_block(Block* block)
{
if (!slot_desc->is_materialized()) {
continue;
}
-
int dest_index = ctx_idx++;
auto* ctx = _dest_vexpr_ctx[dest_index];
@@ -229,13 +359,15 @@ Status VFileScanner::_convert_to_output_block(Block*
block) {
is_origin_column && _src_block_mem_reuse
?
_src_block.get_by_position(result_column_id).column->clone_resized(rows)
: _src_block.get_by_position(result_column_id).column;
+ // column_ptr maybe a ColumnConst, convert it to a normal column
+ column_ptr = column_ptr->convert_to_full_column_if_const();
DCHECK(column_ptr != nullptr);
// because of src_slot_desc is always be nullable, so the column_ptr
after do dest_expr
// is likely to be nullable
if (LIKELY(column_ptr->is_nullable())) {
- auto nullable_column =
+ const ColumnNullable* nullable_column =
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
@@ -280,10 +412,10 @@ Status VFileScanner::_convert_to_output_block(Block*
block) {
}
}
if (!slot_desc->is_nullable()) {
- column_ptr = nullable_column->get_nested_column_ptr();
+ column_ptr = remove_nullable(column_ptr);
}
} else if (slot_desc->is_nullable()) {
- column_ptr = vectorized::make_nullable(column_ptr);
+ column_ptr = make_nullable(column_ptr);
}
block->insert(dest_index,
vectorized::ColumnWithTypeAndName(std::move(column_ptr),
slot_desc->get_data_type_ptr(),
@@ -308,52 +440,61 @@ Status VFileScanner::_convert_to_output_block(Block*
block) {
return Status::OK();
}
-Status VFileScanner::_pre_filter_src_block() {
- if (_pre_conjunct_ctx_ptr) {
- auto origin_column_num = _src_block_ptr->columns();
- auto old_rows = _src_block_ptr->rows();
-
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
- _src_block_ptr,
origin_column_num));
- _counter.num_rows_unselected += old_rows - _src_block.rows();
- }
- return Status::OK();
-}
-
Status VFileScanner::_get_next_reader() {
- if (_cur_reader != nullptr) {
- delete _cur_reader;
- _cur_reader = nullptr;
- }
while (true) {
+ _cur_reader.reset(nullptr);
+ _src_block_init = false;
if (_next_range >= _ranges.size()) {
_scanner_eof = true;
return Status::OK();
}
const TFileRangeDesc& range = _ranges[_next_range++];
- std::vector<std::string> column_names;
+
+ // 1. create file reader
+ std::unique_ptr<FileReader> file_reader;
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+ range, file_reader));
+ RETURN_IF_ERROR(file_reader->open());
+ if (file_reader->size() == 0) {
+ file_reader->close();
+ continue;
+ }
+
+ // 2. create reader for specific format
+ // TODO: add csv, json, avro
+ Status init_status;
switch (_params.format_type) {
case TFileFormatType::FORMAT_PARQUET: {
- for (int i = 0; i < _file_slot_descs.size(); i++) {
- column_names.push_back(_file_slot_descs[i]->col_name());
- }
- _cur_reader = new ParquetReader(_profile, _params, range,
column_names,
- _state->query_options().batch_size,
-
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
- Status status =
((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs);
- if (status.ok()) {
- _cur_reader_eof = false;
- return status;
- } else if (status.is_end_of_file()) {
- continue;
- } else {
- return status;
- }
+ _cur_reader.reset(
+ new ParquetReader(_profile, file_reader.release(), _params, range,
+ _file_col_names, _state->query_options().batch_size,
+ const_cast<cctz::time_zone*>(&_state->timezone_obj())));
+ init_status = ((ParquetReader*)(_cur_reader.get()))->init_reader(_conjunct_ctxs);
+ break;
+ }
+ case TFileFormatType::FORMAT_ORC: {
+ _cur_reader.reset(new ORCReaderWrap(_state, _file_slot_descs, file_reader.release(),
+ _num_of_columns_from_file, range.start_offset,
+ range.size, false));
+ init_status =
+ ((ORCReaderWrap*)(_cur_reader.get()))
+ ->init_reader(_real_tuple_desc, _conjunct_ctxs, _state->timezone());
+ break;
}
default:
- std::stringstream error_msg;
- error_msg << "Not supported file format " << _params.format_type;
- return Status::InternalError(error_msg.str());
+ return Status::InternalError("Not supported file format: {}",
_params.format_type);
}
+
+ if (init_status.is_end_of_file()) {
+ continue;
+ } else if (!init_status.ok()) {
+ return Status::InternalError("failed to init reader for file {},
err: {}", range.path,
+ init_status.get_error_msg());
+ }
+
+ _cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+ _cur_reader_eof = false;
+ break;
}
return Status::OK();
}
@@ -382,6 +523,8 @@ Status VFileScanner::_init_expr_ctxes() {
_file_slot_descs.emplace_back(it->second);
auto iti = full_src_index_map.find(slot_id);
_file_slot_index_map.emplace(slot_id, iti->second);
+ _file_slot_name_map.emplace(it->second->col_name(), iti->second);
+ _file_col_names.push_back(it->second->col_name());
} else {
_partition_slot_descs.emplace_back(it->second);
auto iti = full_src_index_map.find(slot_id);
@@ -390,7 +533,9 @@ Status VFileScanner::_init_expr_ctxes() {
}
if (_is_load) {
+ // The following dest expr map and src default value expr map are only for load tasks.
bool has_slot_id_map =
_params.__isset.dest_sid_to_src_sid_without_trans;
+ int idx = 0;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
@@ -402,11 +547,15 @@ Status VFileScanner::_init_expr_ctxes() {
}
vectorized::VExprContext* ctx = nullptr;
- RETURN_IF_ERROR(
- vectorized::VExpr::create_expr_tree(_state->obj_pool(),
it->second, &ctx));
- RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
- RETURN_IF_ERROR(ctx->open(_state));
+ if (!it->second.nodes.empty()) {
+ RETURN_IF_ERROR(
+
vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+ RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
+ RETURN_IF_ERROR(ctx->open(_state));
+ }
_dest_vexpr_ctx.emplace_back(ctx);
+ _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
+
if (has_slot_id_map) {
auto it1 =
_params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it1 ==
std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -423,8 +572,49 @@ Status VFileScanner::_init_expr_ctxes() {
}
}
}
+
+ for (auto slot_desc : _real_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ vectorized::VExprContext* ctx = nullptr;
+ auto it = _params.default_value_of_src_slot.find(slot_desc->id());
+ // if it does not exist or is empty, the default value will be null
+ if (it != std::end(_params.default_value_of_src_slot) && !it->second.nodes.empty()) {
+ RETURN_IF_ERROR(
+ vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+ RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
+ RETURN_IF_ERROR(ctx->open(_state));
+ }
+ _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
+ }
}
return Status::OK();
}
+Status VFileScanner::close(RuntimeState* state) {
+ if (_is_closed) {
+ return Status::OK();
+ }
+
+ for (auto ctx : _dest_vexpr_ctx) {
+ if (ctx != nullptr) {
+ ctx->close(state);
+ }
+ }
+
+ for (auto it : _col_default_value_ctx) {
+ if (it.second != nullptr) {
+ it.second->close(state);
+ }
+ }
+
+ if (_pre_conjunct_ctx_ptr) {
+ (*_pre_conjunct_ctx_ptr)->close(state);
+ }
+
+ RETURN_IF_ERROR(VScanner::close(state));
+ return Status::OK();
+}
+
} // namespace doris::vectorized
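Condensed view of the per-block pipeline VFileScanner now runs for load jobs (an illustration of the code above, not additional patch content):

    RETURN_IF_ERROR(_init_src_block(block));          // src block: file cols + missing cols + path cols
    RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
    if (_src_block_ptr->rows() > 0) {
        RETURN_IF_ERROR(_cast_to_input_block(block)); // load only: cast file types to src slot types
        RETURN_IF_ERROR(_fill_columns_from_path());   // e.g. Hive partition columns
        RETURN_IF_ERROR(_fill_missing_columns());     // default value expr, or NULL if none
        RETURN_IF_ERROR(_pre_filter_src_block());     // load only: preceding filter exprs
        RETURN_IF_ERROR(_convert_to_output_block(block));
    }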
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 4bad47795d..6608a8bfd0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -46,6 +46,8 @@ public:
Status open(RuntimeState* state) override;
+ Status close(RuntimeState* state) override;
+
public:
Status prepare(VExprContext** vconjunct_ctx_ptr);
@@ -65,24 +67,52 @@ protected:
const std::vector<TFileRangeDesc>& _ranges;
int _next_range;
- GenericReader* _cur_reader;
+ std::unique_ptr<GenericReader> _cur_reader;
bool _cur_reader_eof;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
- // File slot id to index map.
+ // File slot id to index in _file_slot_descs
std::map<SlotId, int> _file_slot_index_map;
+ // file col name to index in _file_slot_descs
+ std::map<std::string, int> _file_slot_name_map;
+ // col names from _file_slot_descs
+ std::vector<std::string> _file_col_names;
// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
- // Partition slot id to index map
+ // Partition slot id to index in _partition_slot_descs
std::map<SlotId, int> _partition_slot_index_map;
+ // created from param.expr_of_dest_slot
+ // For query, it saves the default value expr of each dest column, or nullptr for NULL.
+ // For load, it saves the conversion expr/default value of each dest column.
+ std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+ // dest slot name to index in _dest_vexpr_ctx;
+ std::unordered_map<std::string, int> _dest_slot_name_to_idx;
+ // col name to default value expr
+ std::unordered_map<std::string, vectorized::VExprContext*> _col_default_value_ctx;
+ // the map values of dest slot id to src slot desc
+ // if there is no key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
+ std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
+ // dest slot desc index to src slot desc index
+ std::unordered_map<int, int> _dest_slot_to_src_slot_index;
+
+ std::unordered_map<std::string, size_t> _src_block_name_to_idx;
+
+ // Got from GenericReader, maps columns that exist in the file to their types.
+ std::unordered_map<std::string, TypeDescriptor> _name_to_col_type;
+ // Got from GenericReader, saves columns that are required by the scan but do not exist in the file.
+ // These columns will be filled with default values or null.
+ std::unordered_set<std::string> _missing_cols;
+
+ // For load task
+ std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
+ std::unique_ptr<RowDescriptor> _src_row_desc;
+ // row desc for default exprs
+ std::unique_ptr<RowDescriptor> _default_val_row_desc;
// Mem pool used to allocate _src_tuple and _src_tuple_row
std::unique_ptr<MemPool> _mem_pool;
- // Dest tuple descriptor and dest expr context
- const TupleDescriptor* _dest_tuple_desc;
-
// Profile
RuntimeProfile* _profile;
ScannerCounter _counter;
@@ -91,22 +121,20 @@ protected:
int _rows = 0;
int _num_of_columns_from_file;
- std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
-
- // the map values of dest slot id to src slot desc
- // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
- std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
-
bool _src_block_mem_reuse = false;
bool _strict_mode;
+ bool _src_block_init = false;
Block* _src_block_ptr;
Block _src_block;
- // dest slot desc index to src slot desc index
- std::unordered_map<int, int> _dest_slot_to_src_slot_index;
-
- std::unordered_map<std::string, size_t> _src_block_name_to_idx;
+private:
+ RuntimeProfile::Counter* _get_block_timer = nullptr;
+ RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
+ RuntimeProfile::Counter* _fill_path_columns_timer = nullptr;
+ RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
+ RuntimeProfile::Counter* _pre_filter_timer = nullptr;
+ RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
private:
Status _init_expr_ctxes();
@@ -114,6 +142,7 @@ private:
Status _cast_to_input_block(Block* block);
Status _pre_filter_src_block();
Status _convert_to_output_block(Block* block);
+ Status _fill_missing_columns();
void _reset_counter() {
_counter.num_rows_unselected = 0;
_counter.num_rows_filtered = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 6117d487bc..fbcf248a3c 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -37,6 +37,7 @@ public:
: ExecNode(pool, tnode, descs),
_runtime_filter_descs(tnode.runtime_filters) {}
friend class VScanner;
friend class NewOlapScanner;
+ friend class VFileScanner;
friend class ScannerContext;
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index aff7a3a4ed..07313428df 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -137,10 +137,6 @@ protected:
// and will be destroyed at the end.
std::vector<VExprContext*> _stale_vexpr_ctxs;
- // For load scanner
- std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
- std::unique_ptr<RowDescriptor> _src_row_desc;
-
// num of rows read from scanner
int64_t _num_rows_read = 0;
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 019a7d802c..ccb1045cb1 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -32,7 +32,7 @@ VExprContext::VExprContext(VExpr* expr)
_stale(false) {}
VExprContext::~VExprContext() {
- DCHECK(!_prepared || _closed);
+ DCHECK(!_prepared || _closed) << get_stack_trace();
for (int i = 0; i < _fn_contexts.size(); ++i) {
delete _fn_contexts[i];
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index d87278b347..d99cdbdd97 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -194,7 +194,8 @@ Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* r
std::string VLiteral::debug_string() const {
std::stringstream out;
- out << "VLiteral (type = " << _data_type->get_name();
+ out << "VLiteral (name = " << _expr_name;
+ out << ", type = " << _data_type->get_name();
out << ", value = ";
if (_column_ptr.get()->size() > 0) {
StringRef ref = _column_ptr.get()->get_data_at(0);
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index eacd1136b5..e71bf970a0 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -408,4 +408,60 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr
return Status::NotSupported(
fmt::format("Not support arrow type:{}",
arrow_column->type()->name()));
}
+
+Status arrow_type_to_doris_type(arrow::Type::type type, TypeDescriptor* return_type) {
+ switch (type) {
+ case arrow::Type::STRING:
+ case arrow::Type::BINARY:
+ case arrow::Type::FIXED_SIZE_BINARY:
+ return_type->type = TYPE_STRING;
+ break;
+ case arrow::Type::INT8:
+ return_type->type = TYPE_TINYINT;
+ break;
+ case arrow::Type::UINT8:
+ case arrow::Type::INT16:
+ return_type->type = TYPE_SMALLINT;
+ break;
+ case arrow::Type::UINT16:
+ case arrow::Type::INT32:
+ return_type->type = TYPE_INT;
+ break;
+ case arrow::Type::UINT32:
+ case arrow::Type::INT64:
+ return_type->type = TYPE_BIGINT;
+ break;
+ case arrow::Type::UINT64:
+ return_type->type = TYPE_LARGEINT;
+ break;
+ case arrow::Type::HALF_FLOAT:
+ case arrow::Type::FLOAT:
+ return_type->type = TYPE_FLOAT;
+ break;
+ case arrow::Type::DOUBLE:
+ return_type->type = TYPE_DOUBLE;
+ break;
+ case arrow::Type::BOOL:
+ return_type->type = TYPE_BOOLEAN;
+ break;
+ case arrow::Type::DATE32:
+ return_type->type = TYPE_DATEV2;
+ break;
+ case arrow::Type::DATE64:
+ return_type->type = TYPE_DATETIMEV2;
+ break;
+ case arrow::Type::TIMESTAMP:
+ return_type->type = TYPE_BIGINT;
+ break;
+ case arrow::Type::DECIMAL:
+ return_type->type = TYPE_DECIMALV2;
+ return_type->precision = 27;
+ return_type->scale = 9;
+ break;
+ default:
+ return Status::InternalError("unsupport type: {}", type);
+ }
+ return Status::OK();
+}
+
} // namespace doris::vectorized
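Usage sketch for the new arrow_type_to_doris_type() helper (the caller code here is illustrative only):

    TypeDescriptor type;
    RETURN_IF_ERROR(vectorized::arrow_type_to_doris_type(arrow::Type::INT32, &type));
    // type.type is now TYPE_INT; DECIMAL maps to DECIMALV2 with precision 27 and scale 9.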
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h
index 9d5f077672..13edffadae 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.h
+++ b/be/src/vec/utils/arrow_column_to_doris_column.h
@@ -24,7 +24,7 @@
#include <memory>
#include "common/status.h"
-#include "runtime/primitive_type.h"
+#include "runtime/types.h"
#include "vec/core/column_with_type_and_name.h"
// This files contains some utilities to convert Doris internal
@@ -42,4 +42,6 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr
ColumnPtr& doris_column, const
DataTypePtr& type,
size_t num_elements, const
cctz::time_zone& ctz);
+Status arrow_type_to_doris_type(arrow::Type::type type, TypeDescriptor* return_type);
+
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 23bf6b353f..e8d3339b43 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -22,7 +22,6 @@
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type_factory.hpp"
-#include "vec/exec/file_hdfs_scanner.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
namespace doris {
@@ -92,6 +91,7 @@ TEST_F(ParquetReaderTest, normal) {
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
LocalFileReader* reader =
new
LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
0);
+ reader->open();
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
@@ -106,8 +106,8 @@ TEST_F(ParquetReaderTest, normal) {
scan_range.start_offset = 0;
scan_range.size = 1000;
}
- auto p_reader = new ParquetReader(nullptr, scan_params, scan_range,
column_names, 992, &ctz);
- p_reader->set_file_reader(reader);
+ auto p_reader =
+ new ParquetReader(nullptr, reader, scan_params, scan_range,
column_names, 992, &ctz);
RuntimeState runtime_state((TQueryGlobals()));
runtime_state.set_desc_tbl(desc_tbl);
runtime_state.init_instance_mem_tracker();
@@ -132,119 +132,5 @@ TEST_F(ParquetReaderTest, normal) {
delete p_reader;
}
-TEST_F(ParquetReaderTest, scanner) {
- TDescriptorTable t_desc_table;
- TTableDescriptor t_table_desc;
-
- t_table_desc.id = 0;
- t_table_desc.tableType = TTableType::OLAP_TABLE;
- t_table_desc.numCols = 7;
- t_table_desc.numClusteringCols = 0;
- t_desc_table.tableDescriptors.push_back(t_table_desc);
- t_desc_table.__isset.tableDescriptors = true;
-
- // init boolean and numeric slot
- std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col",
"smallint_col",
- "int_col", "bigint_col",
"float_col",
- "double_col"};
- for (int i = 0; i < numeric_types.size(); i++) {
- TSlotDescriptor tslot_desc;
- {
- tslot_desc.id = i;
- tslot_desc.parent = 0;
- TTypeDesc type;
- {
- TTypeNode node;
- node.__set_type(TTypeNodeType::SCALAR);
- TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::type(i + 2));
- node.__set_scalar_type(scalar_type);
- type.types.push_back(node);
- }
- tslot_desc.slotType = type;
- tslot_desc.columnPos = 0;
- tslot_desc.byteOffset = 0;
- tslot_desc.nullIndicatorByte = 1;
- tslot_desc.nullIndicatorBit = 1;
- tslot_desc.colName = numeric_types[i];
- tslot_desc.slotIdx = 0;
- tslot_desc.isMaterialized = true;
- t_desc_table.slotDescriptors.push_back(tslot_desc);
- }
- }
-
- t_desc_table.__isset.slotDescriptors = true;
- {
- TTupleDescriptor t_tuple_desc;
- t_tuple_desc.id = 0;
- t_tuple_desc.byteSize = 16;
- t_tuple_desc.numNullBytes = 0;
- t_tuple_desc.tableId = 0;
- t_tuple_desc.__isset.tableId = true;
- t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
- }
-
- // set scan range
- // std::vector<TScanRangeParams> scan_ranges;
- TFileScanRange file_scan_range;
- {
- // TScanRangeParams scan_range_params;
- // TFileScanRange file_scan_range;
- TFileScanRangeParams params;
- {
- params.__set_src_tuple_id(0);
- params.__set_num_of_columns_from_file(7);
- params.file_type = TFileType::FILE_LOCAL;
- params.format_type = TFileFormatType::FORMAT_PARQUET;
- std::vector<TFileScanSlotInfo> file_slots;
- for (int i = 0; i < numeric_types.size(); i++) {
- TFileScanSlotInfo slot_info;
- slot_info.slot_id = i;
- slot_info.is_file_slot = true;
- file_slots.emplace_back(slot_info);
- }
- params.__set_required_slots(file_slots);
- }
- file_scan_range.params = params;
- TFileRangeDesc range;
- {
- range.start_offset = 0;
- range.size = 1000;
- range.path =
"./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
- std::vector<std::string> columns_from_path {"value"};
- range.__set_columns_from_path(columns_from_path);
- }
- file_scan_range.ranges.push_back(range);
- //
scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range);
- // scan_ranges.push_back(scan_range_params);
- }
-
- std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>();
- RuntimeState runtime_state((TQueryGlobals()));
- runtime_state.init_instance_mem_tracker();
-
- DescriptorTbl* desc_tbl;
- ObjectPool obj_pool;
- DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
- runtime_state.set_desc_tbl(desc_tbl);
- ScannerCounter counter;
- std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
- auto scan = new ParquetFileHdfsScanner(&runtime_state,
runtime_state.runtime_profile(),
- file_scan_range.params,
file_scan_range.ranges,
- pre_filter_texprs, &counter);
- scan->reg_conjunct_ctxs(0, conjunct_ctxs);
- Status st = scan->open();
- EXPECT_TRUE(st.ok());
-
- bool eof = false;
- Block* block = new Block();
- scan->get_next(block, &eof);
- for (auto& col : block->get_columns_with_type_and_name()) {
- ASSERT_EQ(col.column->size(), 10);
- }
- delete block;
- delete scan;
-}
-
} // namespace vectorized
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index da9fddc342..7500c73899 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -45,6 +46,7 @@ import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TPlanNode;
@@ -72,16 +74,23 @@ public class ExternalFileScanNode extends ExternalScanNode {
public List<Expr> conjuncts;
public TupleDescriptor destTupleDescriptor;
-
+ public Map<String, SlotDescriptor> destSlotDescByName;
// === Set when init ===
public TupleDescriptor srcTupleDescriptor;
+ public Map<String, SlotDescriptor> srcSlotDescByName;
public Map<String, Expr> exprMap;
- public Map<String, SlotDescriptor> slotDescByName;
public String timezone;
// === Set when init ===
public TFileScanRangeParams params;
+ public void createDestSlotMap() {
+ Preconditions.checkNotNull(destTupleDescriptor);
+ destSlotDescByName = Maps.newHashMap();
+ for (SlotDescriptor slot : destTupleDescriptor.getSlots()) {
+ destSlotDescByName.put(slot.getColumn().getName(), slot);
+ }
+ }
}
public enum Type {
@@ -169,7 +178,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
break;
case LOAD:
for (FileGroupInfo fileGroupInfo : fileGroupInfos) {
- this.scanProviders.add(new LoadScanProvider(fileGroupInfo));
+ this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc));
}
break;
default:
@@ -186,6 +195,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
private void initParamCreateContexts(Analyzer analyzer) throws
UserException {
for (FileScanProviderIf scanProvider : scanProviders) {
ParamCreateContext context = scanProvider.createContext(analyzer);
+ context.createDestSlotMap();
// set where and preceding filter.
// FIXME(cmy): we should support set different expr for different
file group.
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(),
context.srcTupleDescriptor, analyzer);
@@ -255,20 +265,72 @@ public class ExternalFileScanNode extends
ExternalScanNode {
contexts.size() + " vs. " + scanProviders.size());
for (int i = 0; i < contexts.size(); ++i) {
ParamCreateContext context = contexts.get(i);
- finalizeParamsForLoad(context, analyzer);
FileScanProviderIf scanProvider = scanProviders.get(i);
+ setDefaultValueExprs(scanProvider, context);
+ finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
}
}
+ protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context)
+ throws UserException {
+ TableIf tbl = scanProvider.getTargetTable();
+ Preconditions.checkNotNull(tbl);
+ TExpr tExpr = new TExpr();
+ tExpr.setNodes(Lists.newArrayList());
+
+ for (Column column : tbl.getBaseSchema()) {
+ Expr expr;
+ if (column.getDefaultValue() != null) {
+ if (column.getDefaultValueExprDef() != null) {
+ expr = column.getDefaultValueExpr();
+ } else {
+ expr = new StringLiteral(column.getDefaultValue());
+ }
+ } else {
+ if (column.isAllowNull()) {
+ expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR);
+ } else {
+ expr = null;
+ }
+ }
+ SlotDescriptor slotDesc = null;
+ switch (type) {
+ case LOAD: {
+ slotDesc = context.srcSlotDescByName.get(column.getName());
+ break;
+ }
+ case QUERY: {
+ slotDesc = context.destSlotDescByName.get(column.getName());
+ break;
+ }
+ default:
+ Preconditions.checkState(false, type);
+ }
+ // If slotDesc is null, it means this is an unrelated slot; just skip it.
+ // eg:
+ // (a, b, c) set (x=a, y=b, z=c)
+ // If c does not exist in the file, z will be filled with null even if z has a default value,
+ // and if z is not nullable, the load will fail.
+ if (slotDesc != null) {
+ if (expr != null) {
+ expr = castToSlot(slotDesc, expr);
+ context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), expr.treeToThrift());
+ } else {
+ context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
+ }
+ }
+ }
+ }
+
protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyzer) throws UserException {
if (type != Type.LOAD) {
context.params.setSrcTupleId(-1);
return;
}
- Map<String, SlotDescriptor> slotDescByName = context.slotDescByName;
+ Map<String, SlotDescriptor> slotDescByName = context.srcSlotDescByName;
Map<String, Expr> exprMap = context.exprMap;
TupleDescriptor srcTupleDesc = context.srcTupleDescriptor;
boolean negative = context.fileGroup.isNegative();
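setDefaultValueExprs walks the target table's base schema and picks, per column, the first of: the column's default-value expr, a StringLiteral of its plain default, a NullLiteral when the column is nullable, or nothing, in which case an empty TExpr is stored so the reader knows no usable default exists. For LOAD the slot is resolved through srcSlotDescByName, for QUERY through destSlotDescByName, and columns with no matching slot are skipped. A rough standalone sketch of that precedence, with a simplified stand-in Column type rather than the catalog class:

    import java.util.Optional;

    public class DefaultValueSketch {
        // Simplified stand-in for org.apache.doris.catalog.Column.
        static class Column {
            String name;
            String defaultValue;      // null means no default
            boolean hasDefaultExpr;   // true when a default-value expr def exists
            boolean allowNull;
            Column(String name, String defaultValue, boolean hasDefaultExpr, boolean allowNull) {
                this.name = name; this.defaultValue = defaultValue;
                this.hasDefaultExpr = hasDefaultExpr; this.allowNull = allowNull;
            }
        }

        // Returns a textual description of the expr that would be serialized for the slot.
        static Optional<String> defaultExprFor(Column c) {
            if (c.defaultValue != null) {
                return Optional.of(c.hasDefaultExpr
                        ? "defaultValueExpr()"                      // e.g. CURRENT_TIMESTAMP
                        : "StringLiteral(" + c.defaultValue + ")"); // plain literal default
            }
            if (c.allowNull) {
                return Optional.of("NullLiteral(VARCHAR)");
            }
            return Optional.empty(); // no default: an empty TExpr goes into the map instead
        }

        public static void main(String[] args) {
            System.out.println(defaultExprFor(new Column("k1", "0", false, false)));  // StringLiteral(0)
            System.out.println(defaultExprFor(new Column("ts", "now", true, false))); // defaultValueExpr()
            System.out.println(defaultExprFor(new Column("v1", null, false, true)));  // NullLiteral(VARCHAR)
            System.out.println(defaultExprFor(new Column("v2", null, false, false))); // Optional.empty
        }
    }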
@@ -426,3 +488,5 @@ public class ExternalFileScanNode extends ExternalScanNode {
}
}
+
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index 700d8be098..8ae7952169 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@@ -56,4 +57,6 @@ public interface FileScanProviderIf {
int getInputSplitNum();
long getInputFileSize();
+
+ TableIf getTargetTable();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 7965a02711..1df3d639bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HiveBucketUtil;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@@ -86,6 +87,11 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
this.desc = desc;
}
+ @Override
+ public TableIf getTargetTable() {
+ return hmsTable;
+ }
+
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
TFileFormatType type = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 33b0db2de7..d202ead466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
@@ -55,10 +57,12 @@ import java.util.Map;
public class LoadScanProvider implements FileScanProviderIf {
- FileGroupInfo fileGroupInfo;
+ private FileGroupInfo fileGroupInfo;
+ private TupleDescriptor destTupleDesc;
- public LoadScanProvider(FileGroupInfo fileGroupInfo) {
+ public LoadScanProvider(FileGroupInfo fileGroupInfo, TupleDescriptor destTupleDesc) {
this.fileGroupInfo = fileGroupInfo;
+ this.destTupleDesc = destTupleDesc;
}
@Override
@@ -89,6 +93,7 @@ public class LoadScanProvider implements FileScanProviderIf {
@Override
public ParamCreateContext createContext(Analyzer analyzer) throws UserException {
ParamCreateContext ctx = new ParamCreateContext();
+ ctx.destTupleDescriptor = destTupleDesc;
ctx.fileGroup = fileGroupInfo.getFileGroup();
ctx.timezone = analyzer.getTimezone();
@@ -169,7 +174,7 @@ public class LoadScanProvider implements FileScanProviderIf {
*/
private void initColumns(ParamCreateContext context, Analyzer analyzer) throws UserException {
context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
- context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ context.srcSlotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
// for load job, column exprs is got from file group
@@ -190,7 +195,7 @@ public class LoadScanProvider implements FileScanProviderIf {
}
List<Integer> srcSlotIds = Lists.newArrayList();
Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
- context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds,
+ context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized());
int columnCountFromPath = 0;
@@ -247,4 +252,9 @@ public class LoadScanProvider implements FileScanProviderIf {
return TFileFormatType.FORMAT_CSV_PLAIN;
}
}
+
+ @Override
+ public TableIf getTargetTable() {
+ return fileGroupInfo.getTargetTable();
+ }
}
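Taken together, the dest tuple now flows into the load path in three small steps: ExternalFileScanNode passes desc to the LoadScanProvider constructor, createContext stores it on the ParamCreateContext, and initParamCreateContexts calls createDestSlotMap() before the filters are set. A condensed sketch of that wiring with simplified stand-in types; only the call order mirrors the patch:

    import java.util.HashMap;
    import java.util.Map;

    public class ContextWiringSketch {
        // Stand-in for TupleDescriptor: column name -> slot id.
        static class TupleDescriptor {
            Map<String, Integer> slots = new HashMap<>();
        }

        static class ParamCreateContext {
            TupleDescriptor destTupleDescriptor;
            Map<String, Integer> destSlotDescByName;

            void createDestSlotMap() {
                destSlotDescByName = new HashMap<>(destTupleDescriptor.slots);
            }
        }

        static class LoadScanProvider {
            private final TupleDescriptor destTupleDesc;
            LoadScanProvider(TupleDescriptor destTupleDesc) {
                this.destTupleDesc = destTupleDesc;
            }
            ParamCreateContext createContext() {
                ParamCreateContext ctx = new ParamCreateContext();
                ctx.destTupleDescriptor = destTupleDesc; // provider hands the dest tuple to the context
                return ctx;
            }
        }

        public static void main(String[] args) {
            TupleDescriptor desc = new TupleDescriptor();
            desc.slots.put("k1", 0);

            // Mirrors initParamCreateContexts: build the context, then populate the dest slot map.
            ParamCreateContext ctx = new LoadScanProvider(desc).createContext();
            ctx.createDestSlotMap();
            System.out.println(ctx.destSlotDescByName); // {k1=0}
        }
    }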
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4135099c8b..0a4529572b 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -261,17 +261,18 @@ struct TFileScanRangeParams {
// The convert expr map for load job
// desc slot id -> expr
9: optional map<Types.TSlotId, Exprs.TExpr> expr_of_dest_slot
+ 10: optional map<Types.TSlotId, Exprs.TExpr> default_value_of_src_slot
// This is the mapping of dest slot id and src slot id in load expr
// It excludes the slot id which has the transform expr
- 10: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
+ 11: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
// strictMode is a boolean
// if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
- 11: optional bool strict_mode
+ 12: optional bool strict_mode
- 12: optional list<Types.TNetworkAddress> broker_addresses
- 13: optional TFileAttributes file_attributes
- 14: optional Exprs.TExpr pre_filter_exprs
+ 13: optional list<Types.TNetworkAddress> broker_addresses
+ 14: optional TFileAttributes file_attributes
+ 15: optional Exprs.TExpr pre_filter_exprs
}
struct TFileRangeDesc {
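In PlanNodes.thrift the new map lands as field 10 and the later fields shift to 11 through 15; since Thrift identifies fields by id on the wire, this relies on FE and BE being built from the same revision. On the producer side the map is filled through the generated putToDefaultValueOfSrcSlot helper, as in the FE hunk above. A sketch of how the map could be interpreted on the consuming side, with a placeholder TExpr that only carries a node list (the real class is generated from Exprs.thrift):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DefaultValueMapSketch {
        // Placeholder for the generated Exprs.TExpr: just a list of expr nodes.
        static class TExpr {
            List<Object> nodes = new ArrayList<>();
        }

        public static void main(String[] args) {
            Map<Integer, TExpr> defaultValueOfSrcSlot = new HashMap<>();

            TExpr literal = new TExpr();
            literal.nodes.add("StringLiteral:0");          // slot 3 has a usable default expr
            defaultValueOfSrcSlot.put(3, literal);

            defaultValueOfSrcSlot.put(4, new TExpr());     // slot 4: empty expr, i.e. no default available

            for (Map.Entry<Integer, TExpr> e : defaultValueOfSrcSlot.entrySet()) {
                if (e.getValue().nodes.isEmpty()) {
                    System.out.println("slot " + e.getKey() + ": no default; fails if the column is not nullable");
                } else {
                    System.out.println("slot " + e.getKey() + ": fill missing values from " + e.getValue().nodes);
                }
            }
        }
    }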
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]