This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8caa5a9ba4 [Fix](multi-catalog) Fix null partitions error in iceberg tables. (#22185)
8caa5a9ba4 is described below
commit 8caa5a9ba45febf662ffc6efc14039086f3185f5
Author: Qi Chen <[email protected]>
AuthorDate: Thu Jul 27 23:57:35 2023 +0800
[Fix](multi-catalog) Fix null partitions error in iceberg tables. (#22185)
### Issue
When an Iceberg table contains null partition values, queries fail with the error
`Failed to fill partition column: t_int=null`
### Resolution
- Fix the null partition error in Iceberg tables by replacing a null partition value with '\N', as sketched below.
- Add a regression test for Hive null partitions.
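The replacement works because '\N' is the marker the BE text converter already
treats as SQL NULL when filling a column from text. A minimal standalone sketch
of the idea (the `normalize_partition_value` helper is hypothetical; the actual
change lives in `VFileScanner::_generate_fill_columns`, shown in the diff below):

```cpp
#include <cstring>
#include <string>

// "\N" is the textual NULL marker understood by the text converter
// (mirrors the TextConverter::NULL_STR constant introduced by this commit).
static constexpr char NULL_STR[3] = {'\\', 'N', '\0'};

// Hypothetical helper illustrating the fix: a partition value that arrives as
// the literal string "null" is swapped for "\N", so the downstream
// write_vec_column() call fills the slot with NULL instead of erroring out.
const char* normalize_partition_value(const std::string& column_from_path) {
    const char* data = column_from_path.c_str();
    if (column_from_path.size() == 4 && memcmp(data, "null", 4) == 0) {
        data = NULL_STR; // null partition -> "\N" -> NULL in the column
    }
    return data;
}
```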
---
be/src/exec/text_converter.h | 2 +
be/src/vec/exec/scan/vfile_scanner.cpp | 79 +++++++++-------------
be/src/vec/exec/scan/vfile_scanner.h | 4 ++
.../hive/test_external_catalog_hive_partition.out | 73 ++++++++++++++++++++
.../test_external_catalog_iceberg_partition.out | 73 ++++++++++++++++++++
.../test_external_catalog_hive_partition.groovy | 67 ++++++++++++++++++
.../test_external_catalog_iceberg_partition.groovy | 63 +++++++++++++++++
7 files changed, 313 insertions(+), 48 deletions(-)
diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index b46594fd04..083c7c6881 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -29,6 +29,8 @@ class SlotDescriptor;
// numeric types, etc.
class TextConverter {
public:
+ static constexpr char NULL_STR[3] = {'\\', 'N', '\0'};
+
TextConverter(char escape_char, char array_delimiter = '\2');
void write_string_column(const SlotDescriptor* slot_desc,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index eed73e98cb..4413f3deac 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -356,27 +356,14 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
}
Status VFileScanner::_fill_columns_from_path(size_t rows) {
- const TFileRangeDesc& range = _ranges.at(_next_range - 1);
- if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
- SCOPED_TIMER(_fill_path_columns_timer);
- for (const auto& slot_desc : _partition_slot_descs) {
- if (slot_desc == nullptr) continue;
- auto it = _partition_slot_index_map.find(slot_desc->id());
- if (it == std::end(_partition_slot_index_map)) {
- std::stringstream ss;
- ss << "Unknown source slot descriptor, slot_id=" <<
slot_desc->id();
- return Status::InternalError(ss.str());
- }
-            const std::string& column_from_path = range.columns_from_path[it->second];
-            auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
- IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
-            if (!_text_converter->write_vec_column(slot_desc, col_ptr,
-                                                   const_cast<char*>(column_from_path.c_str()),
-                                                   column_from_path.size(), true, false, rows)) {
-                return Status::InternalError("Failed to fill partition column: {}={}",
-                                             slot_desc->col_name(), column_from_path);
-            }
+ for (auto& kv : *_partition_columns) {
+ auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
+ IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+ auto& [value, slot_desc] = kv.second;
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
+                                               value.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), value);
}
}
return Status::OK();
@@ -388,29 +375,15 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
}
SCOPED_TIMER(_fill_missing_columns_timer);
- for (auto slot_desc : _real_tuple_desc->slots()) {
- if (!slot_desc->is_materialized()) {
- continue;
- }
- if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
- continue;
- }
-
- auto it = _col_default_value_ctx.find(slot_desc->col_name());
- if (it == _col_default_value_ctx.end()) {
- return Status::InternalError("failed to find default value expr
for slot: {}",
- slot_desc->col_name());
- }
- if (it->second == nullptr) {
+ for (auto& kv : *_missing_columns) {
+ if (kv.second == nullptr) {
// no default column, fill with null
            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                    (*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column))
-                            .mutate()
-                            .get());
+                    (*std::move(_src_block_ptr->get_by_name(kv.first).column)).mutate().get());
nullable_column->insert_many_defaults(rows);
} else {
// fill with default value
- auto& ctx = it->second;
+ auto& ctx = kv.second;
auto origin_column_num = _src_block_ptr->columns();
int result_column_id = -1;
// PT1 => dest primitive type
@@ -426,10 +399,10 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
            auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
            // result_column_ptr maybe a ColumnConst, convert it to a normal column
            result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
-            auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type;
+            auto origin_column_type = _src_block_ptr->get_by_name(kv.first).type;
bool is_nullable = origin_column_type->is_nullable();
_src_block_ptr->replace_by_position(
-                    _src_block_ptr->get_position_by_name(slot_desc->col_name()),
+                    _src_block_ptr->get_position_by_name(kv.first),
                    is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
_src_block_ptr->erase(result_column_id);
}
@@ -754,9 +727,9 @@ Status VFileScanner::_get_next_reader() {
}
Status VFileScanner::_generate_fill_columns() {
-    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
-            partition_columns;
-    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+    _partition_columns.reset(
+            new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
+    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -768,8 +741,13 @@ Status VFileScanner::_generate_fill_columns() {
slot_desc->id());
}
            const std::string& column_from_path = range.columns_from_path[it->second];
-            partition_columns.emplace(slot_desc->col_name(),
-                                      std::make_tuple(column_from_path, slot_desc));
+            const char* data = column_from_path.c_str();
+            size_t size = column_from_path.size();
+            if (size == 4 && memcmp(data, "null", 4) == 0) {
+                data = TextConverter::NULL_STR;
+            }
+            _partition_columns->emplace(slot_desc->col_name(),
+                                        std::make_tuple(data, slot_desc));
}
}
}
@@ -788,11 +766,16 @@ Status VFileScanner::_generate_fill_columns() {
return Status::InternalError("failed to find default value
expr for slot: {}",
slot_desc->col_name());
}
- missing_columns.emplace(slot_desc->col_name(), it->second);
+ _missing_columns->emplace(slot_desc->col_name(), it->second);
}
}
- return _cur_reader->set_fill_columns(partition_columns, missing_columns);
+    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
+ if (_cur_reader->fill_all_columns()) {
+ _partition_columns.reset(nullptr);
+ _missing_columns.reset(nullptr);
+ }
+ return Status::OK();
}
Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 9a16d056ed..3518a4e733 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -156,6 +156,10 @@ protected:
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
+    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
+            _partition_columns;
+    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+
private:
RuntimeProfile::Counter* _get_block_timer = nullptr;
RuntimeProfile::Counter* _open_reader_timer = nullptr;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out
new file mode 100644
index 0000000000..5608999eb5
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive_partition.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+0.3 test3 2023-01-03T00:00 100
+
+-- !q02 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+
+-- !q03 --
+0.3 test3 2023-01-03T00:00 100
+
+-- !q04 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+2023-01-03T00:00 100 0.3 test3
+
+-- !q05 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+
+-- !q06 --
+2023-01-03T00:00 100 0.3 test3
+
+-- !q01 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+0.3 test3 2023-01-03T00:00 100
+
+-- !q02 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+
+-- !q03 --
+0.3 test3 2023-01-03T00:00 100
+
+-- !q04 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+2023-01-03T00:00 100 0.3 test3
+
+-- !q05 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+
+-- !q06 --
+2023-01-03T00:00 100 0.3 test3
+
+-- !q01 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+0.3 test3 2023-01-03T00:00 100
+
+-- !q02 --
+0.1 test1 2023-01-01T00:00 \N
+0.2 test2 2023-01-02T00:00 \N
+
+-- !q03 --
+0.3 test3 2023-01-03T00:00 100
+
+-- !q04 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+2023-01-03T00:00 100 0.3 test3
+
+-- !q05 --
+2023-01-01T00:00 \N 0.1 test1
+2023-01-02T00:00 \N 0.2 test2
+
+-- !q06 --
+2023-01-03T00:00 100 0.3 test3
+
diff --git a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out
new file mode 100644
index 0000000000..c2582691cc
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q02 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q03 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q04 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q05 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q06 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q07 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q08 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q09 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q01 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q02 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q03 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q04 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q05 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q06 --
+100 0.3 test3 2023-01-03T00:00
+
+-- !q07 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+100 0.3 test3 2023-01-03T00:00
+
+-- !q08 --
+\N 0.1 test1 2023-01-01T00:00
+\N 0.2 test2 2023-01-02T00:00
+
+-- !q09 --
+100 0.3 test3 2023-01-03T00:00
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy
new file mode 100644
index 0000000000..fc6e7fbc23
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive_partition.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_hive_partition", "p2") {
+ String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+ String catalog_name = "test_external_catalog_hive_partition"
+
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ 'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+ );
+ """
+
+ sql """switch ${catalog_name};"""
+ // test parquet format
+ def q01_parquet = {
+            qt_q01 """ select * from multi_catalog.parquet_partitioned_one_column order by t_float """
+            qt_q02 """ select * from multi_catalog.parquet_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from multi_catalog.parquet_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from multi_catalog.parquet_partitioned_columns order by t_float """
+            qt_q05 """ select * from multi_catalog.parquet_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from multi_catalog.parquet_partitioned_columns where t_int is not null order by t_float """
+ }
+ // test orc format
+ def q01_orc = {
+            qt_q01 """ select * from multi_catalog.orc_partitioned_one_column order by t_float """
+            qt_q02 """ select * from multi_catalog.orc_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from multi_catalog.orc_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from multi_catalog.orc_partitioned_columns order by t_float """
+            qt_q05 """ select * from multi_catalog.orc_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from multi_catalog.orc_partitioned_columns where t_int is not null order by t_float """
+ }
+ // test text format
+ def q01_text = {
+            qt_q01 """ select * from multi_catalog.text_partitioned_one_column order by t_float """
+            qt_q02 """ select * from multi_catalog.text_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from multi_catalog.text_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from multi_catalog.text_partitioned_columns order by t_float """
+            qt_q05 """ select * from multi_catalog.text_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from multi_catalog.text_partitioned_columns where t_int is not null order by t_float """
+ }
+ sql """ use `multi_catalog`; """
+ q01_parquet()
+ q01_orc()
+ q01_text()
+ }
+}
+
diff --git a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy
new file mode 100644
index 0000000000..9179f22a06
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_iceberg_partition.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_iceberg_partition", "p2") {
+ String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+ String catalog_name = "test_external_catalog_iceberg_partition"
+
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ 'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+ );
+ """
+
+ sql """switch ${catalog_name};"""
+ // test parquet format
+ def q01_parquet = {
+            qt_q01 """ select * from iceberg_catalog.parquet_partitioned_one_column order by t_float """
+            qt_q02 """ select * from iceberg_catalog.parquet_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from iceberg_catalog.parquet_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from iceberg_catalog.parquet_partitioned_columns order by t_float """
+            qt_q05 """ select * from iceberg_catalog.parquet_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from iceberg_catalog.parquet_partitioned_columns where t_int is not null order by t_float """
+            qt_q07 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields order by t_float """
+            qt_q08 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields where t_int is null order by t_float """
+            qt_q09 """ select * from iceberg_catalog.parquet_partitioned_truncate_and_fields where t_int is not null order by t_float """
+ }
+ // test orc format
+ def q01_orc = {
+            qt_q01 """ select * from iceberg_catalog.orc_partitioned_one_column order by t_float """
+            qt_q02 """ select * from iceberg_catalog.orc_partitioned_one_column where t_int is null order by t_float """
+            qt_q03 """ select * from iceberg_catalog.orc_partitioned_one_column where t_int is not null order by t_float """
+            qt_q04 """ select * from iceberg_catalog.orc_partitioned_columns order by t_float """
+            qt_q05 """ select * from iceberg_catalog.orc_partitioned_columns where t_int is null order by t_float """
+            qt_q06 """ select * from iceberg_catalog.orc_partitioned_columns where t_int is not null order by t_float """
+            qt_q07 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields order by t_float """
+            qt_q08 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields where t_int is null order by t_float """
+            qt_q09 """ select * from iceberg_catalog.orc_partitioned_truncate_and_fields where t_int is not null order by t_float """
+ }
+ sql """ use `iceberg_catalog`; """
+ q01_parquet()
+ q01_orc()
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]