This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a774afea1 fix(duplication): reduce the delay before last mutation is
duplicated to the remote cluster (#2048)
a774afea1 is described below
commit a774afea1fb0b860be583a2858f67d9fbbf96c2a
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jun 27 16:57:49 2024 +0800
fix(duplication): reduce the delay before last mutation is duplicated to
the remote cluster (#2048)
https://github.com/apache/incubator-pegasus/issues/2050
As is described by the issue, the problem is that we have to waits 2 ~ 3
minutes
(until some empty write gets in) before the last mutation is duplicated to
the remote
cluster.
The reason is that the last committed decree of the last mutation (i.e.
`mutation.data.header.last_committed_decree`), rather than the decree of the
last mutation (i.e. `mutation.data.header.decree`), is chosen as the max
decree
that is duplicated to the remote cluster. Instead, the max committed decree
should
be chosen as the max decree that is duplicated to the remote cluster.
After the optimization, the delay has been reduced from 2 ~ 3 minutes to
about
0.1 seconds.
---
src/common/consensus.thrift | 13 ++
src/replica/duplication/duplication_pipeline.cpp | 7 +-
src/replica/duplication/mutation_batch.cpp | 38 +++--
src/replica/duplication/mutation_batch.h | 10 +-
src/replica/duplication/replica_duplicator.h | 1 +
.../duplication/test/duplication_test_base.h | 11 +-
.../duplication/test/mutation_batch_test.cpp | 114 ++++++++++----
src/replica/mutation_log.cpp | 152 ++++++++++--------
src/replica/mutation_log.h | 169 ++++++++++++---------
src/replica/replica.h | 1 +
src/replica/test/mutation_log_test.cpp | 13 +-
src/replica/test/replica_test_base.h | 12 +-
12 files changed, 357 insertions(+), 184 deletions(-)
diff --git a/src/common/consensus.thrift b/src/common/consensus.thrift
index 26312b8e3..8952c090c 100644
--- a/src/common/consensus.thrift
+++ b/src/common/consensus.thrift
@@ -32,11 +32,24 @@ namespace cpp dsn.replication
struct mutation_header
{
+ // The partition that this mutation belongs to.
1:dsn.gpid pid;
+
+ // The ID of the membership configuration that this mutation belongs to,
+ // increasing monotonically.
2:i64 ballot;
+
+ // The decree of this mutation.
3:i64 decree;
+
+ // The start offset of this mutation in the whole mutation log.
4:i64 log_offset;
+
+ // The max of the decrees that have been committed before this mutation
+ // is prepared.
5:i64 last_committed_decree;
+
+ // The unique timestamp that increases monotonically in microsecond.
6:i64 timestamp;
}
diff --git a/src/replica/duplication/duplication_pipeline.cpp
b/src/replica/duplication/duplication_pipeline.cpp
index 54abd83cf..53e54ece3 100644
--- a/src/replica/duplication/duplication_pipeline.cpp
+++ b/src/replica/duplication/duplication_pipeline.cpp
@@ -19,6 +19,7 @@
#include <absl/strings/string_view.h>
#include <stddef.h>
+#include <algorithm>
#include <functional>
#include <string>
#include <utility>
@@ -57,7 +58,11 @@ void load_mutation::run()
{
decree last_decree = _duplicator->progress().last_decree;
_start_decree = last_decree + 1;
- if (_replica->private_log()->max_commit_on_disk() < _start_decree) {
+
+ // Load the mutations from plog that have been committed recently, if any.
+ const auto max_plog_committed_decree =
+ std::min(_replica->private_log()->max_decree_on_disk(),
_replica->last_applied_decree());
+ if (_start_decree > max_plog_committed_decree) {
// wait 100ms for next try if no mutation was added.
repeat(100_ms);
return;
diff --git a/src/replica/duplication/mutation_batch.cpp
b/src/replica/duplication/mutation_batch.cpp
index e6d91d5f2..bd2c8bf46 100644
--- a/src/replica/duplication/mutation_batch.cpp
+++ b/src/replica/duplication/mutation_batch.cpp
@@ -27,6 +27,7 @@
#include "metadata_types.h"
#include "mutation_batch.h"
#include "replica_duplicator.h"
+#include "replica/replica.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "utils/autoref_ptr.h"
@@ -55,8 +56,10 @@ mutation_buffer::mutation_buffer(replica_base *r,
void mutation_buffer::commit(decree d, commit_type ct)
{
- if (d <= last_committed_decree())
+ if (d <= last_committed_decree()) {
+ // Ignore the decrees that have been committed.
return;
+ }
CHECK_EQ_PREFIX(ct, COMMIT_TO_DECREE_HARD);
@@ -85,8 +88,8 @@ void mutation_buffer::commit(decree d, commit_type ct)
min_decree(),
max_decree());
METRIC_VAR_SET(dup_recent_lost_mutations, min_decree() -
last_committed_decree());
- // if next_commit_mutation loss, let last_commit_decree catch up
with min_decree, and
- // the next loop will commit from min_decree
+ // If next_commit_mutation loss, let last_commit_decree catch up
with min_decree, and
+ // the next loop will commit from min_decree.
_last_committed_decree = min_decree() - 1;
return;
}
@@ -101,13 +104,13 @@ void mutation_buffer::commit(decree d, commit_type ct)
error_s mutation_batch::add(mutation_ptr mu)
{
if (mu->get_decree() <= _mutation_buffer->last_committed_decree()) {
- // ignore
+ // Ignore the mutations that have been committed.
return error_s::ok();
}
auto old = _mutation_buffer->get_mutation_by_decree(mu->get_decree());
if (old != nullptr && old->data.header.ballot >= mu->data.header.ballot) {
- // ignore
+ // The mutation with duplicate decree would be ignored.
return error_s::ok();
}
@@ -123,6 +126,16 @@ error_s mutation_batch::add(mutation_ptr mu)
_start_decree);
}
+ if (mu->get_decree() <= _replica->last_applied_decree()) {
+ // Once this mutation has been applied into rocksdb memtable, commit
it for duplication;
+ // otherwise, this mutation would be delayed at least several minutes
to be duplicated to
+ // the remote cluster. It would not be duplicated until some new
mutations (such as empty
+ // writes) enter, since the last decree that is committed for this
replica is NOT
+ // mu->data.header.decree but rather
mu->data.header.last_committed_decree. See also
+ // `mutation_header` in src/common/consensus.thrift.
+ _mutation_buffer->commit(mu->get_decree(), COMMIT_TO_DECREE_HARD);
+ }
+
return error_s::ok();
}
@@ -140,7 +153,7 @@ mutation_tuple_set mutation_batch::move_all_mutations()
return std::move(_loaded_mutations);
}
-mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r)
+mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r),
_replica(r->_replica)
{
// Prepend a special tag identifying this is a mutation_batch,
// so `dxxx_replica` logging in prepare_list will print along with its
real caller.
@@ -149,25 +162,29 @@ mutation_batch::mutation_batch(replica_duplicator *r) :
replica_base(r)
r->get_gpid(), std::string("mutation_batch@") + r->replica_name(),
r->app_name());
_mutation_buffer = std::make_unique<mutation_buffer>(
&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
- // committer
+ // The committer for the prepare list, used for duplicating to add
the committed
+ // mutations to the loading list, which would be shipped to the
remote cluster
+ // later.
add_mutation_if_valid(mu, _start_decree);
});
- // start duplication from confirmed_decree
+ // Start duplication from the confirmed decree that has been persisted in
the meta server.
_mutation_buffer->reset(r->progress().confirmed_decree);
}
void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree
start_decree)
{
if (mu->get_decree() < start_decree) {
- // ignore
+ // Ignore the mutations before start_decree.
return;
}
+
for (mutation_update &update : mu->data.updates) {
- // ignore WRITE_EMPTY
if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
+ // Ignore empty writes.
continue;
}
+
// Ignore non-idempotent writes.
// Normally a duplicating replica will reply non-idempotent writes with
// ERR_OPERATION_DISABLED, but there could still be a mutation written
@@ -176,6 +193,7 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr
&mu, decree start_decree
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) {
continue;
}
+
blob bb;
if (update.data.buffer() != nullptr) {
bb = std::move(update.data);
diff --git a/src/replica/duplication/mutation_batch.h
b/src/replica/duplication/mutation_batch.h
index 97795cea2..0cca5169e 100644
--- a/src/replica/duplication/mutation_batch.h
+++ b/src/replica/duplication/mutation_batch.h
@@ -31,7 +31,7 @@
namespace dsn {
namespace replication {
-
+class replica;
class replica_duplicator;
class mutation_buffer : public prepare_list
@@ -57,15 +57,19 @@ public:
explicit mutation_batch(replica_duplicator *r);
+ // Add mutations to prepare list. Only those who have been committed would
be
+ // duplicated to the remote cluster.
error_s add(mutation_ptr mu);
+ // Add the committed mutation to the loading list, which would be shipped
to
+ // the remote cluster later.
void add_mutation_if_valid(mutation_ptr &, decree start_decree);
mutation_tuple_set move_all_mutations();
decree last_decree() const;
- // mutations with decree < d will be ignored.
+ // Mutations with decree < d will be ignored.
void set_start_decree(decree d);
void reset_mutation_buffer(decree d);
@@ -78,6 +82,8 @@ private:
friend class replica_duplicator_test;
friend class mutation_batch_test;
+ replica *_replica;
+
std::unique_ptr<prepare_list> _mutation_buffer;
mutation_tuple_set _loaded_mutations;
decree _start_decree{invalid_decree};
diff --git a/src/replica/duplication/replica_duplicator.h
b/src/replica/duplication/replica_duplicator.h
index e9df7d7cb..66f7ac7ce 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -170,6 +170,7 @@ private:
friend class load_from_private_log_test;
friend class ship_mutation_test;
+ friend class mutation_batch;
friend class load_mutation;
friend class ship_mutation;
diff --git a/src/replica/duplication/test/duplication_test_base.h
b/src/replica/duplication/test/duplication_test_base.h
index eb914f38e..69d935cc1 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -76,13 +76,20 @@ public:
return log_file_map;
}
- mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
override
+ mutation_ptr create_test_mutation(int64_t decree,
+ int64_t last_committed_decree,
+ const std::string &data) override
{
- auto mut = replica_test_base::create_test_mutation(decree, data);
+ auto mut = replica_test_base::create_test_mutation(decree,
last_committed_decree, data);
mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must
be idempotent write
return mut;
}
+ mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
override
+ {
+ return duplication_test_base::create_test_mutation(decree, decree - 1,
data);
+ }
+
void wait_all(const std::unique_ptr<replica_duplicator> &dup)
{
dup->tracker()->wait_outstanding_tasks();
diff --git a/src/replica/duplication/test/mutation_batch_test.cpp
b/src/replica/duplication/test/mutation_batch_test.cpp
index 541531c5e..dd5c27738 100644
--- a/src/replica/duplication/test/mutation_batch_test.cpp
+++ b/src/replica/duplication/test/mutation_batch_test.cpp
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
#include <atomic>
-#include <map>
+#include <iterator>
#include <memory>
+#include <set>
#include <string>
#include <tuple>
#include <utility>
@@ -40,50 +42,102 @@ namespace replication {
class mutation_batch_test : public duplication_test_base
{
public:
- void
- reset_buffer(const mutation_batch &batcher, const decree last_commit,
decree start, decree end)
+ void reset_buffer(const decree last_commit,
+ const decree start,
+ const decree end,
+ mutation_batch &batcher)
{
batcher._mutation_buffer->reset(last_commit);
batcher._mutation_buffer->_start_decree = start;
batcher._mutation_buffer->_end_decree = end;
}
- void commit_buffer(const mutation_batch &batcher, const decree
current_decree)
+ void commit_buffer(const decree current_decree, mutation_batch &batcher)
{
batcher._mutation_buffer->commit(current_decree,
COMMIT_TO_DECREE_HARD);
}
+
+ void check_mutation_contents(const std::set<std::string>
&expected_mutations,
+ mutation_batch &batcher)
+ {
+ const auto all_mutations = batcher.move_all_mutations();
+
+ std::set<std::string> actual_mutations;
+ std::transform(all_mutations.begin(),
+ all_mutations.end(),
+ std::inserter(actual_mutations, actual_mutations.end()),
+ [](const mutation_tuple &tuple) { return
std::get<2>(tuple).to_string(); });
+
+ ASSERT_EQ(expected_mutations, actual_mutations);
+ }
};
INSTANTIATE_TEST_SUITE_P(, mutation_batch_test, ::testing::Values(false,
true));
-TEST_P(mutation_batch_test, add_mutation_if_valid)
+TEST_P(mutation_batch_test, prepare_mutation)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());
- mutation_tuple_set result;
+ auto mu1 = create_test_mutation(1, 0, "first mutation");
+ set_last_applied_decree(1);
+ ASSERT_TRUE(batcher.add(mu1));
+ ASSERT_EQ(1, batcher.last_decree());
+
+ auto mu2 = create_test_mutation(2, 1, "abcde");
+ set_last_applied_decree(2);
+ ASSERT_TRUE(batcher.add(mu2));
+ ASSERT_EQ(2, batcher.last_decree());
+
+ auto mu3 = create_test_mutation(3, 2, "hello world");
+ ASSERT_TRUE(batcher.add(mu3));
+
+ // The last decree has not been updated.
+ ASSERT_EQ(2, batcher.last_decree());
+
+ auto mu4 = create_test_mutation(4, 2, "foo bar");
+ ASSERT_TRUE(batcher.add(mu4));
+ ASSERT_EQ(2, batcher.last_decree());
+
+ // The committed mutation would be ignored.
+ auto mu2_another = create_test_mutation(2, 1, "another second mutation");
+ ASSERT_TRUE(batcher.add(mu2_another));
+ ASSERT_EQ(2, batcher.last_decree());
+
+ // The mutation with duplicate decree would be ignored.
+ auto mu3_another = create_test_mutation(3, 2, "123 xyz");
+ ASSERT_TRUE(batcher.add(mu3_another));
+ ASSERT_EQ(2, batcher.last_decree());
- std::string s = "hello";
- mutation_ptr mu1 = create_test_mutation(1, s);
+ auto mu5 = create_test_mutation(5, 2, "5th mutation");
+ set_last_applied_decree(5);
+ ASSERT_TRUE(batcher.add(mu5));
+ ASSERT_EQ(5, batcher.last_decree());
+
+ check_mutation_contents({"first mutation", "abcde", "hello world", "foo
bar", "5th mutation"},
+ batcher);
+}
+
+TEST_P(mutation_batch_test, add_mutation_if_valid)
+{
+ auto duplicator = create_test_duplicator(0);
+ mutation_batch batcher(duplicator.get());
+
+ auto mu1 = create_test_mutation(1, "hello");
batcher.add_mutation_if_valid(mu1, 0);
- result = batcher.move_all_mutations();
- mutation_tuple mt1 = *result.begin();
+ check_mutation_contents({"hello"}, batcher);
- s = "world";
- mutation_ptr mu2 = create_test_mutation(2, s);
+ auto mu2 = create_test_mutation(2, "world");
batcher.add_mutation_if_valid(mu2, 0);
- result = batcher.move_all_mutations();
- mutation_tuple mt2 = *result.begin();
-
- ASSERT_EQ(std::get<2>(mt1).to_string(), "hello");
- ASSERT_EQ(std::get<2>(mt2).to_string(), "world");
+ check_mutation_contents({"world"}, batcher);
- // decree 1 should be ignored
- mutation_ptr mu3 = create_test_mutation(1, s);
+ // mu1 would be ignored, since its decree is less than the start decree.
+ batcher.add_mutation_if_valid(mu1, 2);
batcher.add_mutation_if_valid(mu2, 2);
+
+ auto mu3 = create_test_mutation(1, "hi");
batcher.add_mutation_if_valid(mu3, 1);
- result = batcher.move_all_mutations();
- ASSERT_EQ(result.size(), 2);
+ check_mutation_contents({"hi", "world"}, batcher);
}
TEST_P(mutation_batch_test, ignore_non_idempotent_write)
@@ -91,23 +145,23 @@ TEST_P(mutation_batch_test, ignore_non_idempotent_write)
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());
- std::string s = "hello";
- mutation_ptr mu = create_test_mutation(1, s);
+ auto mu = create_test_mutation(1, "hello");
mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
batcher.add_mutation_if_valid(mu, 0);
- mutation_tuple_set result = batcher.move_all_mutations();
- ASSERT_EQ(result.size(), 0);
+ check_mutation_contents({}, batcher);
}
TEST_P(mutation_batch_test, mutation_buffer_commit)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());
- // mock mutation_buffer[last=10, start=15, end=20], last + 1(next commit
decree) is out of
- // [start~end]
- reset_buffer(batcher, 10, 15, 20);
- commit_buffer(batcher, 15);
- ASSERT_EQ(batcher.last_decree(), 14);
+
+ // Mock mutation_buffer[last=10, start=15, end=20], last + 1(next commit
decree) is out of
+ // [start~end], then last would become min_decree() - 1, see
mutation_buffer::commit() for
+ // details.
+ reset_buffer(10, 15, 20, batcher);
+ commit_buffer(15, batcher);
+ ASSERT_EQ(14, batcher.last_decree());
}
} // namespace replication
diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp
index 84a1e5be3..20592fc47 100644
--- a/src/replica/mutation_log.cpp
+++ b/src/replica/mutation_log.cpp
@@ -243,6 +243,7 @@ void mutation_log_private::write_pending_mutations(bool
release_lock_required)
// move or reset pending variables
std::shared_ptr<log_appender> pending = std::move(_pending_write);
_issued_write = pending;
+ decree max_decree = _pending_write_max_decree;
decree max_commit = _pending_write_max_commit;
_pending_write_max_commit = 0;
_pending_write_max_decree = 0;
@@ -250,11 +251,12 @@ void mutation_log_private::write_pending_mutations(bool
release_lock_required)
// Free plog from lock during committing log block, in the meantime
// new mutations can still be appended.
_plock.unlock();
- commit_pending_mutations(pr.first, pending, max_commit);
+ commit_pending_mutations(pr.first, pending, max_decree, max_commit);
}
void mutation_log_private::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_appender> &pending,
+ decree max_decree,
decree max_commit)
{
if (dsn_unlikely(FLAGS_enable_latency_tracer)) {
@@ -263,64 +265,66 @@ void
mutation_log_private::commit_pending_mutations(log_file_ptr &lf,
}
}
- lf->commit_log_blocks(*pending,
- LPC_WRITE_REPLICATION_LOG_PRIVATE,
- &_tracker,
- [this, lf, pending, max_commit](error_code err,
size_t sz) mutable {
-
CHECK(_is_writing.load(std::memory_order_relaxed), "");
-
- for (auto &block : pending->all_blocks()) {
- auto hdr = (log_block_header
*)block.front().data();
- CHECK_EQ(hdr->magic, 0xdeadbeef);
- }
-
- if (dsn_unlikely(FLAGS_enable_latency_tracer)) {
- for (const auto &mu : pending->mutations()) {
- ADD_CUSTOM_POINT(mu->_tracer,
"commit_pending_completed");
- }
- }
-
- // notify the callbacks
- // ATTENTION: callback may be called before this
code block executed
- // done.
- for (auto &c : pending->callbacks()) {
- c->enqueue(err, sz);
- }
-
- if (err != ERR_OK) {
- LOG_ERROR("write private log failed, err =
{}", err);
- _is_writing.store(false,
std::memory_order_relaxed);
- if (_io_error_callback) {
- _io_error_callback(err);
- }
- return;
- }
- CHECK_EQ(sz, pending->size());
-
- // flush to ensure that there is no gap between
private log and
- // in-memory buffer
- // so that we can get all mutations in learning
process.
- //
- // FIXME : the file could have been closed
- if (FLAGS_plog_force_flush) {
- lf->flush();
- }
-
- // update _private_max_commit_on_disk after
written into log file done
- update_max_commit_on_disk(max_commit);
-
- _is_writing.store(false,
std::memory_order_relaxed);
-
- // start to write if possible
- _plock.lock();
-
- if (!_is_writing.load(std::memory_order_acquire)
&& _pending_write) {
- write_pending_mutations(true);
- } else {
- _plock.unlock();
- }
- },
- get_gpid().thread_hash());
+ lf->commit_log_blocks(
+ *pending,
+ LPC_WRITE_REPLICATION_LOG_PRIVATE,
+ &_tracker,
+ [this, lf, pending, max_decree, max_commit](error_code err, size_t sz)
mutable {
+ CHECK(_is_writing.load(std::memory_order_relaxed), "");
+
+ for (auto &block : pending->all_blocks()) {
+ auto hdr = (log_block_header *)block.front().data();
+ CHECK_EQ(hdr->magic, 0xdeadbeef);
+ }
+
+ if (dsn_unlikely(FLAGS_enable_latency_tracer)) {
+ for (const auto &mu : pending->mutations()) {
+ ADD_CUSTOM_POINT(mu->_tracer, "commit_pending_completed");
+ }
+ }
+
+ // notify the callbacks
+ // ATTENTION: callback may be called before this code block
executed
+ // done.
+ for (auto &c : pending->callbacks()) {
+ c->enqueue(err, sz);
+ }
+
+ if (err != ERR_OK) {
+ LOG_ERROR("write private log failed, err = {}", err);
+ _is_writing.store(false, std::memory_order_relaxed);
+ if (_io_error_callback) {
+ _io_error_callback(err);
+ }
+ return;
+ }
+ CHECK_EQ(sz, pending->size());
+
+ // flush to ensure that there is no gap between private log and
+ // in-memory buffer
+ // so that we can get all mutations in learning process.
+ //
+ // FIXME : the file could have been closed
+ if (FLAGS_plog_force_flush) {
+ lf->flush();
+ }
+
+ // Update both _plog_max_decree_on_disk and
_plog_max_commit_on_disk
+ // after written into log file done.
+ update_max_decree_on_disk(max_decree, max_commit);
+
+ _is_writing.store(false, std::memory_order_relaxed);
+
+ // start to write if possible
+ _plock.lock();
+
+ if (!_is_writing.load(std::memory_order_acquire) &&
_pending_write) {
+ write_pending_mutations(true);
+ } else {
+ _plock.unlock();
+ }
+ },
+ get_gpid().thread_hash());
}
///////////////////////////////////////////////////////////////
@@ -355,7 +359,8 @@ void mutation_log::init_states()
// replica states
_private_log_info = {0, 0};
- _private_max_commit_on_disk = 0;
+ _plog_max_decree_on_disk = 0;
+ _plog_max_commit_on_disk = 0;
}
error_code mutation_log::open(replay_callback read_callback,
@@ -522,6 +527,7 @@ error_code mutation_log::open(replay_callback read_callback,
if (ret) {
this->update_max_decree_no_lock(mu->data.header.pid,
mu->data.header.decree);
if (this->_is_private) {
+
this->update_max_decree_on_disk_no_lock(mu->data.header.decree);
this->update_max_commit_on_disk_no_lock(mu->data.header.last_committed_decree);
}
}
@@ -702,11 +708,18 @@ decree mutation_log::max_decree(gpid gpid) const
return _private_log_info.max_decree;
}
+decree mutation_log::max_decree_on_disk() const
+{
+ zauto_lock l(_lock);
+ CHECK(_is_private, "this method is only valid for private logs");
+ return _plog_max_decree_on_disk;
+}
+
decree mutation_log::max_commit_on_disk() const
{
zauto_lock l(_lock);
CHECK(_is_private, "this method is only valid for private logs");
- return _private_max_commit_on_disk;
+ return _plog_max_commit_on_disk;
}
decree mutation_log::max_gced_decree(gpid gpid) const
@@ -862,17 +875,26 @@ void mutation_log::update_max_decree_no_lock(gpid gpid,
decree d)
}
}
-void mutation_log::update_max_commit_on_disk(decree d)
+void mutation_log::update_max_decree_on_disk(decree max_decree, decree
max_commit)
{
zauto_lock l(_lock);
- update_max_commit_on_disk_no_lock(d);
+ update_max_decree_on_disk_no_lock(max_decree);
+ update_max_commit_on_disk_no_lock(max_commit);
+}
+
+void mutation_log::update_max_decree_on_disk_no_lock(decree d)
+{
+ CHECK(_is_private, "this method is only valid for private logs");
+ if (d > _plog_max_decree_on_disk) {
+ _plog_max_decree_on_disk = d;
+ }
}
void mutation_log::update_max_commit_on_disk_no_lock(decree d)
{
CHECK(_is_private, "this method is only valid for private logs");
- if (d > _private_max_commit_on_disk) {
- _private_max_commit_on_disk = d;
+ if (d > _plog_max_commit_on_disk) {
+ _plog_max_commit_on_disk = d;
}
}
diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h
index c4ce671ec..8de9e23bf 100644
--- a/src/replica/mutation_log.h
+++ b/src/replica/mutation_log.h
@@ -76,9 +76,10 @@ public:
typedef std::function<void(dsn::error_code err)> io_failure_callback;
public:
- // append a log mutation
- // return value: nullptr for error
- // thread safe
+ // Append a log mutation.
+ // Return value: nullptr for error.
+ //
+ // Thread safe.
virtual ::dsn::task_ptr append(mutation_ptr &mu,
dsn::task_code callback_code,
dsn::task_tracker *tracker,
@@ -86,34 +87,37 @@ public:
int hash = 0,
int64_t *pending_size = nullptr) = 0;
- // get learn state in memory, including pending and writing mutations
- // return true if some data is filled into writer
- // return false if no data is filled into writer
- // thread safe
+ // Get learn state in memory, including pending and writing mutations:
+ // - return true if some data is filled into writer
+ // - return false if no data is filled into writer
+ //
+ // Thread safe
virtual bool get_learn_state_in_memory(decree start_decree, binary_writer
&writer) const
{
return false;
}
- // only for private log
- // get in-memory mutations, including pending and writing mutations
+ // Only for private log.
+ // get in-memory mutations, including pending and writing mutations.
virtual void get_in_memory_mutations(decree start_decree,
ballot current_ballot,
/*out*/ std::vector<mutation_ptr>
&mutations_list) const
{
}
- // flush the pending buffer until all data is on disk
- // thread safe
+ // Flush the pending buffer until all data is on disk.
+ //
+ // Thread safe.
virtual void flush() = 0;
- // flush the pending buffer at most once
- // thread safe
+ // Flush the pending buffer at most once.
+ //
+ // Thread safe.
virtual void flush_once() = 0;
public:
//
- // ctors
+ // Ctors
// when is_private = true, should specify "private_gpid"
//
mutation_log(const std::string &dir, int32_t max_log_file_mb, gpid gpid,
replica *r = nullptr);
@@ -121,22 +125,24 @@ public:
virtual ~mutation_log() = default;
//
- // initialization
+ // Initialization
//
- // open and replay
- // returns ERR_OK if succeed
- // not thread safe, but only be called when init
+ // Open and replay.
+ // return ERR_OK if succeed.
+ // Not thread safe, but only be called when init.
error_code open(replay_callback read_callback, io_failure_callback
write_error_callback);
error_code open(replay_callback read_callback,
io_failure_callback write_error_callback,
const std::map<gpid, decree> &replay_condition);
- // close the log
- // thread safe
+
+ // Close the log.
+ //
+ // Thread safe.
void close();
//
- // replay
+ // Replay.
//
static error_code replay(std::vector<std::string> &log_files,
replay_callback callback,
@@ -173,55 +179,61 @@ public:
error_code reset_from(const std::string &dir, replay_callback,
io_failure_callback);
//
- // maintain max_decree & valid_start_offset
+ // Maintain max_decree & valid_start_offset
//
- // when open a exist replica, need to set valid_start_offset on open
- // thread safe
+ // valid_start_offset is needed to be set while opening an existing
replica.
+ //
+ // Thread safe.
void set_valid_start_offset_on_open(gpid gpid, int64_t valid_start_offset);
- // when create a new replica, need to reset current max decree
- // returns current global end offset, needs to be remebered by caller for
gc usage
- // thread safe
+ // Current max decree is needed to be reset, while creating a new replica.
+ // Return current global end offset, should be remebered by caller for gc
usage.
+ //
+ // Thread safe.
int64_t on_partition_reset(gpid gpid, decree max_decree);
- // update current max decree
- // thread safe
+ // Update current max decree.
+ //
+ // Thread safe.
void update_max_decree(gpid gpid, decree d);
- // update current max commit of private log
- // thread safe
- void update_max_commit_on_disk(decree d);
+ // Update current max decree and committed decree that have ever been
written onto disk
+ // for plog.
+ //
+ // Thread safe.
+ void update_max_decree_on_disk(decree max_decree, decree max_commit);
//
- // garbage collection logs that are already covered by
+ // Garbage collection logs that are already covered by
// durable state on disk, return deleted log segment count
//
- // garbage collection for private log, returns removed file count.
- // can remove log files if satisfy all the conditions:
+ // Garbage collection for private log, returns removed file count.
+ //
+ // Log files could be removed once all the following conditions are
satisfied:
// - the file is not the current log file
// - the file is not covered by reserve_max_size or reserve_max_time
// - file.max_decree <= "durable_decree" || file.end_offset <=
"valid_start_offset"
- // that means, should reserve files if satisfy one of the conditions:
+ // which means, files should be reserved if one of the conditions is
satisfied:
// - the file is the current log file
// - the file is covered by both reserve_max_size and reserve_max_time
// - file.max_decree > "durable_decree" && file.end_offset >
"valid_start_offset"
- // thread safe
+ //
+ // Thread safe.
int garbage_collection(gpid gpid,
decree durable_decree,
int64_t valid_start_offset,
int64_t reserve_max_size,
int64_t reserve_max_time);
- //
- // when this is a private log, log files are learned by remote replicas
- // return true if private log surely covers the learning range
- //
+ // When this is a private log, log files are learned by remote replicas
+ // return true if private log surely covers the learning range.
bool get_learn_state(gpid gpid, decree start, /*out*/ learn_state &state)
const;
- // only valid for private log.
- // get parent mutations in memory and private log files during partition
split.
+ // Only valid for private log.
+ //
+ // Get parent mutations in memory and private log files during partition
split.
// `total_file_size` is used for the metrics of partition split.
void get_parent_mutations_and_logs(gpid pid,
decree start_decree,
@@ -231,23 +243,28 @@ public:
/*out*/ uint64_t &total_file_size)
const;
//
- // other inquiry routines
+ // Other inquiry routines
//
- // log dir
- // thread safe (because nerver changed)
+ // Get log dir.
+ //
+ // Thread safe (because nerver changed).
const std::string &dir() const { return _dir; }
- // replica
- replica *owner_replica() const { return _owner_replica; }
-
- // get current max decree for gpid
- // returns 0 if not found
- // thread safe
+ // Get current max decree for gpid.
+ // Return 0 if not found.
+ //
+ // Thread safe.
decree max_decree(gpid gpid) const;
- // get current max commit on disk of private log.
- // thread safe
+ // Get current max decree on disk for plog.
+ //
+ // Thread safe.
+ decree max_decree_on_disk() const;
+
+ // Get current max committed decree on disk for plog.
+ //
+ // Thread safe.
decree max_commit_on_disk() const;
// Decree of the maximum garbage-collected mutation.
@@ -260,7 +277,7 @@ public:
// than the others, the max_gced_decree = 9.
// Returns `invalid_decree` when plog directory is empty.
//
- // thread-safe & private log only
+ // Thread safe & private log only.
decree max_gced_decree(gpid gpid) const;
decree max_gced_decree_no_lock(gpid gpid) const;
@@ -269,11 +286,14 @@ public:
// thread-safe
log_file_map_by_index get_log_file_map() const;
- // check the consistence of valid_start_offset
- // thread safe
+ // Check the consistence of valid_start_offset
+ //
+ // Thread safe.
void check_valid_start_offset(gpid gpid, int64_t valid_start_offset) const;
- // get total size.
+ // Get the total size.
+ //
+ // Thread safe.
int64_t total_size() const;
void hint_switch_file() { _switch_file_hint = true; }
@@ -282,20 +302,22 @@ public:
task_tracker *tracker() { return &_tracker; }
protected:
- // thread-safe
// 'size' is data size to write; the '_global_end_offset' will be updated
by 'size'.
// can switch file only when create_new_log_if_needed = true;
// return pair: the first is target file to write; the second is the
global offset to start
- // write
+ // write.
+ //
+ // Thread safe.
std::pair<log_file_ptr, int64_t> mark_new_offset(size_t size, bool
create_new_log_if_needed);
- // thread-safe
+
+ // Thread safe.
int64_t get_global_offset() const
{
zauto_lock l(_lock);
return _global_end_offset;
}
- // init memory states
+ // Init memory states.
virtual void init_states();
private:
@@ -310,10 +332,13 @@ private:
replay_callback callback,
/*out*/ int64_t &end_offset);
- // update max decree without lock
+ // Update max decree without lock.
void update_max_decree_no_lock(gpid gpid, decree d);
- // update max commit on disk without lock
+ // Update max decree on disk without lock.
+ void update_max_decree_on_disk_no_lock(decree d);
+
+ // Update max committed decree on disk without lock.
void update_max_commit_on_disk_no_lock(decree d);
// create new log file and set it as the current log file
@@ -323,7 +348,7 @@ private:
// - _lock.locked()
error_code create_new_log_file();
- // get total size ithout lock.
+ // Get total size without lock.
int64_t total_size_no_lock() const;
protected:
@@ -367,11 +392,16 @@ private:
// replica log info for private log
replica_log_info _private_log_info;
- decree
- _private_max_commit_on_disk; // the max last_committed_decree of
written mutations up to now
- // used for limiting garbage collection
of shared log, because
- // the ending of private log should be
covered by shared log
+
+ // The max decree of the mutations that have ever been written onto the
disk for plog.
+ decree _plog_max_decree_on_disk;
+
+ // The max decree of the committed mutations that have ever been written
onto the disk
+ // for plog. Since it is set with
mutation.data.header.last_committed_decree, it must
+ // be less than _plog_max_decree_on_disk.
+ decree _plog_max_commit_on_disk;
};
+
typedef dsn::ref_ptr<mutation_log> mutation_log_ptr;
class mutation_log_private : public mutation_log, private replica_base
@@ -418,6 +448,7 @@ private:
void commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_appender> &pending,
+ decree max_decree,
decree max_commit);
void init_states() override;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index ae0118dc0..12e505dcd 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -243,6 +243,7 @@ public:
JSON_ENCODE_OBJ(writer, max_prepared_decree, max_prepared_decree());
JSON_ENCODE_OBJ(writer, max_plog_decree,
_private_log->max_decree(get_gpid()));
+ JSON_ENCODE_OBJ(writer, max_plog_decree_on_disk,
_private_log->max_decree_on_disk());
JSON_ENCODE_OBJ(writer, max_plog_commit_on_disk,
_private_log->max_commit_on_disk());
JSON_ENCODE_OBJ(writer, last_committed_decree,
last_committed_decree());
JSON_ENCODE_OBJ(writer, last_applied_decree, last_applied_decree());
diff --git a/src/replica/test/mutation_log_test.cpp
b/src/replica/test/mutation_log_test.cpp
index 5d2f339b7..ef7960158 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -291,13 +291,15 @@ public:
void TearDown() override { utils::filesystem::remove_path(_log_dir); }
- mutation_ptr create_test_mutation(decree d, const std::string &data)
override
+ mutation_ptr create_test_mutation(int64_t decree,
+ int64_t last_committed_decree,
+ const std::string &data) override
{
mutation_ptr mu(new mutation());
mu->data.header.ballot = 1;
- mu->data.header.decree = d;
+ mu->data.header.decree = decree;
mu->data.header.pid = get_gpid();
- mu->data.header.last_committed_decree = d - 1;
+ mu->data.header.last_committed_decree = last_committed_decree;
mu->data.header.log_offset = 0;
binary_writer writer;
@@ -313,6 +315,11 @@ public:
return mu;
}
+ mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
override
+ {
+ return mutation_log_test::create_test_mutation(decree, decree - 1,
data);
+ }
+
static void ASSERT_BLOB_EQ(const blob &lhs, const blob &rhs)
{
ASSERT_EQ(std::string(lhs.data(), lhs.length()),
std::string(rhs.data(), rhs.length()));
diff --git a/src/replica/test/replica_test_base.h
b/src/replica/test/replica_test_base.h
index 9296e2a4e..0fe479d8a 100644
--- a/src/replica/test/replica_test_base.h
+++ b/src/replica/test/replica_test_base.h
@@ -61,13 +61,14 @@ public:
_log_dir = _replica->dir();
}
- virtual mutation_ptr create_test_mutation(int64_t decree, const
std::string &data)
+ virtual mutation_ptr
+ create_test_mutation(int64_t decree, int64_t last_committed_decree, const
std::string &data)
{
mutation_ptr mu(new mutation());
mu->data.header.ballot = 1;
mu->data.header.decree = decree;
mu->data.header.pid = _replica->get_gpid();
- mu->data.header.last_committed_decree = decree - 1;
+ mu->data.header.last_committed_decree = last_committed_decree;
mu->data.header.log_offset = 0;
mu->data.header.timestamp = decree;
@@ -84,7 +85,14 @@ public:
return mu;
}
+ virtual mutation_ptr create_test_mutation(int64_t decree, const
std::string &data)
+ {
+ return replica_test_base::create_test_mutation(decree, decree - 1,
data);
+ }
+
gpid get_gpid() const { return _replica->get_gpid(); }
+
+ void set_last_applied_decree(decree d) {
_replica->set_app_last_committed_decree(d); }
};
} // namespace replication
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]