This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 519b48648e0 [fix](move-memtable) handle status when possible (#26526)
519b48648e0 is described below

commit 519b48648e0bd637267de0945268ed3766bb5732
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Nov 8 10:09:06 2023 +0800

    [fix](move-memtable) handle status when possible (#26526)
---
 be/src/olap/delta_writer_v2.cpp                | 10 +++++-----
 be/src/olap/delta_writer_v2.h                  |  5 ++---
 be/src/olap/rowset/beta_rowset_writer_v2.cpp   |  2 +-
 be/src/vec/sink/delta_writer_v2_pool.cpp       |  4 ++--
 be/src/vec/sink/delta_writer_v2_pool.h         |  3 ++-
 be/src/vec/sink/vtablet_sink_v2.cpp            | 10 ++++------
 be/test/vec/exec/delta_writer_v2_pool_test.cpp | 18 +++---------------
 7 files changed, 19 insertions(+), 33 deletions(-)

diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 8e13f8f3050..52e22d84b69 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -64,11 +64,11 @@
 namespace doris {
 using namespace ErrorCode;
 
-Status DeltaWriterV2::open(WriteRequest* req,
-                           const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams,
-                           DeltaWriterV2** writer) {
-    *writer = new DeltaWriterV2(req, streams, StorageEngine::instance());
-    return Status::OK();
+std::unique_ptr<DeltaWriterV2> DeltaWriterV2::open(
+        WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams) {
+    std::unique_ptr<DeltaWriterV2> writer(
+            new DeltaWriterV2(req, streams, StorageEngine::instance()));
+    return writer;
 }
 
 DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 61ad63d0a9b..8a102c5706d 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -63,9 +63,8 @@ class Block;
 // This class is NOT thread-safe, external synchronization is required.
 class DeltaWriterV2 {
 public:
-    static Status open(WriteRequest* req,
-                       const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams,
-                       DeltaWriterV2** writer);
+    static std::unique_ptr<DeltaWriterV2> open(
+            WriteRequest* req, const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams);
 
     ~DeltaWriterV2();
 
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index e16ce5fa9fa..d201ba4044e 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -72,7 +72,7 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context
     _context = rowset_writer_context;
     _context.segment_collector = 
std::make_shared<SegmentCollectorT<BetaRowsetWriterV2>>(this);
     _context.file_writer_creator = 
std::make_shared<FileWriterCreatorT<BetaRowsetWriterV2>>(this);
-    static_cast<void>(_segment_creator.init(_context));
+    RETURN_IF_ERROR(_segment_creator.init(_context));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp
index e714e6b1bbd..b9057136d97 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -29,8 +29,8 @@ DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_c
 
 DeltaWriterV2Map::~DeltaWriterV2Map() = default;
 
-DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id,
-                                               std::function<DeltaWriterV2*()> 
creator) {
+DeltaWriterV2* DeltaWriterV2Map::get_or_create(
+        int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> 
creator) {
     _map.lazy_emplace(tablet_id, [&](const 
TabletToDeltaWriterV2Map::constructor& ctor) {
         ctor(tablet_id, creator());
     });
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h
index 414d34da671..8439062440a 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -67,7 +67,8 @@ public:
     void grab() { ++_use_cnt; }
 
     // get or create delta writer for the given tablet, memory is managed by 
DeltaWriterV2Map
-    DeltaWriterV2* get_or_create(int64_t tablet_id, 
std::function<DeltaWriterV2*()> creator);
+    DeltaWriterV2* get_or_create(int64_t tablet_id,
+                                 
std::function<std::unique_ptr<DeltaWriterV2>()> creator);
 
     // close all delta writers in this DeltaWriterV2Map if there is no other 
users
     Status close(RuntimeProfile* profile);
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index eba1d08acc0..696a66ac3fb 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -351,9 +351,7 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
                 break;
             }
         }
-        DeltaWriterV2* delta_writer = nullptr;
-        static_cast<void>(DeltaWriterV2::open(&req, streams, &delta_writer));
-        return delta_writer;
+        return DeltaWriterV2::open(&req, streams);
     });
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
@@ -396,7 +394,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
         {
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
-            static_cast<void>(_delta_writer_for_tablet->close(_profile));
+            RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
             _delta_writer_for_tablet.reset();
         }
 
@@ -444,11 +442,11 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
         LOG(INFO) << "finished to close olap table sink. load_id=" << 
print_id(_load_id)
                   << ", txn_id=" << _txn_id;
     } else {
-        static_cast<void>(_cancel(status));
+        RETURN_IF_ERROR(_cancel(status));
     }
 
     _close_status = status;
-    static_cast<void>(DataSink::close(state, exec_status));
+    RETURN_IF_ERROR(DataSink::close(state, exec_status));
     return status;
 }
 
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp 
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index e68555ed68e..30b56b65d11 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -56,21 +56,9 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
     auto map = pool.get_or_create(load_id);
     EXPECT_EQ(1, pool.size());
     WriteRequest req;
-    auto writer = map->get_or_create(100, [&req]() {
-        DeltaWriterV2* writer;
-        static_cast<void>(DeltaWriterV2::open(&req, {}, &writer));
-        return writer;
-    });
-    auto writer2 = map->get_or_create(101, [&req]() {
-        DeltaWriterV2* writer;
-        static_cast<void>(DeltaWriterV2::open(&req, {}, &writer));
-        return writer;
-    });
-    auto writer3 = map->get_or_create(100, [&req]() {
-        DeltaWriterV2* writer;
-        static_cast<void>(DeltaWriterV2::open(&req, {}, &writer));
-        return writer;
-    });
+    auto writer = map->get_or_create(100, [&req]() { return 
DeltaWriterV2::open(&req, {}); });
+    auto writer2 = map->get_or_create(101, [&req]() { return 
DeltaWriterV2::open(&req, {}); });
+    auto writer3 = map->get_or_create(100, [&req]() { return 
DeltaWriterV2::open(&req, {}); });
     EXPECT_EQ(2, map->size());
     EXPECT_EQ(writer, writer3);
     EXPECT_NE(writer, writer2);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to