This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 236f394c3d [dev-1.1.2][fix](parquet-reader) fix concurrency bug (#11605)
236f394c3d is described below
commit 236f394c3d83674eabed38f58de44055e0d6f2ea
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Aug 10 14:46:33 2022 +0800
[dev-1.1.2][fix](parquet-reader) fix concurrency bug (#11605)
* [fix](parquet-reader) fix concurrency bug
Co-authored-by: morningman <[email protected]>
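The concurrency bug is a use-after-free between close() and the detached prefetch thread: close() could tear down the reader while the thread was still reading batches. The fix hands the thread a std::promise and has close() block on the matching future. Below is a minimal standalone sketch of that pattern, not the actual ParquetReaderWrap code; the names (Prefetcher, start, close, prefetch) are illustrative, and the promise carries void here where the patch uses a Status whose value, as the added comment notes, is ignored.

#include <atomic>
#include <future>
#include <thread>

// Minimal sketch of the synchronization pattern used in this fix.
// Illustrative names only; this is not the Doris ParquetReaderWrap class.
class Prefetcher {
public:
    void start() {
        std::thread worker(&Prefetcher::prefetch, this, &_thread_status);
        worker.detach();
    }

    // Assumes start() was called first, as in the real reader.
    void close() {
        _closed = true;
        // Block until the detached thread has signaled completion, so it can
        // no longer touch this object after close() returns (no use-after-free).
        _thread_status.get_future().get();
    }

private:
    void prefetch(std::promise<void>* status) {
        while (!_closed) {
            // ... read a record batch and push it onto the queue ...
        }
        // The value itself is meaningless; setting it only signals "done".
        status->set_value();
    }

    std::atomic<bool> _closed{false};
    std::promise<void> _thread_status;
};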
---
be/src/exec/parquet_reader.cpp | 79 ++++++++++++++++++++++++++---------------
be/src/exec/parquet_reader.h | 15 ++++++--
be/src/exec/parquet_scanner.cpp | 3 +-
3 files changed, 63 insertions(+), 34 deletions(-)
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 61eb4d8e19..e65c1c5127 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -103,7 +103,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
- std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
+ std::thread thread(&ParquetReaderWrap::prefetch_batch, this, &thread_status);
thread.detach();
// read batch
@@ -131,6 +131,10 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
void ParquetReaderWrap::close() {
_closed = true;
_queue_writer_cond.notify_one();
+ // must wait for the prefetch thread to finish,
+ // because it may still use ParquetReader to read data, which may cause
+ // a heap-use-after-free bug.
+ thread_status.get_future().get();
arrow::Status st = _parquet->Close();
if (!st.ok()) {
LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -537,7 +541,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
return read_record_batch(tuple_slot_descs, eof);
}
-void ParquetReaderWrap::prefetch_batch() {
+void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) {
auto insert_batch = [this](const auto& batch) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.size() == _max_queue_size) {
@@ -552,22 +556,24 @@ void ParquetReaderWrap::prefetch_batch() {
int current_group = 0;
while (true) {
if (_closed || current_group >= _total_groups) {
- return;
+ break;
}
_status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
if (!_status.ok()) {
_closed = true;
- return;
+ break;
}
arrow::RecordBatchVector batches;
_status = _rb_batch->ReadAll(&batches);
if (!_status.ok()) {
_closed = true;
- return;
+ break;
}
std::for_each(batches.begin(), batches.end(), insert_batch);
current_group++;
}
+ // the status value is meaningless; it only signals that the thread is done.
+ status->set_value(Status::OK());
}
Status ParquetReaderWrap::read_next_batch() {
@@ -596,30 +602,36 @@ ParquetFile::~ParquetFile() {
}
arrow::Status ParquetFile::Close() {
+ std::lock_guard<std::mutex> lock(_lock);
+ if (_is_closed) {
+ return arrow::Status::OK();
+ }
+
if (_file != nullptr) {
_file->close();
delete _file;
_file = nullptr;
}
+ _is_closed = true;
return arrow::Status::OK();
}
bool ParquetFile::closed() const {
- if (_file != nullptr) {
- return _file->closed();
- } else {
- return true;
- }
+ return _is_closed;
}
-arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* buffer) {
- return ReadAt(_pos, nbytes, buffer);
+arrow::Status ParquetFile::Seek(int64_t position) {
+ _pos = position;
+ // NOTE: Only readat operation is used, so _file seek is not called here.
+ return arrow::Status::OK();
}
-arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* out) {
+ if (_is_closed) {
+ return arrow::Status::IOError("Already closed");
+ }
int64_t reads = 0;
int64_t bytes_read = 0;
- _pos = position;
while (nbytes > 0) {
Status result = _file->readat(_pos, nbytes, &reads, out);
if (!result.ok()) {
@@ -637,25 +649,14 @@ arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, voi
return bytes_read;
}
-arrow::Result<int64_t> ParquetFile::GetSize() {
- return _file->size();
-}
-
-arrow::Status ParquetFile::Seek(int64_t position) {
- _pos = position;
- // NOTE: Only readat operation is used, so _file seek is not called here.
- return arrow::Status::OK();
-}
-
-arrow::Result<int64_t> ParquetFile::Tell() const {
- return _pos;
-}
-
arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes) {
+ if (_is_closed) {
+ return arrow::Status::IOError("Already closed");
+ }
auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool());
ARROW_RETURN_NOT_OK(buffer);
std::shared_ptr<arrow::Buffer> read_buf = std::move(buffer.ValueOrDie());
- auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data());
+ auto bytes_read = Read(nbytes, read_buf->mutable_data());
ARROW_RETURN_NOT_OK(bytes_read);
// If bytes_read is equal with read_buf's capacity, we just assign
if (bytes_read.ValueOrDie() == nbytes) {
@@ -665,4 +666,24 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes)
}
}
+arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+ std::lock_guard<std::mutex> lock(_lock);
+ ARROW_RETURN_NOT_OK(Seek(position));
+ return Read(nbytes, out);
+}
+
+arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::ReadAt(int64_t position, int64_t nbytes) {
+ std::lock_guard<std::mutex> lock(_lock);
+ ARROW_RETURN_NOT_OK(Seek(position));
+ return Read(nbytes);
+}
+
+arrow::Result<int64_t> ParquetFile::GetSize() {
+ return _file->size();
+}
+
+arrow::Result<int64_t> ParquetFile::Tell() const {
+ return _pos;
+}
+
} // namespace doris
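For context, the reshuffled ParquetFile methods above follow one locking scheme: ReadAt() and Close() take _lock, Seek() and the positional Read() work on the shared _pos and are only reached through those locked entry points, and Close() is made idempotent via _is_closed. The following is a reduced sketch of that shape only, assuming simplified return types instead of the real arrow::Result/arrow::Status interface of arrow::io::RandomAccessFile.

#include <cstdint>
#include <mutex>

// Reduced sketch of the ParquetFile locking scheme. Types are simplified;
// the real class implements arrow::io::RandomAccessFile.
class LockedFile {
public:
    // Thread-safe positional read: seek + read under one lock acquisition.
    int64_t ReadAt(int64_t position, int64_t nbytes, void* out) {
        std::lock_guard<std::mutex> lock(_lock);
        Seek(position);
        return Read(nbytes, out);
    }

    // Idempotent close: safe to call more than once and from multiple places.
    void Close() {
        std::lock_guard<std::mutex> lock(_lock);
        if (_is_closed) {
            return;
        }
        // ... close and release the underlying file handle ...
        _is_closed = true;
    }

    bool closed() const { return _is_closed; }

private:
    // Only called with _lock held (via the locked entry points above).
    void Seek(int64_t position) { _pos = position; }

    int64_t Read(int64_t nbytes, void* out) {
        if (_is_closed) {
            return -1; // the real code returns arrow::Status::IOError("Already closed")
        }
        // ... read nbytes at _pos into out, advancing _pos as bytes arrive ...
        (void)out;
        _pos += nbytes;
        return nbytes;
    }

    std::mutex _lock;
    int64_t _pos = 0;
    bool _is_closed = false;
};

The point mirrored from the diff is that ReadAt() is the path the concurrent prefetch thread uses, so the seek-then-read sequence must be atomic under the lock, while Close() can race with it safely.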
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index 93f1a2b2dd..f56a56061c 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -37,6 +37,7 @@
#include <mutex>
#include <string>
#include <thread>
+#include <future>
#include "common/status.h"
#include "common/config.h"
@@ -59,18 +60,24 @@ class ParquetFile : public arrow::io::RandomAccessFile {
public:
ParquetFile(FileReader* file);
~ParquetFile() override;
+
+ arrow::Status Seek(int64_t position) override;
arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
+
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
+
arrow::Result<int64_t> GetSize() override;
- arrow::Status Seek(int64_t position) override;
- arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
arrow::Result<int64_t> Tell() const override;
arrow::Status Close() override;
bool closed() const override;
private:
+ std::mutex _lock;
FileReader* _file;
int64_t _pos = 0;
+ bool _is_closed = false;
};
// Reader of broker parquet file
@@ -97,7 +104,7 @@ private:
int32_t* wbtyes);
private:
- void prefetch_batch();
+ void prefetch_batch(std::promise<Status>* status);
Status read_next_batch();
private:
@@ -129,6 +136,8 @@ private:
std::condition_variable _queue_writer_cond;
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
+
+ std::promise<Status> thread_status;
};
} // namespace doris
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 3295dc4bc7..b8e077c7e7 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -135,8 +135,7 @@ Status ParquetScanner::open_next_reader() {
break;
}
case TFileType::FILE_S3: {
- file_reader.reset(new BufferedReader(
- _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
+ file_reader.reset(new S3Reader(_params.properties, range.path, range.start_offset));
break;
}
default: {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]