This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 933ac39de feat(make_idempotent): support making write requests
idempotent for primary replicas (#2198)
933ac39de is described below
commit 933ac39de38713a4e63f6db4b8a976bf1de9aef4
Author: Dan Wang <[email protected]>
AuthorDate: Mon Mar 10 23:13:46 2025 +0800
feat(make_idempotent): support making write requests idempotent for primary
replicas (#2198)
https://github.com/apache/incubator-pegasus/issues/2197
Suppose that a client issues an `incr` request to increase the base value
`100` by `1`.
If the current configuration requires all atomic write requests to be
idempotent, the primary replica will make this request idempotent by taking
the following steps after receiving it:
1. A mutation with `is_blocking = true` will be created to store this
request and then
added to the mutation queue as a blocking mutation.
2. Once this blocking mutation is ready to get popped, it will be the first
element of the entire queue, thereby blocking it (which means that no mutation
can be dequeued from it).
3. This blocking mutation cannot get popped until all previous write
requests have
been applied.
4. After it is popped, the current base value `100` is read from the storage
engine, and after performing the `incr` operation, a single put request is
created to store the final value `101`.
5. Another mutation is then created to store this idempotent single put
request, which is
subsequently added to the write pipeline, including writing to `plog` and
broadcasting
to secondary replicas.
---
src/replica/bulk_load/replica_bulk_loader.cpp | 4 +-
src/replica/mutation.cpp | 303 ++++++++++++++++++--------
src/replica/mutation.h | 190 ++++++++++++----
src/replica/replica.cpp | 23 +-
src/replica/replica.h | 135 ++++++++++--
src/replica/replica_2pc.cpp | 147 +++++++++++--
src/replica/replica_base.h | 12 +-
src/replica/replica_check.cpp | 2 +-
src/replica/replica_chkpt.cpp | 2 +-
src/replica/replica_config.cpp | 8 +-
src/replica/replica_context.h | 5 +-
src/replica/split/replica_split_manager.cpp | 2 +-
src/replica/split/test/replica_split_test.cpp | 45 ++--
src/replica/test/cold_backup_context_test.cpp | 1 -
src/runtime/serverlet.h | 7 +-
src/task/task_spec.cpp | 58 ++---
16 files changed, 697 insertions(+), 247 deletions(-)
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 7aed63fe8..05d77e859 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -716,7 +716,7 @@ void replica_bulk_loader::check_ingestion_finish()
// checkpoint, to gurantee the condition above, we should pop all
committed mutations in
// prepare list to gurantee learn type is LT_APP
mutation_ptr mu = _replica->new_mutation(invalid_decree);
- mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ mu->add_client_request(nullptr);
_replica->init_prepare(mu, false, true);
_replica->_primary_states.ingestion_is_empty_prepare_sent = true;
}
@@ -735,7 +735,7 @@ void replica_bulk_loader::handle_bulk_load_succeed()
// send an empty prepare again to gurantee that learner should learn from
checkpoint
if (status() == partition_status::PS_PRIMARY) {
mutation_ptr mu = _replica->new_mutation(invalid_decree);
- mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ mu->add_client_request(nullptr);
_replica->init_prepare(mu, false, true);
}
}
diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp
index e1c73e8ae..b630abab9 100644
--- a/src/replica/mutation.cpp
+++ b/src/replica/mutation.cpp
@@ -36,6 +36,7 @@
#include "common/replication.codes.h"
#include "replica.h"
#include "runtime/api_task.h"
+#include "task/task_code.h"
#include "task/task_spec.h"
#include "utils/binary_reader.h"
#include "utils/binary_writer.h"
@@ -43,32 +44,35 @@
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
-#include "utils/ports.h"
DSN_DEFINE_uint64(
replication,
abnormal_write_trace_latency_threshold,
- 1000 * 1000 * 1000, // 1s
+ 1000UL * 1000UL * 1000UL, // 1s
"Latency trace will be logged when exceed the write latency threshold, in
nanoseconds");
DSN_TAG_VARIABLE(abnormal_write_trace_latency_threshold, FT_MUTABLE);
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
+
std::atomic<uint64_t> mutation::s_tid(0);
mutation::mutation()
+ : _tracer(std::make_shared<dsn::utils::latency_tracer>(
+ false, "mutation", FLAGS_abnormal_write_trace_latency_threshold)),
+ _private0(0),
+ _prepare_ts_ms(0),
+ _name{0},
+ _appro_data_bytes(sizeof(mutation_header)),
+ _create_ts_ns(dsn_now_ns()),
+ _tid(++s_tid),
+ _is_sync_to_child(false)
{
- next = nullptr;
- _private0 = 0;
_not_logged = 1;
- _prepare_ts_ms = 0;
+ _left_secondary_ack_count = 0;
+ _left_potential_secondary_ack_count = 0;
+ _wait_child = false;
+ _is_error_acked = false;
strcpy(_name, "0.0.0.0");
- _appro_data_bytes = sizeof(mutation_header);
- _create_ts_ns = dsn_now_ns();
- _tid = ++s_tid;
- _is_sync_to_child = false;
- _tracer = std::make_shared<dsn::utils::latency_tracer>(
- false, "mutation", FLAGS_abnormal_write_trace_latency_threshold);
}
mutation_ptr mutation::copy_no_reply(const mutation_ptr &old_mu)
@@ -150,29 +154,29 @@ void mutation::copy_from(mutation_ptr &old)
}
}
-void mutation::add_client_request(task_code code, dsn::message_ex *request)
+void mutation::add_client_request(dsn::message_ex *request)
{
- data.updates.push_back(mutation_update());
+ data.updates.emplace_back();
mutation_update &update = data.updates.back();
_appro_data_bytes += 32; // approximate code size
if (request != nullptr) {
- update.code = code;
+ update.code = request->rpc_code();
update.serialization_type =
-
(dsn_msg_serialize_format)request->header->context.u.serialize_format;
- update.__set_start_time_ns(dsn_now_ns());
+
static_cast<dsn_msg_serialize_format>(request->header->context.u.serialize_format);
+ update.__set_start_time_ns(static_cast<int64_t>(dsn_now_ns()));
request->add_ref(); // released on dctor
- void *ptr;
- size_t size;
+ void *ptr = nullptr;
+ size_t size = 0;
CHECK(request->read_next(&ptr, &size), "payload is not present");
request->read_commit(0); // so we can re-read the request buffer in
replicated app
- update.data.assign((char *)ptr, 0, (int)size);
+ update.data.assign(static_cast<const char *>(ptr), 0, size);
- _appro_data_bytes += sizeof(int) + (int)size; // data size
+ _appro_data_bytes += static_cast<int>(sizeof(int) + size); // data size
} else {
update.code = RPC_REPLICATION_WRITE_EMPTY;
- _appro_data_bytes += sizeof(int); // empty data size
+ _appro_data_bytes += static_cast<int>(sizeof(int)); // empty data size
}
client_requests.push_back(request);
@@ -341,127 +345,236 @@ void mutation::wait_log_task() const
}
}
-mutation_queue::mutation_queue(gpid gpid,
- int max_concurrent_op /*= 2*/,
- bool batch_write_disabled /*= false*/)
- : _max_concurrent_op(max_concurrent_op),
_batch_write_disabled(batch_write_disabled)
+mutation_queue::mutation_queue(replica *r,
+ gpid gpid,
+ int max_concurrent_op,
+ bool batch_write_disabled)
+ : _replica(r),
+ _current_op_count(0),
+ _max_concurrent_op(max_concurrent_op),
+ _batch_write_disabled(batch_write_disabled)
{
- _current_op_count = 0;
- _pending_mutation = nullptr;
CHECK_NE_MSG(gpid.get_app_id(), 0, "invalid gpid");
_pcount = dsn_task_queue_virtual_length_ptr(RPC_PREPARE,
gpid.thread_hash());
}
-mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex
*request, replica *r)
+void mutation_queue::promote_pending()
+{
+ _queue.push(_pending_mutation);
+ _pending_mutation.reset();
+ ++(*_pcount);
+}
+
+void mutation_queue::try_promote_pending(task_spec *spec)
+{
+ // Promote `_pending_mutation` to `_queue` in following cases:
+ // - this client request (whose specification is `spec`) is not allowed to
be batched, or
+ // - the size of `_pending_mutation` reaches the upper limit, or
+ // - batch write is disabled (initialized by FLAGS_batch_write_disabled).
+ //
+ // Choose `_batch_write_disabled` as the last condition to be checked to
optimize the
+ // performance by short-circuit evaluation since it is actually
FLAGS_batch_write_disabled
+ // which is mostly set false by default while other conditions vary with
different incoming
+ // client requests.
+ if (spec->rpc_request_is_write_allow_batch &&
!_pending_mutation->is_full() &&
+ !_batch_write_disabled) {
+ return;
+ }
+
+ promote_pending();
+}
+
+mutation_ptr mutation_queue::try_unblock()
+{
+ CHECK_NOTNULL(_blocking_mutation, "");
+
+ // All of the mutations before the blocking mutation must have been in
prepare list.
+ const auto max_prepared_decree = _replica->max_prepared_decree();
+ const auto last_applied_decree = _replica->last_applied_decree();
+ if (max_prepared_decree > last_applied_decree) {
+ return {};
+ }
+
+ // All of the mutations before the blocking mutation must have been
applied.
+ CHECK_EQ(max_prepared_decree, last_applied_decree);
+
+ // Pop the blocking mutation into the write pipeline to be processed.
+ mutation_ptr mu = _blocking_mutation;
+
+ // Disable the blocking mutation as it has been popped.
+ _blocking_mutation = nullptr;
+
+ // Increase the number of the mutations being processed currently as the
blocking
+ // mutation is popped.
+ ++_current_op_count;
+
+ return mu;
+}
+
+mutation_ptr mutation_queue::try_block(mutation_ptr &mu)
+{
+ CHECK_NOTNULL(mu, "");
+
+ // If the immediately popped mutation is non-blocking, just return it to
be processed.
+ if (!mu->is_blocking) {
+ ++_current_op_count;
+ return mu;
+ }
+
+ CHECK_NULL(_blocking_mutation, "");
+
+ // Enable the blocking mutation once the immediately popped mutation `mu`
is found blocking.
+ _blocking_mutation = mu;
+
+ // If all of mutations before the blocking mutation have been applied, we
could unblock
+ // the queue immediately.
+ return try_unblock();
+}
+
+mutation_ptr mutation_queue::add_work(message_ex *request)
{
- task_spec *spec = task_spec::get(code);
+ CHECK_NOTNULL(request, "");
- // if not allow write batch, switch work queue
- if (_pending_mutation && !spec->rpc_request_is_write_allow_batch) {
- _pending_mutation->add_ref(); // released when unlink
- _hdr.add(_pending_mutation);
- _pending_mutation = nullptr;
- ++(*_pcount);
+ auto *spec = task_spec::get(request->rpc_code());
+ CHECK_NOTNULL(spec, "");
+
+ // If this request is not allowed to be batched, promote
`_pending_mutation` if it is
+ // non-null. We don't check `_batch_write_disabled` since
`_pending_mutation` must be
+ // null now if it is true.
+ if (_pending_mutation != nullptr &&
!spec->rpc_request_is_write_allow_batch) {
+ promote_pending();
}
- // add to work queue
- if (!_pending_mutation) {
- _pending_mutation = r->new_mutation(invalid_decree);
+ // Once `_pending_mutation` is cleared, just assign a new mutation to it.
If the client
+ // request is an atomic write and should be translated into idempotent
writes, this new
+ // mutation will be created as a blocking mutation.
+ if (_pending_mutation == nullptr) {
+ _pending_mutation =
+ _replica->new_mutation(invalid_decree,
_replica->need_make_idempotent(spec));
}
LOG_DEBUG("add request with trace_id = {:#018x} into mutation with
mutation_tid = {}",
request->header->trace_id,
_pending_mutation->tid());
- _pending_mutation->add_client_request(code, request);
+ // Append the incoming client request to `_pending_mutation`.
+ _pending_mutation->add_client_request(request);
- // short-cut
- if (_current_op_count < _max_concurrent_op && _hdr.is_empty()) {
- auto ret = _pending_mutation;
- _pending_mutation = nullptr;
- _current_op_count++;
- return ret;
+ // Throttling is triggered as there are too many mutations being processed
as 2PC. Return
+ // null to prevent more mutations from flowing into the write pipeline.
+ if (_current_op_count >= _max_concurrent_op) {
+ // Since the pending mutation was just filled with the client request,
try to promote
+ // it.
+ try_promote_pending(spec);
+ return {};
}
- // check if need to switch work queue
- if (_batch_write_disabled || !spec->rpc_request_is_write_allow_batch ||
- _pending_mutation->is_full()) {
- _pending_mutation->add_ref(); // released when unlink
- _hdr.add(_pending_mutation);
- _pending_mutation = nullptr;
- ++(*_pcount);
+ // Once the blocking mutation is enabled, return null if still blocked, or
non-null
+ // blocking mutation if succeeding in unblocking.
+ if (_blocking_mutation != nullptr) {
+ // Since the pending mutation was just filled with the client request,
try to promote
+ // it.
+ try_promote_pending(spec);
+ return try_unblock();
}
- // get next work item
- if (_current_op_count >= _max_concurrent_op)
- return nullptr;
- else if (_hdr.is_empty()) {
- CHECK_NOTNULL(_pending_mutation, "pending mutation cannot be null");
-
- auto ret = _pending_mutation;
- _pending_mutation = nullptr;
- _current_op_count++;
- return ret;
+ mutation_ptr mu;
+ if (_queue.empty()) {
+ // `_pending_mutation` must be non-null now. There's no need to
promote it as `_queue`
+ // is empty: just pop it as the next work candidate to be processed.
+ mu = _pending_mutation;
+ _pending_mutation.reset();
} else {
- _current_op_count++;
- return unlink_next_workload();
+ // Since the pending mutation was just filled with the client request,
try to promote
+ // it.
+ try_promote_pending(spec);
+
+ // Now the first element of `_queue` is the head of the entire queue.
Pop and return it
+ // as the next work candidate to be processed.
+ mu = pop_internal_queue();
}
+
+ // Currently the popped work is still a candidate: once it is a blocking
mutation, the queue
+ // may become blocked and nothing will be returned.
+ return try_block(mu);
}
-mutation_ptr mutation_queue::check_possible_work(int current_running_count)
+mutation_ptr mutation_queue::next_work(int current_running_count)
{
_current_op_count = current_running_count;
- if (_current_op_count >= _max_concurrent_op)
- return nullptr;
+ // Throttling is triggered as there are too many mutations being processed
as 2PC. Just
+ // return null to prevent more mutations from flowing into the write pipeline.
+ if (_current_op_count >= _max_concurrent_op) {
+ return {};
+ }
- // no further workload
- if (_hdr.is_empty()) {
- if (_pending_mutation != nullptr) {
- auto ret = _pending_mutation;
- _pending_mutation = nullptr;
- _current_op_count++;
- return ret;
- } else {
- return nullptr;
- }
+ // Once the blocking mutation is enabled, return null if still blocked, or
non-null
+ // blocking mutation if succeeding in unblocking.
+ if (_blocking_mutation != nullptr) {
+ return try_unblock();
}
- // run further workload
- else {
- _current_op_count++;
- return unlink_next_workload();
+ mutation_ptr mu;
+ if (_queue.empty()) {
+ // There's not any further work to be processed if `_pending_mutation`
is also null.
+ if (_pending_mutation == nullptr) {
+ return {};
+ }
+
+ // `_pending_mutation` is not null now. Just pop it as the next work
candidate to be
+ // processed.
+ mu = _pending_mutation;
+ _pending_mutation.reset();
+ } else {
+ // Now the first element of `_queue` is the head of the entire queue.
Pop and return it
+ // as the next work candidate to be processed.
+ mu = pop_internal_queue();
}
+
+ // Currently the popped work is still a candidate: once it is a blocking
mutation, the queue
+ // may become blocked and nothing will be returned.
+ return try_block(mu);
}
void mutation_queue::clear()
{
- if (_pending_mutation != nullptr) {
- _pending_mutation = nullptr;
+ if (_blocking_mutation != nullptr) {
+ _blocking_mutation.reset();
}
+ // Use pop_internal_queue() to clear `_queue` since `_pcount` should also
be updated.
mutation_ptr r;
- while ((r = unlink_next_workload()) != nullptr) {
+ while ((r = pop_internal_queue()) != nullptr) {
+ }
+
+ if (_pending_mutation != nullptr) {
+ _pending_mutation.reset();
}
}
void mutation_queue::clear(std::vector<mutation_ptr> &queued_mutations)
{
- mutation_ptr r;
queued_mutations.clear();
- while ((r = unlink_next_workload()) != nullptr) {
+
+ if (_blocking_mutation != nullptr) {
+ queued_mutations.emplace_back(std::move(_blocking_mutation));
+ _blocking_mutation.reset();
+ }
+
+ // Use pop_internal_queue() to clear `_queue` since `_pcount` should also
be updated.
+ mutation_ptr r;
+ while ((r = pop_internal_queue()) != nullptr) {
queued_mutations.emplace_back(r);
}
if (_pending_mutation != nullptr) {
queued_mutations.emplace_back(std::move(_pending_mutation));
- _pending_mutation = nullptr;
+ _pending_mutation.reset();
}
- // we don't reset the current_op_count, coz this is handled by
- // check_possible_work. In which, the variable current_running_count
- // is handled by prepare_list
- // _current_op_count = 0;
+ // We don't reset the `_current_op_count` here, since it is done by
next_work() where the
+ // parameter `current_running_count` is specified to reset
`_current_op_count` as 0.
}
-} // namespace replication
-} // namespace dsn
+
+} // namespace dsn::replication
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index b6b8ecd14..2c3510520 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -30,6 +30,7 @@
#include <atomic>
#include <functional>
#include <memory>
+#include <queue>
#include <vector>
#include "common/replication_common.h"
@@ -38,16 +39,16 @@
#include "rpc/rpc_message.h"
#include "runtime/api_layer1.h"
#include "task/task.h"
-#include "task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
-#include "utils/link.h"
+#include "utils/ports.h"
namespace dsn {
class binary_reader;
class binary_writer;
class blob;
class gpid;
+class task_spec;
namespace utils {
class latency_tracer;
@@ -57,16 +58,20 @@ namespace replication {
class mutation;
-typedef dsn::ref_ptr<mutation> mutation_ptr;
+using mutation_ptr = dsn::ref_ptr<mutation>;
-// mutation is the 2pc unit of PacificA, which wraps one or more client
requests and add
-// header informations related to PacificA algorithm for them.
-// both header and client request content are put into "data" member.
+// As 2PC unit of PacificA, a mutation contains one or more write requests
with header
+// information related to PacificA algorithm in `data` member. It is appended
to plog
+// and written into prepare request broadcast to secondary replicas. It also
holds the
+// original client requests used to build the response to the client.
class mutation : public ref_counter
{
public:
mutation();
- virtual ~mutation();
+ ~mutation() override;
+
+ DISALLOW_COPY_AND_ASSIGN(mutation);
+ DISALLOW_MOVE_AND_ASSIGN(mutation);
// copy mutation from an existing mutation, typically used in partition
split
// mutation should not reply to client, because parent has already replied
@@ -105,7 +110,14 @@ public:
// state change
void set_id(ballot b, decree c);
void set_timestamp(int64_t timestamp) { data.header.timestamp = timestamp;
}
- void add_client_request(task_code code, dsn::message_ex *request);
+
+ // Append a write request to this mutation, and also hold it if it is from
a client
+ // to build the response to the client later.
+ //
+ // Parameters:
+ // - request: is from a client if non-null, otherwise is an empty write.
+ void add_client_request(dsn::message_ex *request);
+
void copy_from(mutation_ptr &old);
void set_logged()
{
@@ -153,14 +165,29 @@ public:
// user requests
std::vector<dsn::message_ex *> client_requests;
+ // A mutation will be a blocking mutation if `is_blocking` is true. A
blocking mutation
+ // will not begin to be popped from the queue and processed until all of
mutations before
+ // it in the queue have been committed and applied into RocksDB. This
field is only used
+ // by primary replicas.
+ //
+ // For example, if the primary replica receives an incr request (increasing a
base value of 100 by 1)
+ // and the current configuration requires all atomic write requests to be
idempotent, then:
+ // 1. A mutation with `is_blocking` = true will be created to store this
request and then
+ // added to the mutation queue.
+ // 2. This mutation request will only be dequeued after all previous write
requests have
+ // been applied.
+ // 3. Next, the current base value 100 is read from the storage engine,
and after performing
+ // the incr operation, a single put request is created to store the final
value 101.
+ // 4. Another mutation is then created to store this idempotent single put
request, which is
+ // subsequently added to the write pipeline, including writing to the plog
and broadcasting
+ // to the secondary replicas.
+ bool is_blocking{false};
+
// The original request received from the client. While making an atomic
request (incr,
// check_and_set and check_and_mutate) idempotent, an extra variable is
needed to hold
// its original request for the purpose of replying to the client.
dsn::message_ptr original_request;
- // used by pending mutation queue only
- mutation *next;
-
std::shared_ptr<dsn::utils::latency_tracer> _tracer;
void set_is_sync_to_child(bool sync_to_child) { _is_sync_to_child =
sync_to_child; }
@@ -200,61 +227,150 @@ private:
class replica;
-// mutation queue are queues for mutations waiting to send.
-// more precisely: for client requests waiting to send.
-// mutations are queued as "_hdr + _pending_mutation". that is to say,
_hdr.first is the first
-// element in the queue, and pending_mutations is the last.
+// The mutation queue caches the mutations waiting to be processed in order by
the write pipeline,
+// including appended to plog and broadcast to secondary replicas. This class
is only used by
+// primary replicas.
+//
+// The entire queue is arranged in the order of `_blocking_mutation + _queue +
_pending_mutation`,
+// meaning that `_blocking_mutation` is the head of the queue if it is
non-null, for the reason
+// that it is enabled only when the mutation ready to get popped from the
queue is a blocking
+// mutation: it will block the entire queue from which none could get popped
until all of the
+// mutations before it have been applied.
//
-// we keep 2 structure "hdr" and "pending_mutation" coz:
-// 1. as a container of client requests, capacity of a mutation is limited, so
incoming client
-// requets should be packed into different mutations
-// 2. number of preparing mutations is also limited, so we should queue new
created mutations and
-// try to send them as soon as the concurrent condition satisfies.
+// Once `_blocking_mutation` is cleared and becomes null, the head of the
queue will be the first
+// element of `_queue`. `_pending_mutation` is the tail of the queue,
separated from `_queue` due
+// to the following reasons:
+// 1. As a carrier for storing client requests, each mutation needs to be
size-limited. For each
+// incoming request, we need to decide whether to continue storing it in the
most recent mutation
+// (i.e. `_pending_mutation`) or to create a new one.
+// 2. The number of concurrent two-phase commits is limited. We should ensure
the requests in
+// each mutation could be processed as soon as possible if it does not reach
the upper limit,
+// even if the requests are in the latest mutation.
+// 3. Some writes (such as non-single writes) do not allow batching. Once requests
of this kind
+// are received, a new mutation (`_pending_mutation`) should be created to
hold them.
class mutation_queue
{
public:
- mutation_queue(gpid gpid, int max_concurrent_op = 2, bool
batch_write_disabled = false);
+ mutation_queue(replica *r, gpid gpid, int max_concurrent_op, bool
batch_write_disabled);
~mutation_queue()
{
clear();
- CHECK(_hdr.is_empty(),
+ CHECK(_queue.empty(),
"work queue is deleted when there are still {} running ops or
pending work items "
"in queue",
_current_op_count);
}
- mutation_ptr add_work(task_code code, dsn::message_ex *request, replica
*r);
+ DISALLOW_COPY_AND_ASSIGN(mutation_queue);
+ DISALLOW_MOVE_AND_ASSIGN(mutation_queue);
+ // Append the input request from the client to the queue by filling the
latest mutation
+ // with it.
+ //
+ // Parameters:
+ // - request: must be non-null and from a client.
+ //
+ // Return the next mutation needing to be processed in order. Returning
null means the
+ // queue is being blocked or does not have any mutation.
+ mutation_ptr add_work(dsn::message_ex *request);
+
+ // Get the next mutation in order, typically called immediately after the
current
+ // mutation was applied, or the membership was changed and we became the
primary
+ // replica.
+ //
+ // Parameters:
+ // - current_running_count: used to reset the current number of the
mutations being
+ // processed, typically the gap between the max committed decree and the
max prepared
+ // decree. `_current_op_count` is never decreased directly: this parameter
provides
+ // the only way to decrease it.
+ //
+ // Return the next mutation needing to be processed in order. Returning
null means the
+ // queue is being blocked or does not have any mutation.
+ mutation_ptr next_work(int current_running_count);
+
+ // Clear the entire queue.
void clear();
- // called when you want to clear the mutation_queue and want to get the
remaining messages
- void clear(std::vector<mutation_ptr> &queued_mutations);
- // called when the curren operation is completed or replica configuration
is change,
- // which triggers further round of operations as returned
- mutation_ptr check_possible_work(int current_running_count);
+ // Get the remaining unprocessed mutations and clear the entire queue.
+ //
+ // Parameters:
+ // - queued_mutations: the output parameter used to hold the remaining
unprocessed
+ // mutations.
+ void clear(std::vector<mutation_ptr> &queued_mutations);
private:
- mutation_ptr unlink_next_workload()
+ // Promote `_pending_mutation` to `_queue`. Before the promotion,
`_pending_mutation`
+ // should not be null (otherwise the behaviour is undefined).
+ void promote_pending();
+
+ // If some conditions are met, promote `_pending_mutation` to `_queue`.
Before the
+ // promotion, `_pending_mutation` should not be null (otherwise the
behaviour is
+ // undefined).
+ //
+ // Parameters:
+ // - spec: the specification for the incoming client request, used to
check if this client
+ // request is allowed to be batched.
+ void try_promote_pending(task_spec *spec);
+
+ // Once the blocking mutation is enabled, the queue will be blocked and
no mutation can
+ // get popped. However, once the mutations before the blocking mutation
have been applied
+ // into RocksDB, the blocking mutation can be disabled and the queue will
be "unblocked".
+ // `_blocking_mutation` should not be null before this function is called.
+ //
+ // Return non-null blocking mutation if succeeding in unblocking,
otherwise return null
+ // which means the queue is still blocked.
+ mutation_ptr try_unblock();
+
+ // If the immediately popped `mu` is not a blocking mutation, this function
will do nothing
+ // but increase the count of the mutations being processed. Otherwise,
it will set
+ // `mu` to `_blocking_mutation` to enable the blocking mutation.
`_blocking_mutation`
+ // should be null before this function is called.
+ //
+ // Parameters:
+ // - mu: the mutation immediately popped from the head of `_queue +
_pending_mutation`.
+ // Should not be null.
+ //
+ // Return the next mutation needing to be processed in order. Returning
null means the
+ // queue is being blocked or does not have any mutation.
+ mutation_ptr try_block(mutation_ptr &mu);
+
+ // Pop the mutation from the head of `_queue`.
+ //
+ // Return non-null mutation if the queue is not empty, otherwise return
null.
+ mutation_ptr pop_internal_queue()
{
- mutation_ptr r = _hdr.pop_one();
- if (r.get() != nullptr) {
- r->release_ref(); // added in add_work
- --(*_pcount);
+ if (_queue.empty()) {
+ return {};
}
- return r;
+
+ const auto work = _queue.front();
+ _queue.pop();
+ --(*_pcount);
+
+ return work;
}
- void reset_max_concurrent_ops(int max_c) { _max_concurrent_op = max_c; }
+ void reset_max_concurrent_ops(int max) { _max_concurrent_op = max; }
+
+ replica *_replica;
-private:
int _current_op_count;
int _max_concurrent_op;
bool _batch_write_disabled;
volatile int *_pcount;
mutation_ptr _pending_mutation;
- slist<mutation> _hdr;
+ std::queue<mutation_ptr> _queue;
+
+ // Once a mutation getting popped from `_queue + _pending_mutation` is
found blocking, it
+ // will firstly be set to `_blocking_mutation` to enable the blocking
mutation; then, the
+ // mutation popped from the queue will always be null. Only after all
of the mutations
+ // before the blocking mutation have been applied into RocksDB can
`_blocking_mutation`
+ // be popped and disabled; then, the mutations will continue to get popped
from this queue
+ // in order until another blocking mutation appears.
+ mutation_ptr _blocking_mutation;
};
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 4175bb816..68e2839d0 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -58,7 +58,6 @@
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
-#include "utils/ports.h"
#include "utils/rand.h"
DSN_DEFINE_bool(replication,
@@ -275,7 +274,8 @@ replica::replica(replica_stub *stub,
: serverlet<replica>(replication_options::kReplicaAppType.c_str()),
replica_base(gpid, fmt::format("{}@{}", gpid,
stub->_primary_host_port_cache), app.app_name),
_app_info(app),
- _primary_states(gpid, FLAGS_staleness_for_commit,
FLAGS_batch_write_disabled),
+ _make_write_idempotent(false),
+ _primary_states(this, gpid, FLAGS_staleness_for_commit,
FLAGS_batch_write_disabled),
_potential_secondary_states(this),
_chkpt_total_size(0),
_cur_download_size(0),
@@ -377,7 +377,7 @@ void replica::init_state()
get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND,
_allow_ingest_behind);
}
-replica::~replica(void)
+replica::~replica()
{
close();
_prepare_list = nullptr;
@@ -560,14 +560,27 @@ void replica::execute_mutation(mutation_ptr &mu)
}
ADD_CUSTOM_POINT(mu->_tracer, "completed");
- auto next = _primary_states.write_queue.check_possible_work(
- static_cast<int>(_prepare_list->max_decree() - d));
+ auto next =
_primary_states.write_queue.next_work(static_cast<int>(max_prepared_decree() -
d));
if (next != nullptr) {
init_prepare(next, false);
}
}
+mutation_ptr replica::new_mutation(decree decree, dsn::message_ex
*original_request)
+{
+ auto mu = new_mutation(decree);
+ mu->original_request = original_request;
+ return mu;
+}
+
+mutation_ptr replica::new_mutation(decree decree, bool is_blocking)
+{
+ auto mu = new_mutation(decree);
+ mu->is_blocking = is_blocking;
+ return mu;
+}
+
mutation_ptr replica::new_mutation(decree decree)
{
mutation_ptr mu(new mutation());
diff --git a/src/replica/replica.h b/src/replica/replica.h
index b312865c5..bc3b0b8fb 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -57,6 +57,7 @@
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/metrics.h"
+#include "utils/ports.h"
#include "utils/thread_access_checker.h"
#include "utils/throttling_controller.h"
#include "utils/uniq_timestamp_us.h"
@@ -71,22 +72,20 @@ class rocksdb_wrapper_test;
namespace dsn {
class gpid;
class host_port;
+class task_spec;
-namespace dist {
-
-namespace block_service {
+namespace dist::block_service {
class block_filesystem;
-} // namespace block_service
-} // namespace dist
+} // namespace dist::block_service
namespace security {
class access_controller;
} // namespace security
+
namespace replication {
class backup_request;
class backup_response;
-
class configuration_restore_request;
class detect_hotkey_request;
class detect_hotkey_response;
@@ -107,11 +106,12 @@ class replica_stub;
class replication_app_base;
class replication_options;
struct dir_node;
-typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
+
+using cold_backup_context_ptr = dsn::ref_ptr<cold_backup_context>;
namespace test {
class test_checker;
-}
+} // namespace test
#define CHECK_REQUEST_IF_SPLITTING(op_type)
\
do {
\
@@ -163,7 +163,10 @@ struct deny_client
class replica : public serverlet<replica>, public ref_counter, public
replica_base
{
public:
- ~replica(void);
+ ~replica() override;
+
+ DISALLOW_COPY_AND_ASSIGN(replica);
+ DISALLOW_MOVE_AND_ASSIGN(replica);
// return true when the mutation is valid for the current replica
bool replay_mutation(mutation_ptr &mu, bool is_private);
@@ -356,6 +359,34 @@ private:
void response_client_read(dsn::message_ex *request, error_code error);
void response_client_write(dsn::message_ex *request, error_code error);
void execute_mutation(mutation_ptr &mu);
+
+ // Create a new mutation with specified decree and the original atomic
write request,
+ // which is used to build the response to the client.
+ //
+ // Parameters:
+ // - decree: invalid_decree, or the real decree assigned to this mutation.
+ // - original_request: the original request of the atomic write.
+ //
+ // Return the newly created mutation.
+ mutation_ptr new_mutation(decree decree, dsn::message_ex
*original_request);
+
+ // Create a new mutation with specified decree and a flag marking whether
this is a
+ // blocking mutation (for a detailed explanation of blocking mutations,
refer to the
+ // comments for the field `is_blocking` of class `mutation`).
+ //
+ // Parameters:
+ // - decree: invalid_decree, or the real decree assigned to this mutation.
+ // - is_blocking: true means creating a blocking mutation.
+ //
+ // Return the newly created mutation.
+ mutation_ptr new_mutation(decree decree, bool is_blocking);
+
+ // Create a new mutation with specified decree.
+ //
+ // Parameters:
+ // - decree: invalid_decree, or the real decree assigned to this mutation.
+ //
+ // Return the newly created mutation.
mutation_ptr new_mutation(decree decree);
// initialization
@@ -371,17 +402,79 @@ private:
decree get_replay_start_decree();
/////////////////////////////////////////////////////////////////
- // 2pc
- // `pop_all_committed_mutations = true` will be used for ingestion empty
write
- // See more about it in `replica_bulk_loader.cpp`
- void
- init_prepare(mutation_ptr &mu, bool reconciliation, bool
pop_all_committed_mutations = false);
- void send_prepare_message(const ::dsn::host_port &addr,
+ // 2PC
+
+ // Given the specification for a client request, decide whether to reject
it as it is a
+ // non-idempotent request.
+ //
+ // Parameters:
+ // - spec: the specification for a client request, should not be null
(otherwise the
+ // behaviour is undefined).
+ //
+ // Return true if deciding to reject this client request.
+ bool need_reject_non_idempotent(task_spec *spec) const;
+
+ // Given the specification for a client request, decide whether to make it
idempotent.
+ //
+ // Parameters:
+ // - spec: the specification for a client request, should not be null
(otherwise the
+ // behaviour is undefined).
+ //
+ // Return true if deciding to make this client request idempotent.
+ bool need_make_idempotent(task_spec *spec) const;
+
+ // Given a client request, decide whether to make it idempotent.
+ //
+ // Parameters:
+ // - request: the client request, could be null.
+ //
+ // Return true if deciding to make this client request idempotent.
+ bool need_make_idempotent(message_ex *request) const;
+
+ // Make the atomic write request (if any) in a mutation idempotent.
+ //
+ // Parameters:
+ // - mu: the mutation where the atomic write request will be translated
into idempotent
+ // one. Should contain at least one client request. Once the translation
succeeds, `mu`
+ // will be reassigned with the new idempotent mutation as the output. Thus
it is both an
+ // input and an output parameter.
+ //
+ // Return rocksdb::Status::kOk, or other code (rocksdb::Status::Code) if
some error
+ // occurred while making write idempotent.
+ int make_idempotent(mutation_ptr &mu);
+
+ // Launch 2PC for the specified mutation: it will be broadcast to
secondary replicas,
+ // appended to plog, and finally applied into storage engine.
+ //
+ // Parameters:
+ // - mu: the mutation pushed into the write pipeline.
+ // - reconciliation: true means the primary replica will be forced to
launch 2PC for each
+ // uncommitted request in its prepared list to make them committed
regardless of whether
+ // there is a quorum to receive the prepare requests.
+ // - pop_all_committed_mutations: true means popping all committed
mutations while preparing
+ // locally, used for ingestion in bulk loader with empty write. See
`replica_bulk_loader.cpp`
+ // for details.
+ void init_prepare(mutation_ptr &mu, bool reconciliation, bool
pop_all_committed_mutations);
+
+ // The same as the above except that `pop_all_committed_mutations` is set
false.
+ void init_prepare(mutation_ptr &mu, bool reconciliation)
+ {
+ init_prepare(mu, reconciliation, false);
+ }
+
+ // Reply to the client with the error if 2PC failed.
+ //
+ // Parameters:
+ // - mu: the mutation for which 2PC failed.
+ // - err: the error that caused the 2PC failure.
+ void reply_with_error(const mutation_ptr &mu, const error_code &err);
+
+ void send_prepare_message(const host_port &hp,
partition_status::type status,
const mutation_ptr &mu,
int timeout_milliseconds,
- bool pop_all_committed_mutations = false,
- int64_t learn_signature = invalid_signature);
+ bool pop_all_committed_mutations,
+ int64_t learn_signature);
void on_append_log_completed(mutation_ptr &mu, error_code err, size_t
size);
void on_prepare_reply(std::pair<mutation_ptr, partition_status::type> pr,
error_code err,
@@ -644,6 +737,10 @@ private:
app_info _app_info;
std::map<std::string, std::string> _extra_envs;
+ // TODO(wangdan): temporarily used to mark whether we make all atomic
writes idempotent
+ // for this replica. Would make this configurable soon.
+ bool _make_write_idempotent;
+
// uniq timestamp generator for this replica.
//
// we use it to generate an increasing timestamp for current replica
@@ -768,6 +865,8 @@ private:
// Indicate whether the storage engine data is corrupted and unrecoverable.
bool _data_corrupted{false};
};
-typedef dsn::ref_ptr<replica> replica_ptr;
+
+using replica_ptr = dsn::ref_ptr<replica>;
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index f0c108b7f..2983408c8 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -25,10 +25,11 @@
*/
#include <fmt/core.h>
-#include <inttypes.h>
-#include <stddef.h>
+#include <rocksdb/status.h>
#include <atomic>
#include <chrono>
+#include <cinttypes>
+#include <cstddef>
#include <functional>
#include <memory>
#include <string>
@@ -146,9 +147,18 @@ void replica::on_client_write(dsn::message_ex *request,
bool ignore_throttling)
return;
}
- task_spec *spec = task_spec::get(request->rpc_code());
- if (dsn_unlikely(nullptr == spec || request->rpc_code() ==
TASK_CODE_INVALID)) {
- LOG_ERROR("recv message with unhandled rpc name {} from {}, trace_id =
{}",
+ if (dsn_unlikely(request->rpc_code() == TASK_CODE_INVALID)) {
+ LOG_ERROR("recv message with invalid RPC code {} from {}, trace_id =
{}",
+ request->rpc_code(),
+ request->header->from_address,
+ request->header->trace_id);
+ response_client_write(request, ERR_INVALID_PARAMETERS);
+ return;
+ }
+
+ auto *spec = task_spec::get(request->rpc_code());
+ if (dsn_unlikely(spec == nullptr)) {
+ LOG_ERROR("recv message with unhandled RPC code {} from {}, trace_id =
{}",
request->rpc_code(),
request->header->from_address,
request->header->trace_id);
@@ -156,7 +166,7 @@ void replica::on_client_write(dsn::message_ex *request,
bool ignore_throttling)
return;
}
- if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
+ if (need_reject_non_idempotent(spec)) {
// Ignore non-idempotent write, because duplication provides no
guarantee of atomicity to
// make this write produce the same result on multiple clusters.
METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests);
@@ -220,16 +230,100 @@ void replica::on_client_write(dsn::message_ex *request,
bool ignore_throttling)
}
LOG_DEBUG_PREFIX("got write request from {}",
request->header->from_address);
- auto mu = _primary_states.write_queue.add_work(request->rpc_code(),
request, this);
+ auto mu = _primary_states.write_queue.add_work(request);
if (mu != nullptr) {
init_prepare(mu, false);
}
}
+bool replica::need_reject_non_idempotent(task_spec *spec) const
+{
+ if (!is_duplication_master()) {
+ return false;
+ }
+
+ if (_make_write_idempotent) {
+ return false;
+ }
+
+ return !spec->rpc_request_is_write_idempotent;
+}
+
+bool replica::need_make_idempotent(task_spec *spec) const
+{
+ if (!_make_write_idempotent) {
+ return false;
+ }
+
+ return !spec->rpc_request_is_write_idempotent;
+}
+
+bool replica::need_make_idempotent(message_ex *request) const
+{
+ if (request == nullptr) {
+ return false;
+ }
+
+ if (!_make_write_idempotent) {
+ return false;
+ }
+
+ auto *spec = task_spec::get(request->rpc_code());
+ CHECK_NOTNULL(spec, "RPC code {} not found", request->rpc_code());
+
+ return !spec->rpc_request_is_write_idempotent;
+}
+
+int replica::make_idempotent(mutation_ptr &mu)
+{
+ CHECK_TRUE(!mu->client_requests.empty());
+
+ message_ex *request = mu->client_requests.front();
+ if (!need_make_idempotent(request)) {
+ return rocksdb::Status::kOk;
+ }
+
+ // The original atomic write request must not be batched.
+ CHECK_EQ(mu->client_requests.size(), 1);
+
+ dsn::message_ex *new_request = nullptr;
+ const int err = _app->make_idempotent(request, &new_request);
+ if (dsn_unlikely(err != rocksdb::Status::kOk)) {
+ // Once some error occurred, the response with error must have been
returned to the
+ // client during _app->make_idempotent(). Thus do nothing here.
+ return err;
+ }
+
+ CHECK_NOTNULL(new_request,
+ "new_request should not be null since its original write
request must be atomic");
+
+ // During make_idempotent(), the request has been deserialized (i.e.
unmarshall() in the
+ // constructor of `rpc_holder::internal`). If it were deserialized again,
the assertion would fail for
+ // set_read_msg() in the constructor of `rpc_read_stream`.
+ //
+ // To make it deserializable again to be applied into RocksDB, restore
read for it.
+ request->restore_read();
+
+ // The decree must not have been assigned.
+ CHECK_EQ(mu->get_decree(), invalid_decree);
+
+ // Create a new mutation to hold the new idempotent request. The old
mutation holding the
+ // original atomic write request will be released automatically.
+ mu = new_mutation(invalid_decree, request);
+ mu->add_client_request(new_request);
+ return rocksdb::Status::kOk;
+}
+
void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool
pop_all_committed_mutations)
{
CHECK_EQ(partition_status::PS_PRIMARY, status());
+ if (make_idempotent(mu) != rocksdb::Status::kOk) {
+ // If some error occurred, the response with error must have been
returned to the
+ // client during make_idempotent(). Thus do nothing here.
+ return;
+ }
+
mu->_tracer->set_description("primary");
ADD_POINT(mu->_tracer);
@@ -243,9 +337,10 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
// print a debug log if necessary
if (FLAGS_prepare_decree_gap_for_debug_logging > 0 &&
- mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0)
+ mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging ==
0) {
level = LOG_LEVEL_INFO;
- mu->set_timestamp(_uniq_timestamp_us.next());
+ }
+ mu->set_timestamp(static_cast<int64_t>(_uniq_timestamp_us.next()));
} else {
mu->set_id(get_ballot(), mu->data.header.decree);
}
@@ -259,7 +354,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
// check bounded staleness
if (mu->data.header.decree > last_committed_decree() +
FLAGS_staleness_for_commit) {
err = ERR_CAPACITY_EXCEEDED;
- goto ErrOut;
+ reply_with_error(mu, err);
+ return;
}
// stop prepare bulk load ingestion if there are secondaries unalive
@@ -276,7 +372,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
}
}
if (err != ERR_OK) {
- goto ErrOut;
+ reply_with_error(mu, err);
+ return;
}
// stop prepare if there are too few replicas unless it's a reconciliation
@@ -286,7 +383,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) &&
!reconciliation) {
err = ERR_NOT_ENOUGH_MEMBER;
- goto ErrOut;
+ reply_with_error(mu, err);
+ return;
}
CHECK_GT(mu->data.header.decree, last_committed_decree());
@@ -294,7 +392,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
// local prepare
err = _prepare_list->prepare(mu, partition_status::PS_PRIMARY,
pop_all_committed_mutations);
if (err != ERR_OK) {
- goto ErrOut;
+ reply_with_error(mu, err);
+ return;
}
// remote prepare
@@ -306,7 +405,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
partition_status::PS_SECONDARY,
mu,
FLAGS_prepare_timeout_ms_for_secondaries,
- pop_all_committed_mutations);
+ pop_all_committed_mutations,
+ invalid_signature);
}
count = 0;
@@ -348,16 +448,23 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
}
_primary_states.last_prepare_ts_ms = mu->prepare_ts_ms();
- return;
+}
+
+void replica::reply_with_error(const mutation_ptr &mu, const error_code &err)
+{
+ // Respond to the original atomic request if it is non-null. And it could
never be batched.
+ if (mu->original_request != nullptr) {
+ response_client_write(mu->original_request, err);
+ return;
+ }
-ErrOut:
- for (auto &r : mu->client_requests) {
- response_client_write(r, err);
+ // Just respond to each client request directly if there is no original
request for them.
+ for (auto *req : mu->client_requests) {
+ response_client_write(req, err);
}
- return;
}
-void replica::send_prepare_message(const ::dsn::host_port &hp,
+void replica::send_prepare_message(const host_port &hp,
partition_status::type status,
const mutation_ptr &mu,
int timeout_milliseconds,
diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h
index 79583c13f..3282ed220 100644
--- a/src/replica/replica_base.h
+++ b/src/replica/replica_base.h
@@ -46,15 +46,17 @@ struct replica_base
{
}
- gpid get_gpid() const { return _gpid; }
+ virtual ~replica_base() = default;
- const char *replica_name() const { return _name.c_str(); }
+ [[nodiscard]] gpid get_gpid() const { return _gpid; }
- const char *app_name() const { return _app_name.c_str(); }
+ [[nodiscard]] const char *replica_name() const { return _name.c_str(); }
- const char *log_prefix() const { return _name.c_str(); }
+ [[nodiscard]] const char *app_name() const { return _app_name.c_str(); }
- const metric_entity_ptr &replica_metric_entity() const
+ [[nodiscard]] const char *log_prefix() const { return _name.c_str(); }
+
+ [[nodiscard]] const metric_entity_ptr &replica_metric_entity() const
{
CHECK_NOTNULL(_replica_metric_entity,
"replica metric entity (table_id={}, partition_id={})
should has been "
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index c0e13aca5..c4f4c01e1 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -171,7 +171,7 @@ void replica::broadcast_group_check()
if (!FLAGS_empty_write_disabled &&
dsn_now_ms() >= _primary_states.last_prepare_ts_ms +
FLAGS_group_check_interval_ms) {
mutation_ptr mu = new_mutation(invalid_decree);
- mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ mu->add_client_request(nullptr);
init_prepare(mu, false);
}
}
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index b975626b6..6ef6f451c 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -227,7 +227,7 @@ void
replica::async_trigger_manual_emergency_checkpoint(decree min_checkpoint_de
// the decree to at least 1, to ensure that the checkpoint
would inevitably
// be created even if the replica is empty.
mutation_ptr mu = new_mutation(invalid_decree);
- mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ mu->add_client_request(nullptr);
init_prepare(mu, false);
async_trigger_manual_emergency_checkpoint(
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index 8fc73eec7..75e31caf7 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -1039,9 +1039,9 @@ bool replica::update_local_configuration(const
replica_configuration &config,
// start pending mutations if necessary
if (status() == partition_status::PS_PRIMARY) {
- mutation_ptr next = _primary_states.write_queue.check_possible_work(
- static_cast<int>(_prepare_list->max_decree() -
last_committed_decree()));
- if (next) {
+ auto next = _primary_states.write_queue.next_work(
+ static_cast<int>(max_prepared_decree() - last_committed_decree()));
+ if (next != nullptr) {
init_prepare(next, false);
}
@@ -1175,7 +1175,7 @@ void replica::replay_prepare_list()
"copy mutation from mutation_tid={} to mutation_tid={}",
old->tid(), mu->tid());
mu->copy_from(old);
} else {
- mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ mu->add_client_request(nullptr);
LOG_INFO_PREFIX("emit empty mutation {} with mutation_tid={} when
replay prepare list",
mu->name(),
diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h
index c1719c8de..e40cf83dd 100644
--- a/src/replica/replica_context.h
+++ b/src/replica/replica_context.h
@@ -89,9 +89,9 @@ typedef std::unordered_map<::dsn::host_port,
remote_learner_state> learner_map;
class primary_context
{
public:
- primary_context(gpid gpid, int max_concurrent_2pc_count = 1, bool
batch_write_disabled = false)
+ primary_context(replica *r, gpid gpid, int max_concurrent_2pc_count, bool
batch_write_disabled)
: next_learning_version(0),
- write_queue(gpid, max_concurrent_2pc_count, batch_write_disabled),
+ write_queue(r, gpid, max_concurrent_2pc_count, batch_write_disabled),
last_prepare_decree_on_new_primary(0),
last_prepare_ts_ms(dsn_now_ms())
{
@@ -118,7 +118,6 @@ public:
bool secondary_disk_abnormal() const;
-public:
// membership mgr, including learners
partition_configuration pc;
node_statuses statuses;
diff --git a/src/replica/split/replica_split_manager.cpp
b/src/replica/split/replica_split_manager.cpp
index 0e2c9422b..56f4c00e2 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -719,7 +719,7 @@ void replica_split_manager::parent_handle_child_catch_up(
if (!FLAGS_empty_write_disabled) {
// empty write here to commit sync_point
mutation_ptr mu = _replica->new_mutation(invalid_decree);
- mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ mu->add_client_request(nullptr);
_replica->init_prepare(mu, false);
CHECK_EQ_PREFIX_MSG(
sync_point, mu->data.header.decree, "sync_point should be equal to
mutation's decree");
diff --git a/src/replica/split/test/replica_split_test.cpp
b/src/replica/split/test/replica_split_test.cpp
index 2f18fe751..fc1292131 100644
--- a/src/replica/split/test/replica_split_test.cpp
+++ b/src/replica/split/test/replica_split_test.cpp
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
#include <memory>
#include <string>
#include <unordered_set>
@@ -482,46 +483,50 @@ public:
_child_replica->tracker()->wait_outstanding_tasks();
}
- int32_t child_get_prepare_list_count() { return
_child_replica->get_plist()->count(); }
- bool child_is_prepare_list_copied()
+ [[nodiscard]] int32_t child_get_prepare_list_count() const
+ {
+ return _child_replica->get_plist()->count();
+ }
+ [[nodiscard]] bool child_is_prepare_list_copied() const
{
return _child_replica->_split_states.is_prepare_list_copied;
}
- bool child_is_caught_up() { return
_child_replica->_split_states.is_caught_up; }
-
- split_status::type parent_get_split_status() { return
_parent_split_mgr->_split_status; }
- void parent_set_split_status(split_status::type status)
+ [[nodiscard]] bool child_is_caught_up() const
{
- _parent_split_mgr->_split_status = status;
+ return _child_replica->_split_states.is_caught_up;
}
- primary_context get_replica_primary_context(mock_replica_ptr rep)
+ [[nodiscard]] split_status::type parent_get_split_status() const
{
- return rep->_primary_states;
+ return _parent_split_mgr->_split_status;
}
- bool parent_sync_send_write_request()
+ void parent_set_split_status(split_status::type status) const
+ {
+ _parent_split_mgr->_split_status = status;
+ }
+
+ [[nodiscard]] bool parent_sync_send_write_request() const
{
return _parent_replica->_primary_states.sync_send_write_request;
}
- int32_t parent_stopped_split_size()
+ [[nodiscard]] size_t parent_stopped_split_size() const
{
return _parent_replica->_primary_states.split_stopped_secondary.size();
}
- bool is_parent_not_in_split()
+ [[nodiscard]] bool is_parent_not_in_split() const
{
return _parent_split_mgr->_child_gpid.get_app_id() == 0 &&
_parent_split_mgr->_child_init_ballot == 0 &&
_parent_split_mgr->_split_status == split_status::NOT_SPLIT;
}
- bool primary_parent_not_in_split()
+ [[nodiscard]] bool primary_parent_not_in_split() const
{
- auto context = _parent_replica->_primary_states;
- return context.caught_up_children.size() == 0 &&
context.register_child_task == nullptr &&
- context.sync_send_write_request == false &&
context.query_child_task == nullptr &&
- context.split_stopped_secondary.size() == 0 &&
is_parent_not_in_split();
+ const auto &context = _parent_replica->_primary_states;
+ return context.caught_up_children.empty() &&
context.register_child_task == nullptr &&
+ !context.sync_send_write_request && context.query_child_task ==
nullptr &&
+ context.split_stopped_secondary.empty() &&
is_parent_not_in_split();
}
-public:
const std::string APP_NAME = "split_table";
const int32_t APP_ID = 2;
const int32_t OLD_PARTITION_COUNT = 8;
@@ -978,7 +983,7 @@ TEST_P(replica_split_test, primary_parent_handle_stop_test)
split_status::type meta_split_status;
bool lack_of_secondary;
bool will_all_stop;
- int32_t expected_size;
+ size_t expected_size;
bool expected_all_stopped;
} tests[]{{split_status::NOT_SPLIT, false, false, 0, false},
{split_status::SPLITTING, false, false, 0, false},
diff --git a/src/replica/test/cold_backup_context_test.cpp
b/src/replica/test/cold_backup_context_test.cpp
index 529ac0a09..8d7478bfc 100644
--- a/src/replica/test/cold_backup_context_test.cpp
+++ b/src/replica/test/cold_backup_context_test.cpp
@@ -35,7 +35,6 @@
#include "gtest/gtest.h"
#include "metadata_types.h"
#include "replica/backup/cold_backup_context.h"
-#include "replica/replica.h"
#include "replica/test/replication_service_test_app.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
diff --git a/src/runtime/serverlet.h b/src/runtime/serverlet.h
index e5a7df0b3..03541ae07 100644
--- a/src/runtime/serverlet.h
+++ b/src/runtime/serverlet.h
@@ -108,7 +108,7 @@ class serverlet
{
public:
explicit serverlet(const char *nm);
- virtual ~serverlet();
+ virtual ~serverlet() = default;
protected:
template <typename TRequest>
@@ -153,11 +153,6 @@ inline serverlet<T>::serverlet(const char *nm) : _name(nm)
{
}
-template <typename T>
-inline serverlet<T>::~serverlet()
-{
-}
-
template <typename T>
template <typename TRequest>
inline bool serverlet<T>::register_rpc_handler(task_code rpc_code,
diff --git a/src/task/task_spec.cpp b/src/task/task_spec.cpp
index ff38b1fbd..1503e52e2 100644
--- a/src/task/task_spec.cpp
+++ b/src/task/task_spec.cpp
@@ -59,10 +59,10 @@ void task_spec::register_task_code(task_code code,
{
CHECK_GE(code, 0);
CHECK_LT(code, TASK_SPEC_STORE_CAPACITY);
- if (!s_task_spec_store[code]) {
- s_task_spec_store[code] =
+ if (s_task_spec_store.at(code) == nullptr) {
+ s_task_spec_store.at(code) =
std::make_unique<task_spec>(code, code.to_string(), type, pri,
pool);
- auto &spec = s_task_spec_store[code];
+ auto &spec = s_task_spec_store.at(code);
if (type == TASK_TYPE_RPC_REQUEST) {
std::string ack_name = std::string(code.to_string()) +
std::string("_ACK");
@@ -75,33 +75,35 @@ void task_spec::register_task_code(task_code code,
spec->rpc_paired_code = ack_code;
task_spec::get(ack_code.code())->rpc_paired_code = code;
}
- } else {
- auto spec = task_spec::get(code);
- CHECK_EQ_MSG(
- spec->type,
- type,
- "task code {} registerd for {}, which does not match with
previously registered {}",
- code,
- enum_to_string(type),
- enum_to_string(spec->type));
-
- if (spec->priority != pri) {
- LOG_WARNING("overwrite priority for task {} from {} to {}",
- code,
- enum_to_string(spec->priority),
- enum_to_string(pri));
- spec->priority = pri;
- }
- if (spec->pool_code != pool) {
- if (spec->pool_code != THREAD_POOL_INVALID) {
- LOG_WARNING("overwrite default thread pool for task {} from {}
to {}",
- code,
- spec->pool_code,
- pool);
- }
- spec->pool_code = pool;
+ return;
+ }
+
+ auto *spec = task_spec::get(code);
+ CHECK_EQ_MSG(
+ spec->type,
+ type,
+ "task code {} registerd for {}, which does not match with previously
registered {}",
+ code,
+ enum_to_string(type),
+ enum_to_string(spec->type));
+
+ if (spec->priority != pri) {
+ LOG_WARNING("overwrite priority for task {} from {} to {}",
+ code,
+ enum_to_string(spec->priority),
+ enum_to_string(pri));
+ spec->priority = pri;
+ }
+
+ if (spec->pool_code != pool) {
+ if (spec->pool_code != THREAD_POOL_INVALID) {
+ LOG_WARNING("overwrite default thread pool for task {} from {} to
{}",
+ code,
+ spec->pool_code,
+ pool);
}
+ spec->pool_code = pool;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]