This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 519b48648e0 [fix](move-memtable) handle status when possible (#26526)
519b48648e0 is described below
commit 519b48648e0bd637267de0945268ed3766bb5732
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Nov 8 10:09:06 2023 +0800
[fix](move-memtable) handle status when possible (#26526)
---
be/src/olap/delta_writer_v2.cpp | 10 +++++-----
be/src/olap/delta_writer_v2.h | 5 ++---
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 2 +-
be/src/vec/sink/delta_writer_v2_pool.cpp | 4 ++--
be/src/vec/sink/delta_writer_v2_pool.h | 3 ++-
be/src/vec/sink/vtablet_sink_v2.cpp | 10 ++++------
be/test/vec/exec/delta_writer_v2_pool_test.cpp | 18 +++---------------
7 files changed, 19 insertions(+), 33 deletions(-)
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 8e13f8f3050..52e22d84b69 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -64,11 +64,11 @@
namespace doris {
using namespace ErrorCode;
-Status DeltaWriterV2::open(WriteRequest* req,
- const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
- DeltaWriterV2** writer) {
- *writer = new DeltaWriterV2(req, streams, StorageEngine::instance());
- return Status::OK();
+std::unique_ptr<DeltaWriterV2> DeltaWriterV2::open(
+ WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
+ std::unique_ptr<DeltaWriterV2> writer(
+ new DeltaWriterV2(req, streams, StorageEngine::instance()));
+ return writer;
}
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 61ad63d0a9b..8a102c5706d 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -63,9 +63,8 @@ class Block;
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriterV2 {
public:
- static Status open(WriteRequest* req,
- const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
- DeltaWriterV2** writer);
+ static std::unique_ptr<DeltaWriterV2> open(
+ WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams);
~DeltaWriterV2();
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index e16ce5fa9fa..d201ba4044e 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -72,7 +72,7 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context
_context = rowset_writer_context;
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriterV2>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriterV2>>(this);
- static_cast<void>(_segment_creator.init(_context));
+ RETURN_IF_ERROR(_segment_creator.init(_context));
return Status::OK();
}
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp
index e714e6b1bbd..b9057136d97 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -29,8 +29,8 @@ DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_c
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
-DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id,
- std::function<DeltaWriterV2*()> creator) {
+DeltaWriterV2* DeltaWriterV2Map::get_or_create(
+ int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) {
_map.lazy_emplace(tablet_id, [&](const TabletToDeltaWriterV2Map::constructor& ctor) {
ctor(tablet_id, creator());
});
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h
index 414d34da671..8439062440a 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -67,7 +67,8 @@ public:
void grab() { ++_use_cnt; }
// get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map
- DeltaWriterV2* get_or_create(int64_t tablet_id, std::function<DeltaWriterV2*()> creator);
+ DeltaWriterV2* get_or_create(int64_t tablet_id,
+ std::function<std::unique_ptr<DeltaWriterV2>()> creator);
// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close(RuntimeProfile* profile);
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index eba1d08acc0..696a66ac3fb 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -351,9 +351,7 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
break;
}
}
- DeltaWriterV2* delta_writer = nullptr;
- static_cast<void>(DeltaWriterV2::open(&req, streams, &delta_writer));
- return delta_writer;
+ return DeltaWriterV2::open(&req, streams);
});
{
SCOPED_TIMER(_wait_mem_limit_timer);
@@ -396,7 +394,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
- static_cast<void>(_delta_writer_for_tablet->close(_profile));
+ RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
_delta_writer_for_tablet.reset();
}
@@ -444,11 +442,11 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id;
} else {
- static_cast<void>(_cancel(status));
+ RETURN_IF_ERROR(_cancel(status));
}
_close_status = status;
- static_cast<void>(DataSink::close(state, exec_status));
+ RETURN_IF_ERROR(DataSink::close(state, exec_status));
return status;
}
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index e68555ed68e..30b56b65d11 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -56,21 +56,9 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
- auto writer = map->get_or_create(100, [&req]() {
- DeltaWriterV2* writer;
- static_cast<void>(DeltaWriterV2::open(&req, {}, &writer));
- return writer;
- });
- auto writer2 = map->get_or_create(101, [&req]() {
- DeltaWriterV2* writer;
- static_cast<void>(DeltaWriterV2::open(&req, {}, &writer));
- return writer;
- });
- auto writer3 = map->get_or_create(100, [&req]() {
- DeltaWriterV2* writer;
- static_cast<void>(DeltaWriterV2::open(&req, {}, &writer));
- return writer;
- });
+ auto writer = map->get_or_create(100, [&req]() { return DeltaWriterV2::open(&req, {}); });
+ auto writer2 = map->get_or_create(101, [&req]() { return DeltaWriterV2::open(&req, {}); });
+ auto writer3 = map->get_or_create(100, [&req]() { return DeltaWriterV2::open(&req, {}); });
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]