This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch mc-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/mc-test-branch-4.1 by this
push:
new 5b2f10041ff Fix parquet row group column lookup crash (#63100)
5b2f10041ff is described below
commit 5b2f10041ff68e6318a1ccd85ebc9b46ad5af9bb
Author: Gabriel <[email protected]>
AuthorDate: Sat May 9 15:31:20 2026 +0800
Fix parquet row group column lookup crash (#63100)
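### What problem does this PR solve?
Problem: the BE crashes with SIGSEGV in `RowGroupReader::_read_column_data` while hashing a column name for an unchecked `operator[]` lookup in the column-name-to-block-index map (frames 4-6 of the stack trace below).
Fix: every `(*_col_name_to_block_idx)[name]` lookup is replaced by a new helper, `_get_block_column_pos`, which returns an `InternalError` when the map is unset, the column is absent, or the mapped position is out of range for the block. `_column_readers` is now queried with `find()` instead of `operator[]`. `RowGroupReader` also stores `_read_table_columns` and `_lazy_read_ctx` by value instead of by const reference (presumably to avoid reading through a dangling reference; the original description does not state the root cause).
Crash report: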
*** tablet id: 0 ***
*** Aborted at 1778254939 (unix time) try "date -d @1778254939" if you
are using GNU date ***
*** Current BE git commitID: c6e6ecc45c5 ***
*** SIGSEGV address not mapped to object (@0x13) received by PID 40355
(TID 77840 OR 0x7ef6079ff640) from PID 19; stack trace: ***
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int,
siginfo_t*, void*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/common/signal_handler.h:420
1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0]
in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
2# JVM_handle_linux_signal in
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
3# 0x00007F036C55C520 in /lib/x86_64-linux-gnu/libc.so.6
4# std::_Hash_bytes(void const*, unsigned long, unsigned long) in
/mnt/hdd01/PERFORMANCE_ENV/be/lib/doris_be
5# std::__detail::_Map_base<std::__cxx11::basic_string<char,
std::char_traits<char>, std::allocator<char> >,
std::pair<std::__cxx11::basic_string<char, std::char_traits<char>,
std::allocator<char> > const, unsigned int>,
std::allocator<std::pair<std::__cxx11::basic_string<char,
std::char_traits<char>, std::allocator<char> > const, unsigned int> >,
std::__detail::_Select1st,
std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>,
std::allocator<char> > >, std::hash<std::__cxx11::basic_string<char,
std::char_traits<char>, std::allocator<char> > >,
std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash,
std::__detail::_Prime_rehash_policy,
std::__detail::_Hashtable_traits<true, false, true>,
true>::operator[](std::__cxx11::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&) in
/mnt/hdd01/PERFORMANCE_ENV/be/lib/doris_be
6# doris::RowGroupReader::_read_column_data(doris::Block*,
std::vector<std::__cxx11::basic_string<char, std::char_traits<char>,
std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char,
std::char_traits<char>, std::allocator<char> > > > const&, unsigned
long, unsigned long*, bool*, doris::FilterMap&) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/parquet/vparquet_group_reader.cpp:496
7# doris::RowGroupReader::next_batch(doris::Block*, unsigned long,
unsigned long*, bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/parquet/vparquet_group_reader.cpp:404
8# doris::ParquetReader::get_next_block(doris::Block*, unsigned long*,
bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/parquet/vparquet_reader.cpp:724
9# doris::IcebergTableReader::get_next_block_inner(doris::Block*,
unsigned long*, bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/table/iceberg_reader.cpp:185
10# doris::TableFormatReader::get_next_block(doris::Block*, unsigned
long*, bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/format/table/table_format_reader.h:80
11# doris::FileScanner::_get_block_wrapped(doris::RuntimeState*,
doris::Block*, bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/file_scanner.cpp:474
12# doris::FileScanner::_get_block_impl(doris::RuntimeState*,
doris::Block*, bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/file_scanner.cpp:407
13# doris::Scanner::get_block(doris::RuntimeState*, doris::Block*,
bool*) in /mnt/hdd01/PERFORMANCE_ENV/be/lib/doris_be
14# doris::Scanner::get_block_after_projects(doris::RuntimeState*,
doris::Block*, bool*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/scanner.cpp:91
15#
doris::ScannerScheduler::_scanner_scan(std::shared_ptr<doris::ScannerContext>,
std::shared_ptr<doris::ScanTask>) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/scanner_scheduler.cpp:186
16# std::_Function_handler<bool (),
doris::ScannerScheduler::submit(std::shared_ptr<doris::ScannerContext>,
std::shared_ptr<doris::ScanTask>)::$_0::operator()()
const::{lambda()#1}>::_M_invoke(std::_Any_data const&) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292
17# doris::ScannerSplitRunner::process_for(std::chrono::duration<long,
std::ratio<1l, 1000000000l> >) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/scanner_scheduler.cpp:427
18# doris::PrioritizedSplitRunner::process() at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/task_executor/time_sharing/prioritized_split_runner.cpp:103
19# doris::TimeSharingTaskExecutor::_dispatch_thread() at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp:558
20# doris::Thread::supervise_thread(void*) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/util/thread.cpp:461
21# start_thread at ./nptl/pthread_create.c:442
22# 0x00007F036C6408D0 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For the reviewer who merges this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/format/parquet/vparquet_group_reader.cpp | 124 +++++++++++++-----------
be/src/format/parquet/vparquet_group_reader.h | 6 +-
2 files changed, 74 insertions(+), 56 deletions(-)
diff --git a/be/src/format/parquet/vparquet_group_reader.cpp
b/be/src/format/parquet/vparquet_group_reader.cpp
index 69eba372149..6531ce9dd8f 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -492,31 +492,31 @@ Status RowGroupReader::_read_column_data(Block* block,
size_t batch_read_rows = 0;
bool has_eof = false;
for (auto& read_col_name : table_columns) {
- auto& column_with_type_and_name =
-
block->safe_get_by_position((*_col_name_to_block_idx)[read_col_name]);
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, read_col_name,
&block_pos));
+ auto reader_iter = _column_readers.find(read_col_name);
+ if (reader_iter == _column_readers.end() || reader_iter->second ==
nullptr) {
+ return Status::InternalError("Column reader for '{}' not found in
parquet row group",
+ read_col_name);
+ }
+
+ auto& column_with_type_and_name =
block->safe_get_by_position(block_pos);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
bool is_dict_filter = false;
for (auto& _dict_filter_col : _dict_filter_cols) {
if (_dict_filter_col.first == read_col_name) {
MutableColumnPtr dict_column = ColumnInt32::create();
- if (!_col_name_to_block_idx->contains(read_col_name)) {
- return Status::InternalError(
- "Wrong read column '{}' in parquet file, block:
{}", read_col_name,
- block->dump_structure());
- }
if (column_type->is_nullable()) {
-
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
+ block->get_by_position(block_pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
- (*_col_name_to_block_idx)[read_col_name],
- ColumnNullable::create(std::move(dict_column),
-
ColumnUInt8::create(dict_column->size(), 0)));
+ block_pos, ColumnNullable::create(
+ std::move(dict_column),
+
ColumnUInt8::create(dict_column->size(), 0)));
} else {
-
block->get_by_position((*_col_name_to_block_idx)[read_col_name]).type =
- std::make_shared<DataTypeInt32>();
-
block->replace_by_position((*_col_name_to_block_idx)[read_col_name],
- std::move(dict_column));
+ block->get_by_position(block_pos).type =
std::make_shared<DataTypeInt32>();
+ block->replace_by_position(block_pos,
std::move(dict_column));
}
is_dict_filter = true;
break;
@@ -527,10 +527,10 @@ Status RowGroupReader::_read_column_data(Block* block,
bool col_eof = false;
// Should reset _filter_map_index to 0 when reading next column.
// select_vector.reset();
- _column_readers[read_col_name]->reset_filter_map_index();
+ reader_iter->second->reset_filter_map_index();
while (!col_eof && col_read_rows < batch_size) {
size_t loop_rows = 0;
- RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
+ RETURN_IF_ERROR(reader_iter->second->read_column_data(
column_ptr, column_type,
_table_info_node_ptr->get_children_node(read_col_name),
filter_map, batch_size - col_read_rows, &loop_rows,
&col_eof, is_dict_filter));
VLOG_DEBUG << "[RowGroupReader] column '" << read_col_name
@@ -656,19 +656,19 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
SCOPED_RAW_TIMER(&_predicate_filter_time);
for (const auto& col : _lazy_read_ctx.predicate_columns.first)
{
// clean block to read predicate columns
- block->get_by_position((*_col_name_to_block_idx)[col])
- .column->assume_mutable()
- ->clear();
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, col,
&block_pos));
+
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
for (const auto& col :
_lazy_read_ctx.predicate_partition_columns) {
-
block->get_by_position((*_col_name_to_block_idx)[col.first])
- .column->assume_mutable()
- ->clear();
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, col.first,
&block_pos));
+
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
for (const auto& col :
_lazy_read_ctx.predicate_missing_columns) {
-
block->get_by_position((*_col_name_to_block_idx)[col.first])
- .column->assume_mutable()
- ->clear();
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, col.first,
&block_pos));
+
block->get_by_position(block_pos).column->assume_mutable()->clear();
}
if (_row_id_column_iterator_pair.first != nullptr) {
block->get_by_position(_row_id_column_iterator_pair.second)
@@ -826,7 +826,9 @@ Status RowGroupReader::_fill_partition_columns(
partition_columns) {
DataTypeSerDe::FormatOptions _text_formatOptions;
for (const auto& kv : partition_columns) {
- auto doris_column =
block->get_by_position((*_col_name_to_block_idx)[kv.first]).column;
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
+ auto doris_column = block->get_by_position(block_pos).column;
// obtained from block*, it is a mutable object.
auto* col_ptr = const_cast<IColumn*>(doris_column.get());
const auto& [value, slot_desc] = kv.second;
@@ -855,14 +857,11 @@ Status RowGroupReader::_fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns) {
for (const auto& kv : missing_columns) {
- if (!_col_name_to_block_idx->contains(kv.first)) {
- return Status::InternalError("Missing column: {} not found in
block {}", kv.first,
- block->dump_structure());
- }
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, kv.first, &block_pos));
if (kv.second == nullptr) {
// no default column, fill with null
- auto mutable_column =
block->get_by_position((*_col_name_to_block_idx)[kv.first])
- .column->assume_mutable();
+ auto mutable_column =
block->get_by_position(block_pos).column->assume_mutable();
auto* nullable_column =
assert_cast<ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
@@ -879,18 +878,38 @@ Status RowGroupReader::_fill_missing_columns(
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a
normal column
result_column_ptr =
result_column_ptr->convert_to_full_column_if_const();
- auto origin_column_type =
-
block->get_by_position((*_col_name_to_block_idx)[kv.first]).type;
+ auto origin_column_type =
block->get_by_position(block_pos).type;
bool is_nullable = origin_column_type->is_nullable();
block->replace_by_position(
- (*_col_name_to_block_idx)[kv.first],
- is_nullable ? make_nullable(result_column_ptr) :
result_column_ptr);
+ block_pos, is_nullable ?
make_nullable(result_column_ptr) : result_column_ptr);
}
}
}
return Status::OK();
}
+Status RowGroupReader::_get_block_column_pos(const Block& block, const
std::string& column_name,
+ uint32_t* position) const {
+ if (_col_name_to_block_idx == nullptr) {
+ return Status::InternalError(
+ "Column name to block index map is not set when reading
parquet column '{}', block: "
+ "{}",
+ column_name, block.dump_structure());
+ }
+ auto iter = _col_name_to_block_idx->find(column_name);
+ if (iter == _col_name_to_block_idx->end()) {
+ return Status::InternalError("Column '{}' not found in block index
map, block: {}",
+ column_name, block.dump_structure());
+ }
+ if (iter->second >= block.columns()) {
+ return Status::InternalError(
+ "Column '{}' maps to invalid block position {}, block columns:
{}, block: {}",
+ column_name, iter->second, block.columns(),
block.dump_structure());
+ }
+ *position = iter->second;
+ return Status::OK();
+}
+
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows,
bool* batch_eof,
bool* modify_row_ids) {
*modify_row_ids = false;
@@ -1334,38 +1353,35 @@ Status
RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
Status RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
for (auto& dict_filter_cols : _dict_filter_cols) {
- if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
- throw Exception(ErrorCode::INTERNAL_ERROR,
- "Wrong read column '{}' in parquet file, block:
{}",
- dict_filter_cols.first, block->dump_structure());
+ uint32_t block_pos = 0;
+ RETURN_IF_ERROR(_get_block_column_pos(*block, dict_filter_cols.first,
&block_pos));
+ auto reader_iter = _column_readers.find(dict_filter_cols.first);
+ if (reader_iter == _column_readers.end() || reader_iter->second ==
nullptr) {
+ return Status::InternalError("Column reader for '{}' not found in
parquet row group",
+ dict_filter_cols.first);
}
- ColumnWithTypeAndName& column_with_type_and_name =
-
block->get_by_position((*_col_name_to_block_idx)[dict_filter_cols.first]);
+ ColumnWithTypeAndName& column_with_type_and_name =
block->get_by_position(block_pos);
const ColumnPtr& column = column_with_type_and_name.column;
if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
const auto* dict_column = assert_cast<const
ColumnInt32*>(nested_column.get());
DCHECK(dict_column);
- auto string_column = DORIS_TRY(
-
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
- dict_column));
+ auto string_column =
+
DORIS_TRY(reader_iter->second->convert_dict_column_to_string_column(dict_column));
column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
- (*_col_name_to_block_idx)[dict_filter_cols.first],
- ColumnNullable::create(std::move(string_column),
-
nullable_column->get_null_map_column_ptr()));
+ block_pos, ColumnNullable::create(std::move(string_column),
+
nullable_column->get_null_map_column_ptr()));
} else {
const auto* dict_column = assert_cast<const
ColumnInt32*>(column.get());
- auto string_column = DORIS_TRY(
-
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
- dict_column));
+ auto string_column =
+
DORIS_TRY(reader_iter->second->convert_dict_column_to_string_column(dict_column));
column_with_type_and_name.type =
std::make_shared<DataTypeString>();
-
block->replace_by_position((*_col_name_to_block_idx)[dict_filter_cols.first],
- std::move(string_column));
+ block->replace_by_position(block_pos, std::move(string_column));
}
}
return Status::OK();
diff --git a/be/src/format/parquet/vparquet_group_reader.h
b/be/src/format/parquet/vparquet_group_reader.h
index 657876b36e6..6d514500a57 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -233,6 +233,8 @@ private:
Status _fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns);
+ Status _get_block_column_pos(const Block& block, const std::string&
column_name,
+ uint32_t* position) const;
Status _build_pos_delete_filter(size_t read_rows);
Status _filter_block(Block* block, int column_to_keep,
const std::vector<uint32_t>& columns_to_filter);
@@ -252,7 +254,7 @@ private:
io::FileReaderSPtr _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_column_readers; // table_column_name
- const std::vector<std::string>& _read_table_columns;
+ std::vector<std::string> _read_table_columns;
const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
@@ -264,7 +266,7 @@ private:
// merge the row ranges generated from page index and position delete.
RowRanges _read_ranges;
- const LazyReadContext& _lazy_read_ctx;
+ LazyReadContext _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
int64_t _predicate_filter_time = 0;
int64_t _dict_filter_rewrite_time = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]