This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4995ca8fba6 [fix](move-memtable) ensure segment is flushed before add segment (#26522)
4995ca8fba6 is described below
commit 4995ca8fba6a768ab5eb718a68e324d94c82c497
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 7 22:42:16 2023 +0800
[fix](move-memtable) ensure segment is flushed before add segment (#26522)
---
be/src/olap/rowset/beta_rowset_writer.cpp | 2 +-
be/src/olap/rowset/beta_rowset_writer.h | 2 +-
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 2 +-
be/src/olap/rowset/beta_rowset_writer_v2.h | 2 +-
be/src/olap/rowset/rowset_writer.h | 4 ++--
be/src/runtime/load_stream.cpp | 11 +++++++++--
be/src/runtime/load_stream_writer.cpp | 16 ++++++++++++----
be/src/runtime/load_stream_writer.h | 2 +-
be/src/vec/sink/load_stream_stub.cpp | 2 +-
be/src/vec/sink/load_stream_stub.h | 2 +-
10 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5ef14743492..080086a0a93 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -691,7 +691,7 @@ Status BetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}
-Status BetaRowsetWriter::add_segment(uint32_t segment_id, SegmentStatistics&
segstat) {
+Status BetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat) {
uint32_t segid_offset = segment_id - _segment_start_id;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 9c615048a7f..8918e30f3e2 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -78,7 +78,7 @@ public:
Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer)
override;
- Status add_segment(uint32_t segment_id, SegmentStatistics& segstat)
override;
+ Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat)
override;
Status flush() override;
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index abd7af2aaba..e16ce5fa9fa 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -88,7 +88,7 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t
segment_id, io::FileWrite
return Status::OK();
}
-Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics&
segstat) {
+Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat) {
for (const auto& stream : _streams) {
RETURN_IF_ERROR(stream->add_segment(_context.partition_id,
_context.index_id,
_context.tablet_id, segment_id,
segstat));
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index d35f4b2486f..1a6db6df409 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -120,7 +120,7 @@ public:
return Status::OK();
}
- Status add_segment(uint32_t segment_id, SegmentStatistics& segstat)
override;
+ Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat)
override;
int32_t allocate_segment_id() override { return
_next_segment_id.fetch_add(1); };
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index 7958cc2ce07..8ba5666bf56 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -48,7 +48,7 @@ struct SegmentStatistics {
index_size(pb.index_size()),
key_bounds(pb.key_bounds()) {}
- void to_pb(SegmentStatisticsPB* segstat_pb) {
+ void to_pb(SegmentStatisticsPB* segstat_pb) const {
segstat_pb->set_row_num(row_num);
segstat_pb->set_data_size(data_size);
segstat_pb->set_index_size(index_size);
@@ -114,7 +114,7 @@ public:
"RowsetWriter not support flush_single_block");
}
- virtual Status add_segment(uint32_t segment_id, SegmentStatistics&
segstat) {
+ virtual Status add_segment(uint32_t segment_id, const SegmentStatistics&
segstat) {
return Status::NotSupported("RowsetWriter does not support
add_segment");
}
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 9d1722c1726..b9cc5891b48 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -129,7 +129,7 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
LOG(INFO) << "write data failed " << *this;
}
};
- return _flush_tokens[segid %
_flush_tokens.size()]->submit_func(flush_func);
+ return _flush_tokens[new_segid %
_flush_tokens.size()]->submit_func(flush_func);
}
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf*
data) {
@@ -151,7 +151,14 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
- return _load_stream_writer->add_segment(new_segid, stat);
+ auto add_segment_func = [this, new_segid, stat]() {
+ auto st = _load_stream_writer->add_segment(new_segid, stat);
+ if (!st.ok() && _failed_st->ok()) {
+ _failed_st = std::make_shared<Status>(st);
+ LOG(INFO) << "add segment failed " << *this;
+ }
+ };
+ return _flush_tokens[new_segid %
_flush_tokens.size()]->submit_func(add_segment_func);
}
Status TabletStream::close() {
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index e44a682b25b..504e02fb3b1 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -121,7 +121,15 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
return Status::OK();
}
-Status LoadStreamWriter::add_segment(uint32_t segid, SegmentStatistics& stat) {
+Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics&
stat) {
+ if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) {
+ LOG(WARNING) << _segment_file_writers[segid]->path() << " is
incomplete, actual size: "
+ << _segment_file_writers[segid]->bytes_appended()
+ << ", expected size: " << stat.data_size;
+ return Status::Corruption("segment {} is incomplete, actual size: {},
expected size: {}",
+
_segment_file_writers[segid]->path().native(),
+
_segment_file_writers[segid]->bytes_appended(), stat.data_size);
+ }
return _rowset_writer->add_segment(segid, stat);
}
@@ -143,9 +151,9 @@ Status LoadStreamWriter::close() {
return Status::Error<ErrorCode::INTERNAL_ERROR>("flush segment
failed");
}
- for (size_t i = 0; i < _segment_file_writers.size(); i++) {
- if (!_segment_file_writers[i]->is_closed()) {
- return Status::Corruption("segment {} is not eos", i);
+ for (const auto& writer : _segment_file_writers) {
+ if (!writer->is_closed()) {
+ return Status::Corruption("segment {} is not closed",
writer->path().native());
}
}
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index e4a3341cea3..dc952d2c18d 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -71,7 +71,7 @@ public:
Status close_segment(uint32_t segid);
- Status add_segment(uint32_t segid, SegmentStatistics& stat);
+ Status add_segment(uint32_t segid, const SegmentStatistics& stat);
// wait for all memtables to be flushed.
Status close();
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index a953747f27b..fe9887b3a3e 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -174,7 +174,7 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
// ADD_SEGMENT
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
- int64_t segment_id, SegmentStatistics&
segment_stat) {
+ int64_t segment_id, const
SegmentStatistics& segment_stat) {
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 60a48427618..2db9ffcd950 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -161,7 +161,7 @@ public:
// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
- int64_t segment_id, SegmentStatistics& segment_stat);
+ int64_t segment_id, const SegmentStatistics&
segment_stat);
// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]