This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new ca2944701e [dev-1.1.2](parquet-reader) fix deadlock of parquet reader
prefetch thread (#12292)
ca2944701e is described below
commit ca2944701edba5df39909aac46e6d559a650c919
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Sep 2 13:01:52 2022 +0800
[dev-1.1.2](parquet-reader) fix deadlock of parquet reader prefetch thread
(#12292)
Fix a deadlock in the parquet reader prefetch thread.
---
be/src/exec/parquet_reader.cpp | 11 +++++------
be/src/exec/parquet_reader.h | 4 ++--
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index e65c1c5127..5e040611b8 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -103,8 +103,7 @@ Status ParquetReaderWrap::init_parquet_reader(const
std::vector<SlotDescriptor*>
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
- std::thread thread(&ParquetReaderWrap::prefetch_batch, this,
&thread_status);
- thread.detach();
+ _prefetch_thread = std::thread(&ParquetReaderWrap::prefetch_batch,
this);
// read batch
RETURN_IF_ERROR(read_next_batch());
@@ -134,7 +133,9 @@ void ParquetReaderWrap::close() {
// must wait the pre_fetch thread finish.
// because it may still use ParquetReader to read data, which may cause
// heap-after-use bug.
- thread_status.get_future().get();
+ if (_prefetch_thread.joinable()) {
+ _prefetch_thread.join();
+ }
arrow::Status st = _parquet->Close();
if (!st.ok()) {
LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -541,7 +542,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const
std::vector<SlotDescriptor*>&
return read_record_batch(tuple_slot_descs, eof);
}
-void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) {
+void ParquetReaderWrap::prefetch_batch() {
auto insert_batch = [this](const auto& batch) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.size() == _max_queue_size) {
@@ -572,8 +573,6 @@ void
ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) {
std::for_each(batches.begin(), batches.end(), insert_batch);
current_group++;
}
- // the status' value is meaningless, just for notifying that thread is
done.
- status->set_value(Status::OK());
}
Status ParquetReaderWrap::read_next_batch() {
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index f56a56061c..96dcd55af9 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -104,7 +104,7 @@ private:
int32_t* wbtyes);
private:
- void prefetch_batch(std::promise<Status>* status);
+ void prefetch_batch();
Status read_next_batch();
private:
@@ -137,7 +137,7 @@ private:
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
- std::promise<Status> thread_status;
+ std::thread _prefetch_thread;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]