This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b4f471050a7 [fix](merge-clod) fix file not found when load for mow
table (#32144)
b4f471050a7 is described below
commit b4f471050a76983f972ec510d4dc0fd4d0143ff3
Author: Xin Liao <[email protected]>
AuthorDate: Wed Mar 13 14:23:56 2024 +0800
[fix](merge-clod) fix file not found when load for mow table (#32144)
---
be/src/olap/rowset/beta_rowset_writer.cpp | 5 +++++
be/src/olap/rowset/segment_creator.cpp | 13 ++++++++++---
be/src/olap/rowset/segment_creator.h | 9 ++++++++-
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 6fadbf0910d..609a06e40bb 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -804,6 +804,11 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t
segment_id, const SegmentStati
update_rowset_schema(flush_schema);
}
if (_context.mow_context != nullptr) {
+ // ensure that the segment file writing is complete
+ auto* file_writer = _segment_creator.get_file_writer(segment_id);
+ if (file_writer) {
+ RETURN_IF_ERROR(file_writer->close());
+ }
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
}
return Status::OK();
diff --git a/be/src/olap/rowset/segment_creator.cpp
b/be/src/olap/rowset/segment_creator.cpp
index 8f2553ade59..9a62055ac3f 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -151,7 +151,7 @@ Status
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
Status SegmentFlusher::close() {
std::lock_guard<SpinLock> l(_lock);
- for (auto& file_writer : _file_writers) {
+ for (auto& [segment_id, file_writer] : _file_writers) {
Status status = file_writer->close();
if (!status.ok()) {
LOG(WARNING) << "failed to close file writer, path=" <<
file_writer->path()
@@ -205,7 +205,7 @@ Status
SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
_context->max_rows_per_segment, writer_options,
_context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
- _file_writers.push_back(std::move(file_writer));
+ _file_writers.emplace(segment_id, std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
@@ -236,7 +236,7 @@ Status SegmentFlusher::_create_segment_writer(
_context->max_rows_per_segment, writer_options,
_context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
- _file_writers.push_back(std::move(file_writer));
+ _file_writers.emplace(segment_id, std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
@@ -345,6 +345,13 @@ Status
SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& wr
return Status::OK();
}
+io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) {
+    // Non-owning lookup (nullptr if absent); locked like the emplace() in _create_segment_writer().
+    std::lock_guard<SpinLock> l(_lock);
+    auto it = _file_writers.find(segment_id);
+    return it == _file_writers.end() ? nullptr : it->second.get();
+}
+
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
std::unique_ptr<segment_v2::SegmentWriter>&
segment_writer)
: _flusher(flusher), _writer(std::move(segment_writer)) {};
diff --git a/be/src/olap/rowset/segment_creator.h
b/be/src/olap/rowset/segment_creator.h
index 668c75e47b3..fe439d3bc7a 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>
#include <string>
+#include <unordered_map>
#include <vector>
#include "common/status.h"
@@ -102,6 +103,8 @@ public:
int64_t num_rows_filtered() const { return _num_rows_filtered; }
+ io::FileWriter* get_file_writer(int32_t segment_id);
+
Status close();
public:
@@ -153,7 +156,7 @@ private:
RowsetWriterContext* _context;
mutable SpinLock _lock; // protect following vectors.
- std::vector<io::FileWriterPtr> _file_writers;
+ std::unordered_map<int32_t, io::FileWriterPtr> _file_writers;
// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
@@ -196,6 +199,10 @@ public:
Status close();
+ io::FileWriter* get_file_writer(int32_t segment_id) { // non-owning lookup, forwarded to the flusher
+ return _segment_flusher.get_file_writer(segment_id); // nullptr when segment_id has no registered writer
+ }
+
private:
std::atomic<int32_t> _next_segment_id = 0;
SegmentFlusher _segment_flusher;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]