This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new e68c2b9b9 fix(duplication): fix the dangling pointer after `blob`
object is moved (#2088)
e68c2b9b9 is described below
commit e68c2b9b95082d265f99f2dd2ff04148513d847e
Author: Dan Wang <[email protected]>
AuthorDate: Thu Aug 15 11:02:23 2024 +0800
fix(duplication): fix the dangling pointer after `blob` object is moved
(#2088)
https://github.com/apache/incubator-pegasus/issues/2089
Refactor the `blob` class to avoid the dangling pointer that led to the
heap-use-after-free error reported in #2089, which occurred after a `blob`
object was moved. Also refactor the existing tests and add new ones for `blob`
and `mutation_batch`.
---
.clang-tidy | 2 +-
build_tools/clang_tidy.py | 2 +-
src/replica/duplication/mutation_batch.cpp | 18 ++-
.../duplication/test/duplication_test_base.h | 7 +-
.../test/load_from_private_log_test.cpp | 13 +-
.../duplication/test/mutation_batch_test.cpp | 160 +++++++++++++--------
src/replica/mutation_log_replay.cpp | 70 +++++----
src/replica/test/log_block_test.cpp | 6 +-
src/replica/test/log_file_test.cpp | 3 +-
src/replica/test/mutation_log_test.cpp | 10 +-
src/replica/test/replica_test_base.h | 8 +-
src/runtime/rpc/network.sim.cpp | 2 +-
src/utils/binary_reader.cpp | 4 +-
src/utils/binary_reader.h | 8 +-
src/utils/blob.h | 120 +++++++++++-----
src/utils/test/blob_test.cpp | 126 ++++++++++++++++
16 files changed, 397 insertions(+), 162 deletions(-)
diff --git a/.clang-tidy b/.clang-tidy
index a2a2c95c1..2e072d7d8 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -18,7 +18,7 @@
#
https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html
CheckOptions: []
-Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*'
+Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-
[...]
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index 407149765..d4c8964a0 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -60,7 +60,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"clang-tidy",
"-p0",
"-path", BUILD_PATH,
-
"-checks=-cppcoreguidelines-pro-type-union-access,-modernize-use-trailing-return-type",
+
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-hicpp-no-array-decay,-hicpp-named-parameter,-read
[...]
"-extra-arg=-language=c++",
"-extra-arg=-std=c++17",
"-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/replica/duplication/mutation_batch.cpp
b/src/replica/duplication/mutation_batch.cpp
index 9a6493cb9..efcd1eeac 100644
--- a/src/replica/duplication/mutation_batch.cpp
+++ b/src/replica/duplication/mutation_batch.cpp
@@ -17,23 +17,24 @@
#include <functional>
#include <string>
+#include <string_view>
#include <tuple>
#include <utility>
#include <vector>
-#include <string_view>
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "metadata_types.h"
#include "mutation_batch.h"
-#include "replica_duplicator.h"
#include "replica/replica.h"
+#include "replica_duplicator.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
+#include "utils/ports.h"
METRIC_DEFINE_gauge_int64(replica,
dup_recent_lost_mutations,
@@ -196,10 +197,19 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr
&mu, decree start_decree
}
blob bb;
- if (update.data.buffer() != nullptr) {
+ if (update.data.buffer()) {
+ // ATTENTION: instead of copy, move could optimize the
performance. However, this
+ // would nullify the elements of mu->data.updates.
bb = std::move(update.data);
} else {
- bb = blob::create_from_bytes(update.data.data(),
update.data.length());
+ // TODO(wangdan): if update.data.buffer() is nullptr, the blob
object must have
+ // been used as `string_view`.
+ //
+ // Once `string_view` function is removed from blob, consider
dropping following
+ // statements.
+ if (dsn_likely(update.data.data() != nullptr &&
!update.data.empty())) {
+ bb = blob::create_from_bytes(update.data.data(),
update.data.length());
+ }
}
_total_bytes += bb.length();
diff --git a/src/replica/duplication/test/duplication_test_base.h
b/src/replica/duplication/test/duplication_test_base.h
index d4dacf9b9..49101341e 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -75,16 +75,15 @@ public:
return log_file_map;
}
- mutation_ptr create_test_mutation(int64_t decree,
- int64_t last_committed_decree,
- const std::string &data) override
+ mutation_ptr
+ create_test_mutation(int64_t decree, int64_t last_committed_decree, const
char *data) override
{
auto mut = replica_test_base::create_test_mutation(decree,
last_committed_decree, data);
mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must
be idempotent write
return mut;
}
- mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
override
+ mutation_ptr create_test_mutation(int64_t decree, const char *data)
override
{
return duplication_test_base::create_test_mutation(decree, decree - 1,
data);
}
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp
b/src/replica/duplication/test/load_from_private_log_test.cpp
index 0d7141745..f2eaf253b 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -92,8 +92,7 @@ public:
// each round mlog will replay the former logs, and create new file
mutation_log_ptr mlog = create_private_log();
for (int i = 1; i <= 10; i++) {
- std::string msg = "hello!";
- mutation_ptr mu = create_test_mutation(10 * f + i, msg);
+ auto mu = create_test_mutation(10 * f + i, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr,
0);
}
mlog->tracker()->wait_outstanding_tasks();
@@ -149,9 +148,8 @@ public:
auto reserved_plog_force_flush = FLAGS_plog_force_flush;
FLAGS_plog_force_flush = true;
for (int i = decree_start; i <= num_entries + decree_start; i++) {
- std::string msg = "hello!";
// decree - last_commit_decree = 1 by default
- mutation_ptr mu = create_test_mutation(i, msg);
+ auto mu = create_test_mutation(i, "hello!");
// mock the last_commit_decree of first mu equal with
`last_commit_decree_start`
if (i == decree_start) {
mu->data.header.last_committed_decree =
last_commit_decree_start;
@@ -160,7 +158,7 @@ public:
}
// commit the last entry
- mutation_ptr mu = create_test_mutation(decree_start + num_entries
+ 1, "hello!");
+ auto mu = create_test_mutation(decree_start + num_entries + 1,
"hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
FLAGS_plog_force_flush = reserved_plog_force_flush;
@@ -362,13 +360,12 @@ TEST_P(load_from_private_log_test, ignore_useless)
int num_entries = 100;
for (int i = 1; i <= num_entries; i++) {
- std::string msg = "hello!";
- mutation_ptr mu = create_test_mutation(i, msg);
+ auto mu = create_test_mutation(i, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
}
// commit the last entry
- mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!");
+ auto mu = create_test_mutation(1 + num_entries, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
mlog->close();
diff --git a/src/replica/duplication/test/mutation_batch_test.cpp
b/src/replica/duplication/test/mutation_batch_test.cpp
index dd5c27738..c865de20b 100644
--- a/src/replica/duplication/test/mutation_batch_test.cpp
+++ b/src/replica/duplication/test/mutation_batch_test.cpp
@@ -31,138 +31,178 @@
#include "gtest/gtest.h"
#include "replica/duplication/mutation_batch.h"
#include "replica/duplication/mutation_duplicator.h"
+#include "replica/duplication/replica_duplicator.h"
#include "replica/mutation.h"
#include "replica/prepare_list.h"
#include "runtime/task/task_code.h"
#include "utils/autoref_ptr.h"
+#include "utils/blob.h"
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
class mutation_batch_test : public duplication_test_base
{
public:
- void reset_buffer(const decree last_commit,
- const decree start,
- const decree end,
- mutation_batch &batcher)
+ mutation_batch_test() : _duplicator(create_test_duplicator(0)),
_batcher(_duplicator.get()) {}
+
+ void reset_buffer(const decree last_commit, const decree start, const
decree end)
{
- batcher._mutation_buffer->reset(last_commit);
- batcher._mutation_buffer->_start_decree = start;
- batcher._mutation_buffer->_end_decree = end;
+ _batcher._mutation_buffer->reset(last_commit);
+ _batcher._mutation_buffer->_start_decree = start;
+ _batcher._mutation_buffer->_end_decree = end;
}
- void commit_buffer(const decree current_decree, mutation_batch &batcher)
+ void commit_buffer(const decree current_decree)
{
- batcher._mutation_buffer->commit(current_decree,
COMMIT_TO_DECREE_HARD);
+ _batcher._mutation_buffer->commit(current_decree,
COMMIT_TO_DECREE_HARD);
}
- void check_mutation_contents(const std::set<std::string>
&expected_mutations,
- mutation_batch &batcher)
+ void check_mutation_contents(const std::vector<std::string>
&expected_mutations)
{
- const auto all_mutations = batcher.move_all_mutations();
+ const auto all_mutations = _batcher.move_all_mutations();
- std::set<std::string> actual_mutations;
+ std::vector<std::string> actual_mutations;
std::transform(all_mutations.begin(),
all_mutations.end(),
- std::inserter(actual_mutations, actual_mutations.end()),
+ std::back_inserter(actual_mutations),
[](const mutation_tuple &tuple) { return
std::get<2>(tuple).to_string(); });
ASSERT_EQ(expected_mutations, actual_mutations);
}
+
+ std::unique_ptr<replica_duplicator> _duplicator;
+ mutation_batch _batcher;
};
INSTANTIATE_TEST_SUITE_P(, mutation_batch_test, ::testing::Values(false,
true));
TEST_P(mutation_batch_test, prepare_mutation)
{
- auto duplicator = create_test_duplicator(0);
- mutation_batch batcher(duplicator.get());
-
auto mu1 = create_test_mutation(1, 0, "first mutation");
set_last_applied_decree(1);
- ASSERT_TRUE(batcher.add(mu1));
- ASSERT_EQ(1, batcher.last_decree());
+ ASSERT_TRUE(_batcher.add(mu1));
+ ASSERT_EQ(1, _batcher.last_decree());
auto mu2 = create_test_mutation(2, 1, "abcde");
set_last_applied_decree(2);
- ASSERT_TRUE(batcher.add(mu2));
- ASSERT_EQ(2, batcher.last_decree());
+ ASSERT_TRUE(_batcher.add(mu2));
+ ASSERT_EQ(2, _batcher.last_decree());
auto mu3 = create_test_mutation(3, 2, "hello world");
- ASSERT_TRUE(batcher.add(mu3));
+ ASSERT_TRUE(_batcher.add(mu3));
// The last decree has not been updated.
- ASSERT_EQ(2, batcher.last_decree());
+ ASSERT_EQ(2, _batcher.last_decree());
auto mu4 = create_test_mutation(4, 2, "foo bar");
- ASSERT_TRUE(batcher.add(mu4));
- ASSERT_EQ(2, batcher.last_decree());
+ ASSERT_TRUE(_batcher.add(mu4));
+ ASSERT_EQ(2, _batcher.last_decree());
// The committed mutation would be ignored.
auto mu2_another = create_test_mutation(2, 1, "another second mutation");
- ASSERT_TRUE(batcher.add(mu2_another));
- ASSERT_EQ(2, batcher.last_decree());
+ ASSERT_TRUE(_batcher.add(mu2_another));
+ ASSERT_EQ(2, _batcher.last_decree());
// The mutation with duplicate decree would be ignored.
auto mu3_another = create_test_mutation(3, 2, "123 xyz");
- ASSERT_TRUE(batcher.add(mu3_another));
- ASSERT_EQ(2, batcher.last_decree());
+ ASSERT_TRUE(_batcher.add(mu3_another));
+ ASSERT_EQ(2, _batcher.last_decree());
auto mu5 = create_test_mutation(5, 2, "5th mutation");
set_last_applied_decree(5);
- ASSERT_TRUE(batcher.add(mu5));
- ASSERT_EQ(5, batcher.last_decree());
+ ASSERT_TRUE(_batcher.add(mu5));
+ ASSERT_EQ(5, _batcher.last_decree());
+
+ check_mutation_contents({"first mutation", "abcde", "hello world", "foo
bar", "5th mutation"});
+}
+
+TEST_P(mutation_batch_test, add_null_mutation)
+{
+ auto mu = create_test_mutation(1, nullptr);
+ _batcher.add_mutation_if_valid(mu, 0);
+
+ check_mutation_contents({""});
+}
+
+TEST_P(mutation_batch_test, add_empty_mutation)
+{
+ auto mu = create_test_mutation(1, "");
+ _batcher.add_mutation_if_valid(mu, 0);
+
+ check_mutation_contents({""});
+}
+
+// TODO(wangdan): once `string_view` function is removed from blob, drop this
test.
+TEST_P(mutation_batch_test, add_string_view_mutation)
+{
+ auto mu = create_test_mutation(1, nullptr);
+ const std::string data("hello");
+ mu->data.updates.back().data = blob(data.data(), 0, data.size());
+ _batcher.add_mutation_if_valid(mu, 0);
- check_mutation_contents({"first mutation", "abcde", "hello world", "foo
bar", "5th mutation"},
- batcher);
+ check_mutation_contents({"hello"});
}
-TEST_P(mutation_batch_test, add_mutation_if_valid)
+TEST_P(mutation_batch_test, add_a_valid_mutation)
{
- auto duplicator = create_test_duplicator(0);
- mutation_batch batcher(duplicator.get());
+ auto mu = create_test_mutation(1, "hello");
+ _batcher.add_mutation_if_valid(mu, 0);
+
+ check_mutation_contents({"hello"});
+}
+TEST_P(mutation_batch_test, add_multiple_valid_mutations)
+{
+ // The mutation could not be reused, since in
mutation_batch::add_mutation_if_valid
+ // the elements of mutation::data::updates would be moved and nullified.
auto mu1 = create_test_mutation(1, "hello");
- batcher.add_mutation_if_valid(mu1, 0);
- check_mutation_contents({"hello"}, batcher);
+ _batcher.add_mutation_if_valid(mu1, 0);
auto mu2 = create_test_mutation(2, "world");
- batcher.add_mutation_if_valid(mu2, 0);
- check_mutation_contents({"world"}, batcher);
+ _batcher.add_mutation_if_valid(mu2, 2);
+
+ auto mu3 = create_test_mutation(3, "hi");
+ _batcher.add_mutation_if_valid(mu3, 2);
+
+ check_mutation_contents({"hello", "world", "hi"});
+}
+
+TEST_P(mutation_batch_test, add_invalid_mutation)
+{
+ auto mu2 = create_test_mutation(2, "world");
+ _batcher.add_mutation_if_valid(mu2, 2);
// mu1 would be ignored, since its decree is less than the start decree.
- batcher.add_mutation_if_valid(mu1, 2);
- batcher.add_mutation_if_valid(mu2, 2);
+ auto mu1 = create_test_mutation(1, "hello");
+ _batcher.add_mutation_if_valid(mu1, 2);
+
+ auto mu3 = create_test_mutation(3, "hi");
+ _batcher.add_mutation_if_valid(mu3, 2);
- auto mu3 = create_test_mutation(1, "hi");
- batcher.add_mutation_if_valid(mu3, 1);
- check_mutation_contents({"hi", "world"}, batcher);
+ auto mu4 = create_test_mutation(1, "ok");
+ _batcher.add_mutation_if_valid(mu4, 1);
+
+ // "ok" would be the first, since its timestamp (i.e. decree in
create_test_mutation)
+ // is the smallest.
+ check_mutation_contents({"ok", "world", "hi"});
}
TEST_P(mutation_batch_test, ignore_non_idempotent_write)
{
- auto duplicator = create_test_duplicator(0);
- mutation_batch batcher(duplicator.get());
-
auto mu = create_test_mutation(1, "hello");
mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
- batcher.add_mutation_if_valid(mu, 0);
- check_mutation_contents({}, batcher);
+ _batcher.add_mutation_if_valid(mu, 0);
+ check_mutation_contents({});
}
TEST_P(mutation_batch_test, mutation_buffer_commit)
{
- auto duplicator = create_test_duplicator(0);
- mutation_batch batcher(duplicator.get());
-
// Mock mutation_buffer[last=10, start=15, end=20], last + 1(next commit
decree) is out of
// [start~end], then last would become min_decree() - 1, see
mutation_buffer::commit() for
// details.
- reset_buffer(10, 15, 20, batcher);
- commit_buffer(15, batcher);
- ASSERT_EQ(14, batcher.last_decree());
+ reset_buffer(10, 15, 20);
+ commit_buffer(15);
+ ASSERT_EQ(14, _batcher.last_decree());
}
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/replica/mutation_log_replay.cpp
b/src/replica/mutation_log_replay.cpp
index ee891bcb7..0e7956c8d 100644
--- a/src/replica/mutation_log_replay.cpp
+++ b/src/replica/mutation_log_replay.cpp
@@ -22,6 +22,7 @@
#include <map>
#include <memory>
#include <string>
+#include <string_view>
#include <utility>
#include <vector>
@@ -38,10 +39,9 @@
#include "utils/errors.h"
#include "utils/fail_point.h"
#include "utils/fmt_logging.h"
-#include <string_view>
+#include "utils/ports.h"
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
/*static*/ error_code mutation_log::replay(log_file_ptr log,
replay_callback callback,
@@ -72,40 +72,55 @@ namespace replication {
return err.code();
}
-/*static*/ error_s mutation_log::replay_block(log_file_ptr &log,
- replay_callback &callback,
- size_t start_offset,
- int64_t &end_offset)
-{
- FAIL_POINT_INJECT_F("mutation_log_replay_block", [](std::string_view) ->
error_s {
- return error_s::make(ERR_INCOMPLETE_DATA, "mutation_log_replay_block");
- });
-
- blob bb;
- std::unique_ptr<binary_reader> reader;
+namespace {
- log->reset_stream(start_offset); // start reading from given offset
- int64_t global_start_offset = start_offset + log->start_offset();
- end_offset = global_start_offset; // reset end_offset to the start.
-
- // reads the entire block into memory
- error_code err = log->read_next_log_block(bb);
- if (err != ERR_OK) {
- return error_s::make(err, "failed to read log block");
+dsn::error_s read_block(dsn::replication::log_file_ptr &log,
+ size_t start_offset,
+ int64_t &end_offset,
+ std::unique_ptr<dsn::binary_reader> &reader)
+{
+ log->reset_stream(start_offset); // Start reading from given offset.
+ int64_t global_start_offset = static_cast<int64_t>(start_offset) +
log->start_offset();
+ end_offset = global_start_offset; // Reset end_offset to the start.
+
+ {
+ // Read the entire block into memory.
+ blob bb;
+ const auto err = log->read_next_log_block(bb);
+ if (dsn_unlikely(err != dsn::ERR_OK)) {
+ return FMT_ERR(err, "failed to read log block");
+ }
+ reader = std::make_unique<dsn::binary_reader>(std::move(bb));
}
- reader = std::make_unique<binary_reader>(bb);
- end_offset += sizeof(log_block_header);
+ end_offset += sizeof(dsn::replication::log_block_header);
// The first block is log_file_header.
if (global_start_offset == log->start_offset()) {
end_offset += log->read_file_header(*reader);
if (!log->is_right_header()) {
- return error_s::make(ERR_INVALID_DATA, "failed to read log file
header");
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "failed to read log file
header");
}
- // continue to parsing the data block
+ // Continue to parsing the data block.
}
+ return dsn::error_s::ok();
+}
+
+} // anonymous namespace
+
+/*static*/ error_s mutation_log::replay_block(log_file_ptr &log,
+ replay_callback &callback,
+ size_t start_offset,
+ int64_t &end_offset)
+{
+ FAIL_POINT_INJECT_F("mutation_log_replay_block", [](std::string_view) ->
error_s {
+ return error_s::make(ERR_INCOMPLETE_DATA, "mutation_log_replay_block");
+ });
+
+ std::unique_ptr<binary_reader> reader;
+ RETURN_NOT_OK(read_block(log, start_offset, end_offset, reader));
+
while (!reader->is_eof()) {
auto old_size = reader->get_remaining_size();
mutation_ptr mu = mutation::read_from(*reader, nullptr);
@@ -218,5 +233,4 @@ namespace replication {
return err;
}
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/replica/test/log_block_test.cpp
b/src/replica/test/log_block_test.cpp
index e667a59ce..7499e13c8 100644
--- a/src/replica/test/log_block_test.cpp
+++ b/src/replica/test/log_block_test.cpp
@@ -112,7 +112,8 @@ TEST_P(log_appender_test, log_block_full)
{
log_appender appender(10);
for (int i = 0; i < 1024; i++) { // more than DEFAULT_MAX_BLOCK_BYTES
- appender.append_mutation(create_test_mutation(1 + i, std::string(1024,
'a')), nullptr);
+ appender.append_mutation(create_test_mutation(1 + i, std::string(1024,
'a').c_str()),
+ nullptr);
}
ASSERT_EQ(appender.mutations().size(), 1024);
// two log_block_header blobs
@@ -136,7 +137,8 @@ TEST_P(log_appender_test, read_log_block)
{
log_appender appender(10);
for (int i = 0; i < 1024; i++) { // more than DEFAULT_MAX_BLOCK_BYTES
- appender.append_mutation(create_test_mutation(1 + i, std::string(1024,
'a')), nullptr);
+ appender.append_mutation(create_test_mutation(1 + i, std::string(1024,
'a').c_str()),
+ nullptr);
}
ASSERT_EQ(appender.all_blocks().size(), 2);
diff --git a/src/replica/test/log_file_test.cpp
b/src/replica/test/log_file_test.cpp
index 3ac902451..c4abb02b2 100644
--- a/src/replica/test/log_file_test.cpp
+++ b/src/replica/test/log_file_test.cpp
@@ -81,7 +81,8 @@ TEST_P(log_file_test, commit_log_blocks)
size_t written_sz = appender->size();
appender = std::make_shared<log_appender>(_start_offset + written_sz);
for (int i = 0; i < 1024; i++) { // more than DEFAULT_MAX_BLOCK_BYTES
- appender->append_mutation(create_test_mutation(1 + i,
std::string(1024, 'a')), nullptr);
+ appender->append_mutation(create_test_mutation(1 + i,
std::string(1024, 'a').c_str()),
+ nullptr);
}
ASSERT_GT(appender->all_blocks().size(), 1);
tsk = _logf->commit_log_blocks(
diff --git a/src/replica/test/mutation_log_test.cpp
b/src/replica/test/mutation_log_test.cpp
index 0009e800e..6d92e042b 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -291,9 +291,8 @@ public:
void TearDown() override { utils::filesystem::remove_path(_log_dir); }
- mutation_ptr create_test_mutation(int64_t decree,
- int64_t last_committed_decree,
- const std::string &data) override
+ mutation_ptr
+ create_test_mutation(int64_t decree, int64_t last_committed_decree, const
char *data) override
{
mutation_ptr mu(new mutation());
mu->data.header.ballot = 1;
@@ -315,7 +314,7 @@ public:
return mu;
}
- mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
override
+ mutation_ptr create_test_mutation(int64_t decree, const char *data)
override
{
return mutation_log_test::create_test_mutation(decree, decree - 1,
data);
}
@@ -333,8 +332,7 @@ public:
// each round mlog will replay the former logs, and create new file
mutation_log_ptr mlog = create_private_log();
for (int i = 1; i <= 10; i++) {
- std::string msg = "hello!";
- mutation_ptr mu = create_test_mutation(10 * f + i, msg);
+ auto mu = create_test_mutation(10 * f + i, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr,
0);
}
mlog->tracker()->wait_outstanding_tasks();
diff --git a/src/replica/test/replica_test_base.h
b/src/replica/test/replica_test_base.h
index 0fe479d8a..374e87e82 100644
--- a/src/replica/test/replica_test_base.h
+++ b/src/replica/test/replica_test_base.h
@@ -62,7 +62,7 @@ public:
}
virtual mutation_ptr
- create_test_mutation(int64_t decree, int64_t last_committed_decree, const
std::string &data)
+ create_test_mutation(int64_t decree, int64_t last_committed_decree, const
char *data)
{
mutation_ptr mu(new mutation());
mu->data.header.ballot = 1;
@@ -75,7 +75,9 @@ public:
mu->data.updates.emplace_back(mutation_update());
mu->data.updates.back().code =
RPC_COLD_BACKUP; // whatever code it is, but never be WRITE_EMPTY
- mu->data.updates.back().data =
blob::create_from_bytes(std::string(data));
+ if (data != nullptr) {
+ mu->data.updates.back().data = blob::create_from_bytes(data);
+ }
mu->client_requests.push_back(nullptr);
// replica_duplicator always loads from hard disk,
@@ -85,7 +87,7 @@ public:
return mu;
}
- virtual mutation_ptr create_test_mutation(int64_t decree, const
std::string &data)
+ virtual mutation_ptr create_test_mutation(int64_t decree, const char *data)
{
return replica_test_base::create_test_mutation(decree, decree - 1,
data);
}
diff --git a/src/runtime/rpc/network.sim.cpp b/src/runtime/rpc/network.sim.cpp
index 595f86cb1..ecde4fb17 100644
--- a/src/runtime/rpc/network.sim.cpp
+++ b/src/runtime/rpc/network.sim.cpp
@@ -83,7 +83,7 @@ static message_ex *virtual_send_message(message_ex *msg)
tmp += buf.length();
}
- blob bb(buffer, 0, msg->header->body_length + sizeof(message_header));
+ blob bb(buffer, msg->header->body_length + sizeof(message_header));
message_ex *recv_msg = message_ex::create_receive_message(bb);
recv_msg->to_address = msg->to_address;
recv_msg->to_host_port = msg->to_host_port;
diff --git a/src/utils/binary_reader.cpp b/src/utils/binary_reader.cpp
index 227a98ed3..64e1e3a78 100644
--- a/src/utils/binary_reader.cpp
+++ b/src/utils/binary_reader.cpp
@@ -102,10 +102,10 @@ int binary_reader::inner_read(blob &blob, int len)
blob = _blob.range(static_cast<int>(_ptr - _blob.data()), len);
// optimization: zero-copy
- if (!blob.buffer_ptr()) {
+ if (!blob.buffer()) {
std::shared_ptr<char>
buffer(::dsn::utils::make_shared_array<char>(len));
memcpy(buffer.get(), blob.data(), blob.length());
- blob = ::dsn::blob(buffer, 0, blob.length());
+ blob = ::dsn::blob(buffer, blob.length());
}
_ptr += len;
diff --git a/src/utils/binary_reader.h b/src/utils/binary_reader.h
index 65d0749fb..415da5593 100644
--- a/src/utils/binary_reader.h
+++ b/src/utils/binary_reader.h
@@ -39,13 +39,13 @@ class binary_reader
{
public:
// given bb on ctor
- binary_reader(const blob &blob);
- binary_reader(blob &&blob);
+ explicit binary_reader(const blob &blob);
+ explicit binary_reader(blob &&blob);
// or delayed init
- binary_reader() {}
+ binary_reader() = default;
- virtual ~binary_reader() {}
+ virtual ~binary_reader() = default;
void init(const blob &bb);
void init(blob &&bb);
diff --git a/src/utils/blob.h b/src/utils/blob.h
index 3cdbce320..62bc0a8a5 100644
--- a/src/utils/blob.h
+++ b/src/utils/blob.h
@@ -33,6 +33,7 @@
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TProtocol.h>
+#include "utils/fmt_logging.h"
#include "utils/fmt_utils.h"
#include "utils.h"
@@ -51,14 +52,6 @@ public:
{
}
- blob(std::shared_ptr<char> buffer, int offset, unsigned int length)
- : _holder(std::move(buffer)),
- _buffer(_holder.get()),
- _data(_holder.get() + offset),
- _length(length)
- {
- }
-
/// NOTE: Use std::string_view whenever possible.
/// blob is designed for shared buffer, never use it as constant view.
/// Maybe we could deprecate this function in the future.
@@ -67,21 +60,68 @@ public:
{
}
+ blob(const blob &rhs) noexcept = default;
+
+ blob &operator=(const blob &rhs) noexcept
+ {
+ if (this == &rhs) {
+ return *this;
+ }
+
+ _holder = rhs._holder;
+ _buffer = rhs._buffer;
+ _data = rhs._data;
+ _length = rhs._length;
+
+ return *this;
+ }
+
+ blob(blob &&rhs) noexcept
+ : _holder(std::move(rhs._holder)),
+ _buffer(rhs._buffer),
+ _data(rhs._data),
+ _length(rhs._length)
+ {
+ rhs._buffer = nullptr;
+ rhs._data = nullptr;
+ rhs._length = 0;
+ }
+
+ blob &operator=(blob &&rhs) noexcept
+ {
+ if (this == &rhs) {
+ return *this;
+ }
+
+ _holder = std::move(rhs._holder);
+ _buffer = rhs._buffer;
+ _data = rhs._data;
+ _length = rhs._length;
+
+ rhs._buffer = nullptr;
+ rhs._data = nullptr;
+ rhs._length = 0;
+
+ return *this;
+ }
+
/// Create shared buffer from allocated raw bytes.
/// NOTE: this operation is not efficient since it involves a memory copy.
- static blob create_from_bytes(const char *s, size_t len)
+ [[nodiscard]] static blob create_from_bytes(const char *s, size_t len)
{
+ CHECK_NOTNULL(s, "null source pointer would lead to undefined
behaviour");
+
std::shared_ptr<char> s_arr(new char[len],
std::default_delete<char[]>());
memcpy(s_arr.get(), s, len);
- return blob(std::move(s_arr), 0, static_cast<unsigned int>(len));
+ return {std::move(s_arr), static_cast<unsigned int>(len)};
}
/// Create shared buffer without copying data.
- static blob create_from_bytes(std::string &&bytes)
+ [[nodiscard]] static blob create_from_bytes(std::string &&bytes)
{
- auto s = new std::string(std::move(bytes));
+ auto *s = new std::string(std::move(bytes));
std::shared_ptr<char> buf(const_cast<char *>(s->data()), [s](char *) {
delete s; });
- return blob(std::move(buf), 0, static_cast<unsigned int>(s->length()));
+ return {std::move(buf), static_cast<unsigned int>(s->length())};
}
void assign(const std::shared_ptr<char> &buffer, int offset, unsigned int
length)
@@ -95,8 +135,8 @@ public:
void assign(std::shared_ptr<char> &&buffer, int offset, unsigned int
length)
{
_holder = std::move(buffer);
- _buffer = (_holder.get());
- _data = (_holder.get() + offset);
+ _buffer = _holder.get();
+ _data = _holder.get() + offset;
_length = length;
}
@@ -109,21 +149,22 @@ public:
_length = length;
}
- const char *data() const noexcept { return _data; }
+ [[nodiscard]] const char *data() const noexcept { return _data; }
- unsigned int length() const noexcept { return _length; }
- unsigned int size() const noexcept { return _length; }
- bool empty() const noexcept { return _length == 0; }
+ [[nodiscard]] unsigned int length() const noexcept { return _length; }
+ [[nodiscard]] unsigned int size() const noexcept { return _length; }
+ [[nodiscard]] bool empty() const noexcept { return _length == 0; }
- std::shared_ptr<char> buffer() const { return _holder; }
+ [[nodiscard]] std::shared_ptr<char> buffer() const { return _holder; }
- const char *buffer_ptr() const { return _holder.get(); }
+ [[nodiscard]] const char *buffer_ptr() const { return _holder.get(); }
- // offset can be negative for buffer dereference
- blob range(int offset) const
+ // `offset` can be negative for buffer dereference.
+ [[nodiscard]] blob range(int offset) const
{
- // offset cannot exceed the current length value
- assert(offset <= static_cast<int>(_length));
+ DCHECK_LE_MSG(offset,
+ static_cast<int>(_length),
+ "the required offset cannot exceed the current length");
blob temp = *this;
temp._data += offset;
@@ -131,33 +172,38 @@ public:
return temp;
}
- blob range(int offset, unsigned int len) const
+ [[nodiscard]] blob range(int offset, unsigned int len) const
{
- // offset cannot exceed the current length value
- assert(offset <= static_cast<int>(_length));
+ DCHECK_LE_MSG(offset,
+ static_cast<int>(_length),
+ "the required offset cannot exceed the current length");
blob temp = *this;
temp._data += offset;
temp._length -= offset;
- // buffer length must exceed the required length
- assert(temp._length >= len);
+ DCHECK_LE_MSG(
+ len, temp._length, "the required length cannot exceed remaining
buffer length");
+
temp._length = len;
return temp;
}
- bool operator==(const blob &r) const
+ // Could NOT be declared with "= delete", since many thrift-generated
classes would
+ // access this in their own `operator==`.
+ bool operator==(const blob &) const
{
- // not implemented
- assert(false);
+ CHECK(false, "not implemented");
return false;
}
- std::string to_string() const
+ [[nodiscard]] std::string to_string() const
{
- if (_length == 0)
+ if (_length == 0) {
return {};
- return std::string(_data, _length);
+ }
+
+ return {_data, _length};
}
friend std::ostream &operator<<(std::ostream &os, const blob &bb)
@@ -165,7 +211,7 @@ public:
return os << bb.to_string();
}
- std::string_view to_string_view() const { return std::string_view(_data,
_length); }
+ [[nodiscard]] std::string_view to_string_view() const { return {_data,
_length}; }
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
diff --git a/src/utils/test/blob_test.cpp b/src/utils/test/blob_test.cpp
new file mode 100644
index 000000000..8ae2c8b7f
--- /dev/null
+++ b/src/utils/test/blob_test.cpp
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "utils/blob.h"
+
+namespace dsn {
+
+struct blob_base_case
+{
+ std::string expected_str;
+};
+
+class BlobBaseTest : public testing::TestWithParam<blob_base_case>
+{
+public:
+ void SetUp() override
+ {
+ const auto &test_case = GetParam();
+ _expected_str = test_case.expected_str;
+ }
+
+ void check_blob_value(const blob &obj) const
+ {
+ EXPECT_EQ(_expected_str, obj.to_string());
+
+ EXPECT_EQ(_expected_str.size(), obj.length());
+ EXPECT_EQ(_expected_str.size(), obj.size());
+
+ if (_expected_str.empty()) {
+ EXPECT_TRUE(obj.empty());
+ } else {
+ EXPECT_FALSE(obj.empty());
+ }
+ }
+
+protected:
+ std::string _expected_str;
+};
+
+const std::vector<blob_base_case> blob_base_tests = {
+ // Test empty case.
+ {""},
+ // Test non-empty case.
+ {"hello"},
+};
+
+class BlobCreateTest : public BlobBaseTest
+{
+};
+
+TEST_P(BlobCreateTest, CreateFromCString)
+{
+ const auto &obj = blob::create_from_bytes(_expected_str.data(),
_expected_str.size());
+ check_blob_value(obj);
+}
+
+TEST_P(BlobCreateTest, CreateFromString)
+{
+ const auto &obj = blob::create_from_bytes(std::string(_expected_str));
+ check_blob_value(obj);
+}
+
+INSTANTIATE_TEST_SUITE_P(BlobTest, BlobCreateTest,
testing::ValuesIn(blob_base_tests));
+
+class BlobInitTest : public BlobBaseTest
+{
+public:
+ blob create() { return
blob::create_from_bytes(std::string(_expected_str)); }
+};
+
+TEST_P(BlobInitTest, CopyConstructor)
+{
+ const auto &obj = create();
+
+ blob copy(obj);
+ check_blob_value(copy);
+}
+
+TEST_P(BlobInitTest, CopyAssignment)
+{
+ const auto &obj = create();
+
+ blob copy;
+ copy = obj;
+ check_blob_value(copy);
+}
+
+TEST_P(BlobInitTest, MoveConstructor)
+{
+ auto obj = create();
+
+ blob move(std::move(obj));
+ check_blob_value(move);
+}
+
+TEST_P(BlobInitTest, MoveAssignment)
+{
+ auto obj = create();
+
+ blob move;
+ move = std::move(obj);
+ check_blob_value(move);
+}
+
+INSTANTIATE_TEST_SUITE_P(BlobTest, BlobInitTest,
testing::ValuesIn(blob_base_tests));
+
+} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]