This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc65aa921 [fix](load) PrefetchBufferedReader crash caused by
updating counter with an invalid runtime profile (#22464)
4bc65aa921 is described below
commit 4bc65aa921399fba784ded9e54efe62e6a8ed69b
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Aug 2 18:19:48 2023 +0800
[fix](load) PrefetchBufferedReader crash caused by updating counter with
an invalid runtime profile (#22464)
---
be/src/io/fs/buffered_reader.cpp | 17 ++++++++++++++---
be/src/io/fs/buffered_reader.h | 2 ++
be/src/io/fs/stream_load_pipe.h | 4 +++-
be/src/vec/exec/format/csv/csv_reader.cpp | 10 ++++++++++
be/src/vec/exec/format/csv/csv_reader.h | 2 ++
be/src/vec/exec/format/generic_reader.h | 2 ++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 6 +++++-
be/src/vec/exec/format/parquet/vparquet_reader.h | 3 ++-
be/src/vec/exec/scan/vfile_scanner.cpp | 7 +++++++
9 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index be64439128..726f5331c9 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -624,8 +624,11 @@
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
}
PrefetchBufferedReader::~PrefetchBufferedReader() {
- close();
- _closed = true;
+ /// set `_sync_profile` to nullptr to avoid updating counter after the runtime profile has been released.
+ std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+ [](std::shared_ptr<PrefetchBuffer>& buffer) {
buffer->_sync_profile = nullptr; });
+ /// Better not to call virtual functions in a destructor.
+ _close_internal();
}
Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result,
size_t* bytes_read,
@@ -654,6 +657,10 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset,
Slice result, size_t*
}
Status PrefetchBufferedReader::close() {
+ return _close_internal();
+}
+
+Status PrefetchBufferedReader::_close_internal() {
if (!_closed) {
_closed = true;
std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
@@ -669,10 +676,14 @@ InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr
reader) : _reader(std:
}
InMemoryFileReader::~InMemoryFileReader() {
- close();
+ _close_internal();
}
Status InMemoryFileReader::close() {
+ return _close_internal();
+}
+
+Status InMemoryFileReader::_close_internal() {
if (!_closed) {
_closed = true;
return _reader->close();
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index b0728e6af1..34e1ff34fe 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -383,6 +383,7 @@ protected:
const IOContext* io_ctx) override;
private:
+ Status _close_internal();
size_t get_buffer_pos(int64_t position) const {
return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
}
@@ -436,6 +437,7 @@ protected:
const IOContext* io_ctx) override;
private:
+ Status _close_internal();
io::FileReaderSPtr _reader;
std::unique_ptr<char[]> _data = nullptr;
size_t _size;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index a184cf9e78..508620d5af 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -60,7 +60,9 @@ public:
// called when consumer finished
Status close() override {
- cancel("closed");
+ if (!(_finished || _cancelled)) {
+ cancel("closed");
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 55a43ed4c5..43c1dde1ce 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -753,4 +753,14 @@ Status CsvReader::_parse_col_types(size_t col_nums,
std::vector<TypeDescriptor>*
return Status::OK();
}
+void CsvReader::close() {
+ if (_line_reader) {
+ _line_reader->close();
+ }
+
+ if (_file_reader) {
+ _file_reader->close();
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/csv/csv_reader.h
b/be/src/vec/exec/format/csv/csv_reader.h
index da3505e09b..42178846f1 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -80,6 +80,8 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
+ void close() override;
+
private:
// used for stream/broker load of csv file.
Status _create_decompressor();
diff --git a/be/src/vec/exec/format/generic_reader.h
b/be/src/vec/exec/format/generic_reader.h
index a712fb2847..7b6f3c7b9c 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -67,6 +67,8 @@ public:
return Status::OK();
}
+ virtual void close() {}
+
protected:
const size_t _MIN_BATCH_SIZE = 4064; // 4096 - 32(padding)
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 9599ed70b4..ba80992a04 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -100,7 +100,7 @@ ParquetReader::ParquetReader(const TFileScanRangeParams&
params, const TFileRang
}
ParquetReader::~ParquetReader() {
- close();
+ _close_internal();
}
void ParquetReader::_init_profile() {
@@ -162,6 +162,10 @@ void ParquetReader::_init_profile() {
}
void ParquetReader::close() {
+ _close_internal();
+}
+
+void ParquetReader::_close_internal() {
if (!_closed) {
if (_profile != nullptr) {
COUNTER_UPDATE(_parquet_profile.filtered_row_groups,
_statistics.filtered_row_groups);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 63f760abcd..0f3996db40 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -119,7 +119,7 @@ public:
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
- void close();
+ void close() override;
RowRange get_whole_range() { return _whole_range; }
@@ -182,6 +182,7 @@ private:
Status _open_file();
void _init_profile();
+ void _close_internal();
Status _next_row_group_reader();
RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
const tparquet::RowGroup& row_group,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index ac12172221..e30d7640ed 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -558,6 +558,9 @@ Status VFileScanner::_convert_to_output_block(Block* block)
{
Status VFileScanner::_get_next_reader() {
while (true) {
+ if (_cur_reader) {
+ _cur_reader->close();
+ }
_cur_reader.reset(nullptr);
_src_block_init = false;
if (_next_range >= _ranges.size()) {
@@ -936,6 +939,10 @@ Status VFileScanner::close(RuntimeState* state) {
cache_profile.update(_file_cache_statistics.get());
}
+ if (_cur_reader) {
+ _cur_reader->close();
+ }
+
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]