This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d584532afda [opt](parquet) Support hive struct schema change (#32438)
d584532afda is described below
commit d584532afda3f02f71e617092448089eb37990eb
Author: 苏小刚 <[email protected]>
AuthorDate: Fri Mar 22 10:36:07 2024 +0800
[opt](parquet) Support hive struct schema change (#32438)
Followup: #31128
This optimization allows Doris to correctly read struct-type data after the
schema has been changed in Hive.
## Changing the struct schema in Hive:
```sql
hive> create table struct_test(id int,sf struct<f1: int, f2: string>)
stored as parquet;
hive> insert into struct_test values
> (1, named_struct('f1', 1, 'f2', 's1')),
> (2, named_struct('f1', 2, 'f2', 's2')),
> (3, named_struct('f1', 3, 'f2', 's3'));
hive> alter table struct_test change sf sf struct<f1:int, f3:string>;
hive> select * from struct_test;
OK
1 {"f1":1,"f3":null}
2 {"f1":2,"f3":null}
3 {"f1":3,"f3":null}
Time taken: 5.298 seconds, Fetched: 3 row(s)
```
The previous result in Doris was:
```sql
mysql> select * from struct_test;
+------+-----------------------+
| id | sf |
+------+-----------------------+
| 1 | {"f1": 1, "f3": "s1"} |
| 2 | {"f1": 2, "f3": "s2"} |
| 3 | {"f1": 3, "f3": "s3"} |
+------+-----------------------+
```
Now the result is the same as in Hive:
```sql
mysql> select * from struct_test;
+------+-----------------------+
| id | sf |
+------+-----------------------+
| 1 | {"f1": 1, "f3": null} |
| 2 | {"f1": 2, "f3": null} |
| 3 | {"f1": 3, "f3": null} |
+------+-----------------------+
```
---
be/src/vec/data_types/data_type_struct.h | 1 +
.../exec/format/parquet/vparquet_column_reader.cpp | 59 +++++++++++++++++-----
.../exec/format/parquet/vparquet_column_reader.h | 29 ++++++++---
3 files changed, 68 insertions(+), 21 deletions(-)
diff --git a/be/src/vec/data_types/data_type_struct.h
b/be/src/vec/data_types/data_type_struct.h
index ad1a42a011d..3638b0d110a 100644
--- a/be/src/vec/data_types/data_type_struct.h
+++ b/be/src/vec/data_types/data_type_struct.h
@@ -113,6 +113,7 @@ public:
const DataTypePtr& get_element(size_t i) const { return elems[i]; }
const DataTypes& get_elements() const { return elems; }
+ const String& get_element_name(size_t i) const { return names[i]; }
const Strings& get_element_names() const { return names; }
size_t get_position_by_name(const String& name) const;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f813d14b63b..27b377048fb 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -142,14 +142,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr
file, FieldSchema* field,
RETURN_IF_ERROR(map_reader->init(std::move(key_reader),
std::move(value_reader), field));
reader.reset(map_reader.release());
} else if (field->type.type == TYPE_STRUCT) {
- std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
+ std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
child_readers;
child_readers.reserve(field->children.size());
for (int i = 0; i < field->children.size(); ++i) {
std::unique_ptr<ParquetColumnReader> child_reader;
RETURN_IF_ERROR(create(file, &field->children[i], row_group,
row_ranges, ctz, io_ctx,
child_reader, max_buf_size));
child_reader->set_nested_column();
- child_readers.emplace_back(std::move(child_reader));
+ child_readers[field->children[i].name] = std::move(child_reader);
}
auto struct_reader = StructColumnReader::create_unique(row_ranges,
ctz, io_ctx);
RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
@@ -701,8 +701,9 @@ Status MapColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr& t
return Status::OK();
}
-Status
StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>>&&
child_readers,
- FieldSchema* field) {
+Status StructColumnReader::init(
+ std::unordered_map<std::string,
std::unique_ptr<ParquetColumnReader>>&& child_readers,
+ FieldSchema* field) {
_field_schema = field;
_child_readers = std::move(child_readers);
return Status::OK();
@@ -728,19 +729,33 @@ Status StructColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
}
auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
- if (_child_readers.size() != doris_struct.tuple_size()) {
- return Status::InternalError("Wrong number of struct fields");
- }
const DataTypeStruct* doris_struct_type =
reinterpret_cast<const
DataTypeStruct*>(remove_nullable(type).get());
- for (int i = 0; i < doris_struct.tuple_size(); ++i) {
+
+ bool least_one_reader = false;
+ std::vector<size_t> missing_column_idxs {};
+
+ _read_column_names.clear();
+
+ for (size_t i = 0; i < doris_struct.tuple_size(); ++i) {
ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
- DataTypePtr& doris_type =
const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+ auto& doris_type =
const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+ auto& doris_name =
const_cast<String&>(doris_struct_type->get_element_name(i));
+
+ // remember the missing column index
+ if (_child_readers.find(doris_name) == _child_readers.end()) {
+ missing_column_idxs.push_back(i);
+ continue;
+ }
+
+ _read_column_names.insert(doris_name);
+
select_vector.reset();
size_t field_rows = 0;
bool field_eof = false;
- if (i == 0) {
- RETURN_IF_ERROR(_child_readers[i]->read_column_data(
+ if (!least_one_reader) {
+ least_one_reader = true;
+ RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
doris_field, doris_type, select_vector, batch_size,
&field_rows, &field_eof,
is_dict_filter));
*read_rows = field_rows;
@@ -749,7 +764,7 @@ Status StructColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
while (field_rows < *read_rows && !field_eof) {
size_t loop_rows = 0;
select_vector.reset();
- RETURN_IF_ERROR(_child_readers[i]->read_column_data(
+ RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
doris_field, doris_type, select_vector, *read_rows -
field_rows, &loop_rows,
&field_eof, is_dict_filter));
field_rows += loop_rows;
@@ -759,9 +774,25 @@ Status StructColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
}
}
+ if (!least_one_reader) {
+ // TODO: support read struct which columns are all missing
+ return Status::Corruption("Not support read struct '{}' which columns
are all missing",
+ _field_schema->name);
+ }
+
+ // fill missing column with null or default value
+ for (auto idx : missing_column_idxs) {
+ auto& doris_field = doris_struct.get_column_ptr(idx);
+ auto& doris_type =
const_cast<DataTypePtr&>(doris_struct_type->get_element(idx));
+ DCHECK(doris_type->is_nullable());
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+ (*std::move(doris_field)).mutate().get());
+ nullable_column->insert_null_elements(*read_rows);
+ }
+
if (null_map_ptr != nullptr) {
- fill_struct_null_map(_field_schema, *null_map_ptr,
_child_readers[0]->get_rep_level(),
- _child_readers[0]->get_def_level());
+ fill_struct_null_map(_field_schema, *null_map_ptr,
this->get_rep_level(),
+ this->get_def_level());
}
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index d15d6d5efa1..249e2d94878 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -25,6 +25,7 @@
#include <list>
#include <memory>
#include <ostream>
+#include <unordered_map>
#include <vector>
#include "io/fs/buffered_reader.h"
@@ -262,24 +263,37 @@ public:
: ParquetColumnReader(row_ranges, ctz, io_ctx) {}
~StructColumnReader() override { close(); }
- Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&&
child_readers,
- FieldSchema* field);
+ Status init(
+ std::unordered_map<std::string,
std::unique_ptr<ParquetColumnReader>>&& child_readers,
+ FieldSchema* field);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t
batch_size, size_t* read_rows,
bool* eof, bool is_dict_filter) override;
const std::vector<level_t>& get_rep_level() const override {
- return _child_readers[0]->get_rep_level();
+ if (!_read_column_names.empty()) {
+ // can't use _child_readers[*_read_column_names.begin()]
+ // because the operator[] of std::unordered_map is not const :(
+ return
_child_readers.find(*_read_column_names.begin())->second->get_rep_level();
+ }
+ return _child_readers.begin()->second->get_rep_level();
}
+
const std::vector<level_t>& get_def_level() const override {
- return _child_readers[0]->get_def_level();
+ if (!_read_column_names.empty()) {
+ return
_child_readers.find(*_read_column_names.begin())->second->get_def_level();
+ }
+ return _child_readers.begin()->second->get_def_level();
}
Statistics statistics() override {
Statistics st;
for (const auto& reader : _child_readers) {
- Statistics cst = reader->statistics();
- st.merge(cst);
+ // make sure the field is read
+ if (_read_column_names.find(reader.first) !=
_read_column_names.end()) {
+ Statistics cst = reader.second->statistics();
+ st.merge(cst);
+ }
}
return st;
}
@@ -287,7 +301,8 @@ public:
void close() override {}
private:
- std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers;
+ std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_child_readers;
+ std::set<std::string> _read_column_names;
};
}; // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]