This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e68c2b9b9 fix(duplication): fix the dangling pointer after `blob` 
object is moved (#2088)
e68c2b9b9 is described below

commit e68c2b9b95082d265f99f2dd2ff04148513d847e
Author: Dan Wang <[email protected]>
AuthorDate: Thu Aug 15 11:02:23 2024 +0800

    fix(duplication): fix the dangling pointer after `blob` object is moved (#2088)
    
    https://github.com/apache/incubator-pegasus/issues/2089
    
    Refactor the `blob` class to avoid the dangling pointer that led to the
    heap-use-after-free error reported in #2089 after a `blob` object is moved.
    Also refactor existing tests and add new ones for `blob` and `mutation_batch`.
---
 .clang-tidy                                        |   2 +-
 build_tools/clang_tidy.py                          |   2 +-
 src/replica/duplication/mutation_batch.cpp         |  18 ++-
 .../duplication/test/duplication_test_base.h       |   7 +-
 .../test/load_from_private_log_test.cpp            |  13 +-
 .../duplication/test/mutation_batch_test.cpp       | 160 +++++++++++++--------
 src/replica/mutation_log_replay.cpp                |  70 +++++----
 src/replica/test/log_block_test.cpp                |   6 +-
 src/replica/test/log_file_test.cpp                 |   3 +-
 src/replica/test/mutation_log_test.cpp             |  10 +-
 src/replica/test/replica_test_base.h               |   8 +-
 src/runtime/rpc/network.sim.cpp                    |   2 +-
 src/utils/binary_reader.cpp                        |   4 +-
 src/utils/binary_reader.h                          |   8 +-
 src/utils/blob.h                                   | 120 +++++++++++-----
 src/utils/test/blob_test.cpp                       | 126 ++++++++++++++++
 16 files changed, 397 insertions(+), 162 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index a2a2c95c1..2e072d7d8 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -18,7 +18,7 @@
 # 
https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html
 
 CheckOptions: []
-Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*'
+Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-
 [...]
 ExtraArgs:
 ExtraArgsBefore: []
 FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index 407149765..d4c8964a0 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -60,7 +60,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                    "clang-tidy",
                    "-p0",
                    "-path", BUILD_PATH,
-                   
"-checks=-cppcoreguidelines-pro-type-union-access,-modernize-use-trailing-return-type",
+                   
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-hicpp-no-array-decay,-hicpp-named-parameter,-read
 [...]
                    "-extra-arg=-language=c++",
                    "-extra-arg=-std=c++17",
                    "-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/replica/duplication/mutation_batch.cpp 
b/src/replica/duplication/mutation_batch.cpp
index 9a6493cb9..efcd1eeac 100644
--- a/src/replica/duplication/mutation_batch.cpp
+++ b/src/replica/duplication/mutation_batch.cpp
@@ -17,23 +17,24 @@
 
 #include <functional>
 #include <string>
+#include <string_view>
 #include <tuple>
 #include <utility>
 #include <vector>
 
-#include <string_view>
 #include "common/replication.codes.h"
 #include "consensus_types.h"
 #include "metadata_types.h"
 #include "mutation_batch.h"
-#include "replica_duplicator.h"
 #include "replica/replica.h"
+#include "replica_duplicator.h"
 #include "runtime/task/task_code.h"
 #include "runtime/task/task_spec.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/error_code.h"
 #include "utils/fmt_logging.h"
+#include "utils/ports.h"
 
 METRIC_DEFINE_gauge_int64(replica,
                           dup_recent_lost_mutations,
@@ -196,10 +197,19 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr 
&mu, decree start_decree
         }
 
         blob bb;
-        if (update.data.buffer() != nullptr) {
+        if (update.data.buffer()) {
+            // ATTENTION: instead of copy, move could optimize the 
performance. However, this
+            // would nullify the elements of mu->data.updates.
             bb = std::move(update.data);
         } else {
-            bb = blob::create_from_bytes(update.data.data(), 
update.data.length());
+            // TODO(wangdan): if update.data.buffer() is nullptr, the blob 
object must have
+            // been used as `string_view`.
+            //
+            // Once `string_view` function is removed from blob, consider 
dropping following
+            // statements.
+            if (dsn_likely(update.data.data() != nullptr && 
!update.data.empty())) {
+                bb = blob::create_from_bytes(update.data.data(), 
update.data.length());
+            }
         }
 
         _total_bytes += bb.length();
diff --git a/src/replica/duplication/test/duplication_test_base.h 
b/src/replica/duplication/test/duplication_test_base.h
index d4dacf9b9..49101341e 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -75,16 +75,15 @@ public:
         return log_file_map;
     }
 
-    mutation_ptr create_test_mutation(int64_t decree,
-                                      int64_t last_committed_decree,
-                                      const std::string &data) override
+    mutation_ptr
+    create_test_mutation(int64_t decree, int64_t last_committed_decree, const 
char *data) override
     {
         auto mut = replica_test_base::create_test_mutation(decree, 
last_committed_decree, data);
         mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must 
be idempotent write
         return mut;
     }
 
-    mutation_ptr create_test_mutation(int64_t decree, const std::string &data) 
override
+    mutation_ptr create_test_mutation(int64_t decree, const char *data) 
override
     {
         return duplication_test_base::create_test_mutation(decree, decree - 1, 
data);
     }
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp 
b/src/replica/duplication/test/load_from_private_log_test.cpp
index 0d7141745..f2eaf253b 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -92,8 +92,7 @@ public:
             // each round mlog will replay the former logs, and create new file
             mutation_log_ptr mlog = create_private_log();
             for (int i = 1; i <= 10; i++) {
-                std::string msg = "hello!";
-                mutation_ptr mu = create_test_mutation(10 * f + i, msg);
+                auto mu = create_test_mutation(10 * f + i, "hello!");
                 mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 
0);
             }
             mlog->tracker()->wait_outstanding_tasks();
@@ -149,9 +148,8 @@ public:
             auto reserved_plog_force_flush = FLAGS_plog_force_flush;
             FLAGS_plog_force_flush = true;
             for (int i = decree_start; i <= num_entries + decree_start; i++) {
-                std::string msg = "hello!";
                 //  decree - last_commit_decree  = 1 by default
-                mutation_ptr mu = create_test_mutation(i, msg);
+                auto mu = create_test_mutation(i, "hello!");
                 // mock the last_commit_decree of first mu equal with 
`last_commit_decree_start`
                 if (i == decree_start) {
                     mu->data.header.last_committed_decree = 
last_commit_decree_start;
@@ -160,7 +158,7 @@ public:
             }
 
             // commit the last entry
-            mutation_ptr mu = create_test_mutation(decree_start + num_entries 
+ 1, "hello!");
+            auto mu = create_test_mutation(decree_start + num_entries + 1, 
"hello!");
             mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
             FLAGS_plog_force_flush = reserved_plog_force_flush;
 
@@ -362,13 +360,12 @@ TEST_P(load_from_private_log_test, ignore_useless)
 
     int num_entries = 100;
     for (int i = 1; i <= num_entries; i++) {
-        std::string msg = "hello!";
-        mutation_ptr mu = create_test_mutation(i, msg);
+        auto mu = create_test_mutation(i, "hello!");
         mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
     }
 
     // commit the last entry
-    mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!");
+    auto mu = create_test_mutation(1 + num_entries, "hello!");
     mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
     mlog->close();
 
diff --git a/src/replica/duplication/test/mutation_batch_test.cpp 
b/src/replica/duplication/test/mutation_batch_test.cpp
index dd5c27738..c865de20b 100644
--- a/src/replica/duplication/test/mutation_batch_test.cpp
+++ b/src/replica/duplication/test/mutation_batch_test.cpp
@@ -31,138 +31,178 @@
 #include "gtest/gtest.h"
 #include "replica/duplication/mutation_batch.h"
 #include "replica/duplication/mutation_duplicator.h"
+#include "replica/duplication/replica_duplicator.h"
 #include "replica/mutation.h"
 #include "replica/prepare_list.h"
 #include "runtime/task/task_code.h"
 #include "utils/autoref_ptr.h"
+#include "utils/blob.h"
 
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
 
 class mutation_batch_test : public duplication_test_base
 {
 public:
-    void reset_buffer(const decree last_commit,
-                      const decree start,
-                      const decree end,
-                      mutation_batch &batcher)
+    mutation_batch_test() : _duplicator(create_test_duplicator(0)), 
_batcher(_duplicator.get()) {}
+
+    void reset_buffer(const decree last_commit, const decree start, const 
decree end)
     {
-        batcher._mutation_buffer->reset(last_commit);
-        batcher._mutation_buffer->_start_decree = start;
-        batcher._mutation_buffer->_end_decree = end;
+        _batcher._mutation_buffer->reset(last_commit);
+        _batcher._mutation_buffer->_start_decree = start;
+        _batcher._mutation_buffer->_end_decree = end;
     }
 
-    void commit_buffer(const decree current_decree, mutation_batch &batcher)
+    void commit_buffer(const decree current_decree)
     {
-        batcher._mutation_buffer->commit(current_decree, 
COMMIT_TO_DECREE_HARD);
+        _batcher._mutation_buffer->commit(current_decree, 
COMMIT_TO_DECREE_HARD);
     }
 
-    void check_mutation_contents(const std::set<std::string> 
&expected_mutations,
-                                 mutation_batch &batcher)
+    void check_mutation_contents(const std::vector<std::string> 
&expected_mutations)
     {
-        const auto all_mutations = batcher.move_all_mutations();
+        const auto all_mutations = _batcher.move_all_mutations();
 
-        std::set<std::string> actual_mutations;
+        std::vector<std::string> actual_mutations;
         std::transform(all_mutations.begin(),
                        all_mutations.end(),
-                       std::inserter(actual_mutations, actual_mutations.end()),
+                       std::back_inserter(actual_mutations),
                        [](const mutation_tuple &tuple) { return 
std::get<2>(tuple).to_string(); });
 
         ASSERT_EQ(expected_mutations, actual_mutations);
     }
+
+    std::unique_ptr<replica_duplicator> _duplicator;
+    mutation_batch _batcher;
 };
 
 INSTANTIATE_TEST_SUITE_P(, mutation_batch_test, ::testing::Values(false, 
true));
 
 TEST_P(mutation_batch_test, prepare_mutation)
 {
-    auto duplicator = create_test_duplicator(0);
-    mutation_batch batcher(duplicator.get());
-
     auto mu1 = create_test_mutation(1, 0, "first mutation");
     set_last_applied_decree(1);
-    ASSERT_TRUE(batcher.add(mu1));
-    ASSERT_EQ(1, batcher.last_decree());
+    ASSERT_TRUE(_batcher.add(mu1));
+    ASSERT_EQ(1, _batcher.last_decree());
 
     auto mu2 = create_test_mutation(2, 1, "abcde");
     set_last_applied_decree(2);
-    ASSERT_TRUE(batcher.add(mu2));
-    ASSERT_EQ(2, batcher.last_decree());
+    ASSERT_TRUE(_batcher.add(mu2));
+    ASSERT_EQ(2, _batcher.last_decree());
 
     auto mu3 = create_test_mutation(3, 2, "hello world");
-    ASSERT_TRUE(batcher.add(mu3));
+    ASSERT_TRUE(_batcher.add(mu3));
 
     // The last decree has not been updated.
-    ASSERT_EQ(2, batcher.last_decree());
+    ASSERT_EQ(2, _batcher.last_decree());
 
     auto mu4 = create_test_mutation(4, 2, "foo bar");
-    ASSERT_TRUE(batcher.add(mu4));
-    ASSERT_EQ(2, batcher.last_decree());
+    ASSERT_TRUE(_batcher.add(mu4));
+    ASSERT_EQ(2, _batcher.last_decree());
 
     // The committed mutation would be ignored.
     auto mu2_another = create_test_mutation(2, 1, "another second mutation");
-    ASSERT_TRUE(batcher.add(mu2_another));
-    ASSERT_EQ(2, batcher.last_decree());
+    ASSERT_TRUE(_batcher.add(mu2_another));
+    ASSERT_EQ(2, _batcher.last_decree());
 
     // The mutation with duplicate decree would be ignored.
     auto mu3_another = create_test_mutation(3, 2, "123 xyz");
-    ASSERT_TRUE(batcher.add(mu3_another));
-    ASSERT_EQ(2, batcher.last_decree());
+    ASSERT_TRUE(_batcher.add(mu3_another));
+    ASSERT_EQ(2, _batcher.last_decree());
 
     auto mu5 = create_test_mutation(5, 2, "5th mutation");
     set_last_applied_decree(5);
-    ASSERT_TRUE(batcher.add(mu5));
-    ASSERT_EQ(5, batcher.last_decree());
+    ASSERT_TRUE(_batcher.add(mu5));
+    ASSERT_EQ(5, _batcher.last_decree());
+
+    check_mutation_contents({"first mutation", "abcde", "hello world", "foo 
bar", "5th mutation"});
+}
+
+TEST_P(mutation_batch_test, add_null_mutation)
+{
+    auto mu = create_test_mutation(1, nullptr);
+    _batcher.add_mutation_if_valid(mu, 0);
+
+    check_mutation_contents({""});
+}
+
+TEST_P(mutation_batch_test, add_empty_mutation)
+{
+    auto mu = create_test_mutation(1, "");
+    _batcher.add_mutation_if_valid(mu, 0);
+
+    check_mutation_contents({""});
+}
+
+// TODO(wangdan): once `string_view` function is removed from blob, drop this 
test.
+TEST_P(mutation_batch_test, add_string_view_mutation)
+{
+    auto mu = create_test_mutation(1, nullptr);
+    const std::string data("hello");
+    mu->data.updates.back().data = blob(data.data(), 0, data.size());
+    _batcher.add_mutation_if_valid(mu, 0);
 
-    check_mutation_contents({"first mutation", "abcde", "hello world", "foo 
bar", "5th mutation"},
-                            batcher);
+    check_mutation_contents({"hello"});
 }
 
-TEST_P(mutation_batch_test, add_mutation_if_valid)
+TEST_P(mutation_batch_test, add_a_valid_mutation)
 {
-    auto duplicator = create_test_duplicator(0);
-    mutation_batch batcher(duplicator.get());
+    auto mu = create_test_mutation(1, "hello");
+    _batcher.add_mutation_if_valid(mu, 0);
+
+    check_mutation_contents({"hello"});
+}
 
+TEST_P(mutation_batch_test, add_multiple_valid_mutations)
+{
+    // The mutation could not be reused, since in 
mutation_batch::add_mutation_if_valid
+    // the elements of mutation::data::updates would be moved and nullified.
     auto mu1 = create_test_mutation(1, "hello");
-    batcher.add_mutation_if_valid(mu1, 0);
-    check_mutation_contents({"hello"}, batcher);
+    _batcher.add_mutation_if_valid(mu1, 0);
 
     auto mu2 = create_test_mutation(2, "world");
-    batcher.add_mutation_if_valid(mu2, 0);
-    check_mutation_contents({"world"}, batcher);
+    _batcher.add_mutation_if_valid(mu2, 2);
+
+    auto mu3 = create_test_mutation(3, "hi");
+    _batcher.add_mutation_if_valid(mu3, 2);
+
+    check_mutation_contents({"hello", "world", "hi"});
+}
+
+TEST_P(mutation_batch_test, add_invalid_mutation)
+{
+    auto mu2 = create_test_mutation(2, "world");
+    _batcher.add_mutation_if_valid(mu2, 2);
 
     // mu1 would be ignored, since its decree is less than the start decree.
-    batcher.add_mutation_if_valid(mu1, 2);
-    batcher.add_mutation_if_valid(mu2, 2);
+    auto mu1 = create_test_mutation(1, "hello");
+    _batcher.add_mutation_if_valid(mu1, 2);
+
+    auto mu3 = create_test_mutation(3, "hi");
+    _batcher.add_mutation_if_valid(mu3, 2);
 
-    auto mu3 = create_test_mutation(1, "hi");
-    batcher.add_mutation_if_valid(mu3, 1);
-    check_mutation_contents({"hi", "world"}, batcher);
+    auto mu4 = create_test_mutation(1, "ok");
+    _batcher.add_mutation_if_valid(mu4, 1);
+
+    // "ok" would be the first, since its timestamp (i.e. decree in 
create_test_mutation)
+    // is the smallest.
+    check_mutation_contents({"ok", "world", "hi"});
 }
 
 TEST_P(mutation_batch_test, ignore_non_idempotent_write)
 {
-    auto duplicator = create_test_duplicator(0);
-    mutation_batch batcher(duplicator.get());
-
     auto mu = create_test_mutation(1, "hello");
     mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
-    batcher.add_mutation_if_valid(mu, 0);
-    check_mutation_contents({}, batcher);
+    _batcher.add_mutation_if_valid(mu, 0);
+    check_mutation_contents({});
 }
 
 TEST_P(mutation_batch_test, mutation_buffer_commit)
 {
-    auto duplicator = create_test_duplicator(0);
-    mutation_batch batcher(duplicator.get());
-
     // Mock mutation_buffer[last=10, start=15, end=20], last + 1(next commit 
decree) is out of
     // [start~end], then last would become min_decree() - 1, see 
mutation_buffer::commit() for
     // details.
-    reset_buffer(10, 15, 20, batcher);
-    commit_buffer(15, batcher);
-    ASSERT_EQ(14, batcher.last_decree());
+    reset_buffer(10, 15, 20);
+    commit_buffer(15);
+    ASSERT_EQ(14, _batcher.last_decree());
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/replica/mutation_log_replay.cpp 
b/src/replica/mutation_log_replay.cpp
index ee891bcb7..0e7956c8d 100644
--- a/src/replica/mutation_log_replay.cpp
+++ b/src/replica/mutation_log_replay.cpp
@@ -22,6 +22,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <string_view>
 #include <utility>
 #include <vector>
 
@@ -38,10 +39,9 @@
 #include "utils/errors.h"
 #include "utils/fail_point.h"
 #include "utils/fmt_logging.h"
-#include <string_view>
+#include "utils/ports.h"
 
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
 
 /*static*/ error_code mutation_log::replay(log_file_ptr log,
                                            replay_callback callback,
@@ -72,40 +72,55 @@ namespace replication {
     return err.code();
 }
 
-/*static*/ error_s mutation_log::replay_block(log_file_ptr &log,
-                                              replay_callback &callback,
-                                              size_t start_offset,
-                                              int64_t &end_offset)
-{
-    FAIL_POINT_INJECT_F("mutation_log_replay_block", [](std::string_view) -> 
error_s {
-        return error_s::make(ERR_INCOMPLETE_DATA, "mutation_log_replay_block");
-    });
-
-    blob bb;
-    std::unique_ptr<binary_reader> reader;
+namespace {
 
-    log->reset_stream(start_offset); // start reading from given offset
-    int64_t global_start_offset = start_offset + log->start_offset();
-    end_offset = global_start_offset; // reset end_offset to the start.
-
-    // reads the entire block into memory
-    error_code err = log->read_next_log_block(bb);
-    if (err != ERR_OK) {
-        return error_s::make(err, "failed to read log block");
+dsn::error_s read_block(dsn::replication::log_file_ptr &log,
+                        size_t start_offset,
+                        int64_t &end_offset,
+                        std::unique_ptr<dsn::binary_reader> &reader)
+{
+    log->reset_stream(start_offset); // Start reading from given offset.
+    int64_t global_start_offset = static_cast<int64_t>(start_offset) + 
log->start_offset();
+    end_offset = global_start_offset; // Reset end_offset to the start.
+
+    {
+        // Read the entire block into memory.
+        blob bb;
+        const auto err = log->read_next_log_block(bb);
+        if (dsn_unlikely(err != dsn::ERR_OK)) {
+            return FMT_ERR(err, "failed to read log block");
+        }
+        reader = std::make_unique<dsn::binary_reader>(std::move(bb));
     }
 
-    reader = std::make_unique<binary_reader>(bb);
-    end_offset += sizeof(log_block_header);
+    end_offset += sizeof(dsn::replication::log_block_header);
 
     // The first block is log_file_header.
     if (global_start_offset == log->start_offset()) {
         end_offset += log->read_file_header(*reader);
         if (!log->is_right_header()) {
-            return error_s::make(ERR_INVALID_DATA, "failed to read log file 
header");
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "failed to read log file 
header");
         }
-        // continue to parsing the data block
+        // Continue to parsing the data block.
     }
 
+    return dsn::error_s::ok();
+}
+
+} // anonymous namespace
+
+/*static*/ error_s mutation_log::replay_block(log_file_ptr &log,
+                                              replay_callback &callback,
+                                              size_t start_offset,
+                                              int64_t &end_offset)
+{
+    FAIL_POINT_INJECT_F("mutation_log_replay_block", [](std::string_view) -> 
error_s {
+        return error_s::make(ERR_INCOMPLETE_DATA, "mutation_log_replay_block");
+    });
+
+    std::unique_ptr<binary_reader> reader;
+    RETURN_NOT_OK(read_block(log, start_offset, end_offset, reader));
+
     while (!reader->is_eof()) {
         auto old_size = reader->get_remaining_size();
         mutation_ptr mu = mutation::read_from(*reader, nullptr);
@@ -218,5 +233,4 @@ namespace replication {
     return err;
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/replica/test/log_block_test.cpp 
b/src/replica/test/log_block_test.cpp
index e667a59ce..7499e13c8 100644
--- a/src/replica/test/log_block_test.cpp
+++ b/src/replica/test/log_block_test.cpp
@@ -112,7 +112,8 @@ TEST_P(log_appender_test, log_block_full)
 {
     log_appender appender(10);
     for (int i = 0; i < 1024; i++) { // more than DEFAULT_MAX_BLOCK_BYTES
-        appender.append_mutation(create_test_mutation(1 + i, std::string(1024, 
'a')), nullptr);
+        appender.append_mutation(create_test_mutation(1 + i, std::string(1024, 
'a').c_str()),
+                                 nullptr);
     }
     ASSERT_EQ(appender.mutations().size(), 1024);
     // two log_block_header blobs
@@ -136,7 +137,8 @@ TEST_P(log_appender_test, read_log_block)
 {
     log_appender appender(10);
     for (int i = 0; i < 1024; i++) { // more than DEFAULT_MAX_BLOCK_BYTES
-        appender.append_mutation(create_test_mutation(1 + i, std::string(1024, 
'a')), nullptr);
+        appender.append_mutation(create_test_mutation(1 + i, std::string(1024, 
'a').c_str()),
+                                 nullptr);
     }
     ASSERT_EQ(appender.all_blocks().size(), 2);
 
diff --git a/src/replica/test/log_file_test.cpp 
b/src/replica/test/log_file_test.cpp
index 3ac902451..c4abb02b2 100644
--- a/src/replica/test/log_file_test.cpp
+++ b/src/replica/test/log_file_test.cpp
@@ -81,7 +81,8 @@ TEST_P(log_file_test, commit_log_blocks)
     size_t written_sz = appender->size();
     appender = std::make_shared<log_appender>(_start_offset + written_sz);
     for (int i = 0; i < 1024; i++) { // more than DEFAULT_MAX_BLOCK_BYTES
-        appender->append_mutation(create_test_mutation(1 + i, 
std::string(1024, 'a')), nullptr);
+        appender->append_mutation(create_test_mutation(1 + i, 
std::string(1024, 'a').c_str()),
+                                  nullptr);
     }
     ASSERT_GT(appender->all_blocks().size(), 1);
     tsk = _logf->commit_log_blocks(
diff --git a/src/replica/test/mutation_log_test.cpp 
b/src/replica/test/mutation_log_test.cpp
index 0009e800e..6d92e042b 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -291,9 +291,8 @@ public:
 
     void TearDown() override { utils::filesystem::remove_path(_log_dir); }
 
-    mutation_ptr create_test_mutation(int64_t decree,
-                                      int64_t last_committed_decree,
-                                      const std::string &data) override
+    mutation_ptr
+    create_test_mutation(int64_t decree, int64_t last_committed_decree, const 
char *data) override
     {
         mutation_ptr mu(new mutation());
         mu->data.header.ballot = 1;
@@ -315,7 +314,7 @@ public:
         return mu;
     }
 
-    mutation_ptr create_test_mutation(int64_t decree, const std::string &data) 
override
+    mutation_ptr create_test_mutation(int64_t decree, const char *data) 
override
     {
         return mutation_log_test::create_test_mutation(decree, decree - 1, 
data);
     }
@@ -333,8 +332,7 @@ public:
             // each round mlog will replay the former logs, and create new file
             mutation_log_ptr mlog = create_private_log();
             for (int i = 1; i <= 10; i++) {
-                std::string msg = "hello!";
-                mutation_ptr mu = create_test_mutation(10 * f + i, msg);
+                auto mu = create_test_mutation(10 * f + i, "hello!");
                 mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 
0);
             }
             mlog->tracker()->wait_outstanding_tasks();
diff --git a/src/replica/test/replica_test_base.h 
b/src/replica/test/replica_test_base.h
index 0fe479d8a..374e87e82 100644
--- a/src/replica/test/replica_test_base.h
+++ b/src/replica/test/replica_test_base.h
@@ -62,7 +62,7 @@ public:
     }
 
     virtual mutation_ptr
-    create_test_mutation(int64_t decree, int64_t last_committed_decree, const 
std::string &data)
+    create_test_mutation(int64_t decree, int64_t last_committed_decree, const 
char *data)
     {
         mutation_ptr mu(new mutation());
         mu->data.header.ballot = 1;
@@ -75,7 +75,9 @@ public:
         mu->data.updates.emplace_back(mutation_update());
         mu->data.updates.back().code =
             RPC_COLD_BACKUP; // whatever code it is, but never be WRITE_EMPTY
-        mu->data.updates.back().data = 
blob::create_from_bytes(std::string(data));
+        if (data != nullptr) {
+            mu->data.updates.back().data = blob::create_from_bytes(data);
+        }
         mu->client_requests.push_back(nullptr);
 
         // replica_duplicator always loads from hard disk,
@@ -85,7 +87,7 @@ public:
         return mu;
     }
 
-    virtual mutation_ptr create_test_mutation(int64_t decree, const 
std::string &data)
+    virtual mutation_ptr create_test_mutation(int64_t decree, const char *data)
     {
         return replica_test_base::create_test_mutation(decree, decree - 1, 
data);
     }
diff --git a/src/runtime/rpc/network.sim.cpp b/src/runtime/rpc/network.sim.cpp
index 595f86cb1..ecde4fb17 100644
--- a/src/runtime/rpc/network.sim.cpp
+++ b/src/runtime/rpc/network.sim.cpp
@@ -83,7 +83,7 @@ static message_ex *virtual_send_message(message_ex *msg)
         tmp += buf.length();
     }
 
-    blob bb(buffer, 0, msg->header->body_length + sizeof(message_header));
+    blob bb(buffer, msg->header->body_length + sizeof(message_header));
     message_ex *recv_msg = message_ex::create_receive_message(bb);
     recv_msg->to_address = msg->to_address;
     recv_msg->to_host_port = msg->to_host_port;
diff --git a/src/utils/binary_reader.cpp b/src/utils/binary_reader.cpp
index 227a98ed3..64e1e3a78 100644
--- a/src/utils/binary_reader.cpp
+++ b/src/utils/binary_reader.cpp
@@ -102,10 +102,10 @@ int binary_reader::inner_read(blob &blob, int len)
         blob = _blob.range(static_cast<int>(_ptr - _blob.data()), len);
 
         // optimization: zero-copy
-        if (!blob.buffer_ptr()) {
+        if (!blob.buffer()) {
             std::shared_ptr<char> 
buffer(::dsn::utils::make_shared_array<char>(len));
             memcpy(buffer.get(), blob.data(), blob.length());
-            blob = ::dsn::blob(buffer, 0, blob.length());
+            blob = ::dsn::blob(buffer, blob.length());
         }
 
         _ptr += len;
diff --git a/src/utils/binary_reader.h b/src/utils/binary_reader.h
index 65d0749fb..415da5593 100644
--- a/src/utils/binary_reader.h
+++ b/src/utils/binary_reader.h
@@ -39,13 +39,13 @@ class binary_reader
 {
 public:
     // given bb on ctor
-    binary_reader(const blob &blob);
-    binary_reader(blob &&blob);
+    explicit binary_reader(const blob &blob);
+    explicit binary_reader(blob &&blob);
 
     // or delayed init
-    binary_reader() {}
+    binary_reader() = default;
 
-    virtual ~binary_reader() {}
+    virtual ~binary_reader() = default;
 
     void init(const blob &bb);
     void init(blob &&bb);
diff --git a/src/utils/blob.h b/src/utils/blob.h
index 3cdbce320..62bc0a8a5 100644
--- a/src/utils/blob.h
+++ b/src/utils/blob.h
@@ -33,6 +33,7 @@
 #include <thrift/protocol/TBinaryProtocol.h>
 #include <thrift/protocol/TProtocol.h>
 
+#include "utils/fmt_logging.h"
 #include "utils/fmt_utils.h"
 #include "utils.h"
 
@@ -51,14 +52,6 @@ public:
     {
     }
 
-    blob(std::shared_ptr<char> buffer, int offset, unsigned int length)
-        : _holder(std::move(buffer)),
-          _buffer(_holder.get()),
-          _data(_holder.get() + offset),
-          _length(length)
-    {
-    }
-
     /// NOTE: Use std::string_view whenever possible.
     /// blob is designed for shared buffer, never use it as constant view.
     /// Maybe we could deprecate this function in the future.
@@ -67,21 +60,68 @@ public:
     {
     }
 
+    blob(const blob &rhs) noexcept = default;
+
+    blob &operator=(const blob &rhs) noexcept
+    {
+        if (this == &rhs) {
+            return *this;
+        }
+
+        _holder = rhs._holder;
+        _buffer = rhs._buffer;
+        _data = rhs._data;
+        _length = rhs._length;
+
+        return *this;
+    }
+
+    blob(blob &&rhs) noexcept
+        : _holder(std::move(rhs._holder)),
+          _buffer(rhs._buffer),
+          _data(rhs._data),
+          _length(rhs._length)
+    {
+        rhs._buffer = nullptr;
+        rhs._data = nullptr;
+        rhs._length = 0;
+    }
+
+    blob &operator=(blob &&rhs) noexcept
+    {
+        if (this == &rhs) {
+            return *this;
+        }
+
+        _holder = std::move(rhs._holder);
+        _buffer = rhs._buffer;
+        _data = rhs._data;
+        _length = rhs._length;
+
+        rhs._buffer = nullptr;
+        rhs._data = nullptr;
+        rhs._length = 0;
+
+        return *this;
+    }
+
     /// Create shared buffer from allocated raw bytes.
     /// NOTE: this operation is not efficient since it involves a memory copy.
-    static blob create_from_bytes(const char *s, size_t len)
+    [[nodiscard]] static blob create_from_bytes(const char *s, size_t len)
     {
+        CHECK_NOTNULL(s, "null source pointer would lead to undefined 
behaviour");
+
         std::shared_ptr<char> s_arr(new char[len], 
std::default_delete<char[]>());
         memcpy(s_arr.get(), s, len);
-        return blob(std::move(s_arr), 0, static_cast<unsigned int>(len));
+        return {std::move(s_arr), static_cast<unsigned int>(len)};
     }
 
     /// Create shared buffer without copying data.
-    static blob create_from_bytes(std::string &&bytes)
+    [[nodiscard]] static blob create_from_bytes(std::string &&bytes)
     {
-        auto s = new std::string(std::move(bytes));
+        auto *s = new std::string(std::move(bytes));
         std::shared_ptr<char> buf(const_cast<char *>(s->data()), [s](char *) { 
delete s; });
-        return blob(std::move(buf), 0, static_cast<unsigned int>(s->length()));
+        return {std::move(buf), static_cast<unsigned int>(s->length())};
     }
 
     void assign(const std::shared_ptr<char> &buffer, int offset, unsigned int 
length)
@@ -95,8 +135,8 @@ public:
     void assign(std::shared_ptr<char> &&buffer, int offset, unsigned int length)
     {
         _holder = std::move(buffer);
-        _buffer = (_holder.get());
-        _data = (_holder.get() + offset);
+        _buffer = _holder.get();
+        _data = _holder.get() + offset;
         _length = length;
     }
 
@@ -109,21 +149,22 @@ public:
         _length = length;
     }
 
-    const char *data() const noexcept { return _data; }
+    [[nodiscard]] const char *data() const noexcept { return _data; }
 
-    unsigned int length() const noexcept { return _length; }
-    unsigned int size() const noexcept { return _length; }
-    bool empty() const noexcept { return _length == 0; }
+    [[nodiscard]] unsigned int length() const noexcept { return _length; }
+    [[nodiscard]] unsigned int size() const noexcept { return _length; }
+    [[nodiscard]] bool empty() const noexcept { return _length == 0; }
 
-    std::shared_ptr<char> buffer() const { return _holder; }
+    [[nodiscard]] std::shared_ptr<char> buffer() const { return _holder; }
 
-    const char *buffer_ptr() const { return _holder.get(); }
+    [[nodiscard]] const char *buffer_ptr() const { return _holder.get(); }
 
-    // offset can be negative for buffer dereference
-    blob range(int offset) const
+    // `offset` can be negative for buffer dereference.
+    [[nodiscard]] blob range(int offset) const
     {
-        // offset cannot exceed the current length value
-        assert(offset <= static_cast<int>(_length));
+        DCHECK_LE_MSG(offset,
+                      static_cast<int>(_length),
+                      "the required offset cannot exceed the current length");
 
         blob temp = *this;
         temp._data += offset;
@@ -131,33 +172,38 @@ public:
         return temp;
     }
 
-    blob range(int offset, unsigned int len) const
+    [[nodiscard]] blob range(int offset, unsigned int len) const
     {
-        // offset cannot exceed the current length value
-        assert(offset <= static_cast<int>(_length));
+        DCHECK_LE_MSG(offset,
+                      static_cast<int>(_length),
+                      "the required offset cannot exceed the current length");
 
         blob temp = *this;
         temp._data += offset;
         temp._length -= offset;
 
-        // buffer length must exceed the required length
-        assert(temp._length >= len);
+        DCHECK_LE_MSG(
+            len, temp._length, "the required length cannot exceed remaining buffer length");
+
         temp._length = len;
         return temp;
     }
 
-    bool operator==(const blob &r) const
+    // Could NOT be declared with "= delete", since many thrift-generated classes would
+    // access this in their own `operator==`.
+    bool operator==(const blob &) const
     {
-        // not implemented
-        assert(false);
+        CHECK(false, "not implemented");
         return false;
     }
 
-    std::string to_string() const
+    [[nodiscard]] std::string to_string() const
     {
-        if (_length == 0)
+        if (_length == 0) {
             return {};
-        return std::string(_data, _length);
+        }
+
+        return {_data, _length};
     }
 
     friend std::ostream &operator<<(std::ostream &os, const blob &bb)
@@ -165,7 +211,7 @@ public:
         return os << bb.to_string();
     }
 
-    std::string_view to_string_view() const { return std::string_view(_data, _length); }
+    [[nodiscard]] std::string_view to_string_view() const { return {_data, _length}; }
 
     uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
     uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
diff --git a/src/utils/test/blob_test.cpp b/src/utils/test/blob_test.cpp
new file mode 100644
index 000000000..8ae2c8b7f
--- /dev/null
+++ b/src/utils/test/blob_test.cpp
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "utils/blob.h"
+
+namespace dsn {
+
+struct blob_base_case
+{
+    std::string expected_str;
+};
+
+class BlobBaseTest : public testing::TestWithParam<blob_base_case>
+{
+public:
+    void SetUp() override
+    {
+        const auto &test_case = GetParam();
+        _expected_str = test_case.expected_str;
+    }
+
+    void check_blob_value(const blob &obj) const
+    {
+        EXPECT_EQ(_expected_str, obj.to_string());
+
+        EXPECT_EQ(_expected_str.size(), obj.length());
+        EXPECT_EQ(_expected_str.size(), obj.size());
+
+        if (_expected_str.empty()) {
+            EXPECT_TRUE(obj.empty());
+        } else {
+            EXPECT_FALSE(obj.empty());
+        }
+    }
+
+protected:
+    std::string _expected_str;
+};
+
+const std::vector<blob_base_case> blob_base_tests = {
+    // Test empty case.
+    {""},
+    // Test non-empty case.
+    {"hello"},
+};
+
+class BlobCreateTest : public BlobBaseTest
+{
+};
+
+TEST_P(BlobCreateTest, CreateFromCString)
+{
+    const auto &obj = blob::create_from_bytes(_expected_str.data(), _expected_str.size());
+    check_blob_value(obj);
+}
+
+TEST_P(BlobCreateTest, CreateFromString)
+{
+    const auto &obj = blob::create_from_bytes(std::string(_expected_str));
+    check_blob_value(obj);
+}
+
+INSTANTIATE_TEST_SUITE_P(BlobTest, BlobCreateTest, testing::ValuesIn(blob_base_tests));
+
+class BlobInitTest : public BlobBaseTest
+{
+public:
+    blob create() { return blob::create_from_bytes(std::string(_expected_str)); }
+};
+
+TEST_P(BlobInitTest, CopyConstructor)
+{
+    const auto &obj = create();
+
+    blob copy(obj);
+    check_blob_value(copy);
+}
+
+TEST_P(BlobInitTest, CopyAssignment)
+{
+    const auto &obj = create();
+
+    blob copy;
+    copy = obj;
+    check_blob_value(copy);
+}
+
+TEST_P(BlobInitTest, MoveConstructor)
+{
+    auto obj = create();
+
+    blob move(std::move(obj));
+    check_blob_value(move);
+}
+
+TEST_P(BlobInitTest, MoveAssignment)
+{
+    auto obj = create();
+
+    blob move;
+    move = std::move(obj);
+    check_blob_value(move);
+}
+
+INSTANTIATE_TEST_SUITE_P(BlobTest, BlobInitTest, testing::ValuesIn(blob_base_tests));
+
+} // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to