This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 7e95f31cc feat(remote_command): provide the query for the progress of
decrees including both local writes and duplications (#2045)
7e95f31cc is described below
commit 7e95f31cc15e11574a42803dbf73b25599fae00b
Author: Dan Wang <[email protected]>
AuthorDate: Fri Jun 21 15:42:43 2024 +0800
feat(remote_command): provide the query for the progress of decrees
including both local writes and duplications (#2045)
There are many kinds of decrees while writing locally and duplicating to
remote
clusters, for example, the max decree in prepare list, the last decree that
has ever
been committed, the last decree that has been applied into rocksdb memtable,
the last decree that has been flushed into rocksdb sst files, the max
decree that
has been confirmed by remote cluster for duplication, etc.
These decrees are very useful while we want to watch the progress of all
the local
writes and duplications. These decrees might also help us diagnose
problems.
Therefore, we provide a tool in the way of `remote_command` to show the
decrees
for each replica.
---
src/common/json_helper.h | 6 ++
src/replica/duplication/replica_duplicator.h | 20 ++++++
.../duplication/replica_duplicator_manager.cpp | 75 ++++++++++++++-----
.../duplication/replica_duplicator_manager.h | 48 ++++++-------
src/replica/replica.cpp | 6 +-
src/replica/replica.h | 39 +++++++---
src/replica/replica_stub.cpp | 20 +++++-
src/replica/replication_app_base.h | 6 ++
.../storage/simple_kv/simple_kv.server.impl.h | 4 +-
.../storage/simple_kv/test/simple_kv.server.impl.h | 4 +-
src/replica/test/mock_utils.h | 2 +
src/server/pegasus_server_impl.cpp | 83 +++++++++++++---------
src/server/pegasus_server_impl.h | 2 +
13 files changed, 225 insertions(+), 90 deletions(-)
diff --git a/src/common/json_helper.h b/src/common/json_helper.h
index 4329bd2a8..cbd94af94 100644
--- a/src/common/json_helper.h
+++ b/src/common/json_helper.h
@@ -237,6 +237,12 @@
JSON_DECODE_ENTRIES(input, t, __VA_ARGS__);
\
}
+#define JSON_ENCODE_OBJ(writer, name, ...)
\
+ do {
\
+ writer.Key(#name);
\
+ dsn::json::json_encode(writer, __VA_ARGS__);
\
+ } while (0)
+
namespace dsn {
namespace json {
diff --git a/src/replica/duplication/replica_duplicator.h
b/src/replica/duplication/replica_duplicator.h
index ebf4473b9..e9df7d7cb 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -23,6 +23,7 @@
#include <string>
#include "common//duplication_common.h"
+#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "duplication_types.h"
#include "replica/replica_base.h"
@@ -143,6 +144,25 @@ public:
void set_duplication_plog_checking(bool checking);
+ // Encode current progress of this duplication into json.
+ template <typename TWriter>
+ void encode_progress(TWriter &writer) const
+ {
+ writer.StartObject();
+
+ JSON_ENCODE_OBJ(writer, dupid, _id);
+ JSON_ENCODE_OBJ(writer, remote_cluster_name, _remote_cluster_name);
+ JSON_ENCODE_OBJ(writer, remote_app_name, _remote_app_name);
+
+ {
+ zauto_read_lock l(_lock);
+ JSON_ENCODE_OBJ(writer, confirmed_decree, _progress.last_decree);
+ JSON_ENCODE_OBJ(writer, persisted_decree,
_progress.confirmed_decree);
+ }
+
+ writer.EndObject();
+ }
+
private:
friend class duplication_test_base;
friend class replica_duplicator_test;
diff --git a/src/replica/duplication/replica_duplicator_manager.cpp
b/src/replica/duplication/replica_duplicator_manager.cpp
index d60bf57b2..9d2153559 100644
--- a/src/replica/duplication/replica_duplicator_manager.cpp
+++ b/src/replica/duplication/replica_duplicator_manager.cpp
@@ -22,7 +22,11 @@
#include "common//duplication_common.h"
#include "common/gpid.h"
+#include "common/replication_enums.h"
+#include "metadata_types.h"
#include "replica/duplication/replica_duplicator.h"
+#include "replica/duplication/replica_duplicator_manager.h"
+#include "replica/replica.h"
#include "replica_duplicator_manager.h"
#include "utils/autoref_ptr.h"
#include "utils/errors.h"
@@ -41,29 +45,56 @@
replica_duplicator_manager::replica_duplicator_manager(replica *r)
{
}
+void replica_duplicator_manager::update_duplication_map(
+ const std::map<int32_t, duplication_entry> &new_dup_map)
+{
+ if (new_dup_map.empty() || _replica->status() !=
partition_status::PS_PRIMARY) {
+ remove_all_duplications();
+ return;
+ }
+
+ remove_non_existed_duplications(new_dup_map);
+
+ for (const auto &kv2 : new_dup_map) {
+ sync_duplication(kv2.second);
+ }
+}
+
std::vector<duplication_confirm_entry>
replica_duplicator_manager::get_duplication_confirms_to_update() const
{
zauto_lock l(_lock);
std::vector<duplication_confirm_entry> updates;
- for (const auto &kv : _duplications) {
- replica_duplicator *duplicator = kv.second.get();
- duplication_progress p = duplicator->progress();
- if (p.last_decree != p.confirmed_decree ||
- (kv.second->status() == duplication_status::DS_PREPARE &&
p.checkpoint_has_prepared)) {
- if (p.last_decree < p.confirmed_decree) {
- LOG_ERROR_PREFIX("invalid decree state: p.last_decree({}) <
p.confirmed_decree({})",
- p.last_decree,
- p.confirmed_decree);
- continue;
- }
- duplication_confirm_entry entry;
- entry.dupid = duplicator->id();
- entry.confirmed_decree = p.last_decree;
- entry.__set_checkpoint_prepared(p.checkpoint_has_prepared);
- updates.emplace_back(entry);
+ for (const auto & [ _, dup ] : _duplications) {
+ // There are two conditions when we should send confirmed decrees to
meta server to update
+ // the progress:
+ //
+ // 1. the acknowledged decree from remote cluster has changed, making
it different from
+ // the one that is persisted in zk by meta server; otherwise,
+ //
+ // 2. the duplication has been in the stage of synchronizing
checkpoint to the remote
+ // cluster, and the synchronized checkpoint has been ready.
+ const auto &progress = dup->progress();
+ if (progress.last_decree == progress.confirmed_decree &&
+ (dup->status() != duplication_status::DS_PREPARE ||
+ !progress.checkpoint_has_prepared)) {
+ continue;
}
+
+ if (progress.last_decree < progress.confirmed_decree) {
+ LOG_ERROR_PREFIX(
+ "invalid decree state: progress.last_decree({}) <
progress.confirmed_decree({})",
+ progress.last_decree,
+ progress.confirmed_decree);
+ continue;
+ }
+
+ duplication_confirm_entry entry;
+ entry.dupid = dup->id();
+ entry.confirmed_decree = progress.last_decree;
+ entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared);
+ updates.emplace_back(entry);
}
return updates;
}
@@ -191,5 +222,17 @@ replica_duplicator_manager::get_dup_states() const
return ret;
}
+void replica_duplicator_manager::remove_all_duplications()
+{
+ // fast path
+ if (_duplications.empty()) {
+ return;
+ }
+
+ LOG_WARNING_PREFIX("remove all duplication, replica status = {}",
+ enum_to_string(_replica->status()));
+ _duplications.clear();
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/duplication/replica_duplicator_manager.h
b/src/replica/duplication/replica_duplicator_manager.h
index 51bcbd1e1..413176a16 100644
--- a/src/replica/duplication/replica_duplicator_manager.h
+++ b/src/replica/duplication/replica_duplicator_manager.h
@@ -24,19 +24,16 @@
#include <vector>
#include "common//duplication_common.h"
-#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "duplication_types.h"
-#include "metadata_types.h"
-#include "replica/replica.h"
#include "replica/replica_base.h"
#include "replica_duplicator.h"
-#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/zlocks.h"
namespace dsn {
namespace replication {
+class replica;
/// replica_duplicator_manager manages the set of duplications on this replica.
/// \see duplication_sync_timer
@@ -51,19 +48,7 @@ public:
// - replica is not primary on replica-server perspective (status !=
PRIMARY)
// - replica is not primary on meta-server perspective
(progress.find(partition_id) == end())
// - the app is not assigned with duplication (dup_map.empty())
- void update_duplication_map(const std::map<int32_t, duplication_entry>
&new_dup_map)
- {
- if (new_dup_map.empty() || _replica->status() !=
partition_status::PS_PRIMARY) {
- remove_all_duplications();
- return;
- }
-
- remove_non_existed_duplications(new_dup_map);
-
- for (const auto &kv2 : new_dup_map) {
- sync_duplication(kv2.second);
- }
- }
+ void update_duplication_map(const std::map<int32_t, duplication_entry>
&new_dup_map);
/// collect updated duplication confirm points from this replica.
std::vector<duplication_confirm_entry>
get_duplication_confirms_to_update() const;
@@ -93,21 +78,30 @@ public:
};
std::vector<dup_state> get_dup_states() const;
+ // Encode current progress of all duplication into json.
+ template <typename TWriter>
+ void encode_progress(TWriter &writer) const
+ {
+ zauto_lock l(_lock);
+
+ if (_duplications.empty()) {
+ return;
+ }
+
+ writer.Key("duplications");
+ writer.StartArray();
+ for (const auto & [ _, dup ] : _duplications) {
+ dup->encode_progress(writer);
+ }
+ writer.EndArray();
+ }
+
private:
void sync_duplication(const duplication_entry &ent);
void remove_non_existed_duplications(const std::map<dupid_t,
duplication_entry> &);
- void remove_all_duplications()
- {
- // fast path
- if (_duplications.empty())
- return;
-
- LOG_WARNING_PREFIX("remove all duplication, replica status = {}",
- enum_to_string(_replica->status()));
- _duplications.clear();
- }
+ void remove_all_duplications();
private:
friend class duplication_sync_timer_test;
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 5bb1f17b8..be31df14f 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -41,10 +41,10 @@
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "consensus_types.h"
-#include "duplication/replica_duplicator_manager.h"
#include "duplication/replica_follower.h"
#include "mutation.h"
#include "mutation_log.h"
+#include "replica/duplication/replica_duplicator_manager.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
@@ -578,6 +578,10 @@ mutation_ptr replica::new_mutation(decree decree)
return mu;
}
+decree replica::last_applied_decree() const { return
_app->last_committed_decree(); }
+
+decree replica::last_flushed_decree() const { return
_app->last_flushed_decree(); }
+
decree replica::last_durable_decree() const { return
_app->last_durable_decree(); }
decree replica::last_prepared_decree() const
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 3b90641cd..ae0118dc0 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -35,8 +35,10 @@
#include <string>
#include <utility>
+#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
+#include "duplication/replica_duplicator_manager.h" // IWYU pragma: keep
#include "meta_admin_types.h"
#include "metadata_types.h"
#include "mutation.h"
@@ -96,7 +98,6 @@ class replica;
class replica_backup_manager;
class replica_bulk_loader;
class replica_disk_migrator;
-class replica_duplicator_manager;
class replica_follower;
class replica_split_manager;
class replica_stub;
@@ -223,8 +224,37 @@ public:
const app_info *get_app_info() const { return &_app_info; }
decree max_prepared_decree() const { return _prepare_list->max_decree(); }
decree last_committed_decree() const { return
_prepare_list->last_committed_decree(); }
+
+ // The last decree that has been applied into rocksdb memtable.
+ decree last_applied_decree() const;
+
+ // The last decree that has been flushed into rocksdb sst.
+ decree last_flushed_decree() const;
+
decree last_prepared_decree() const;
decree last_durable_decree() const;
+
+ // Encode current progress of decrees into json, including both local
writes and duplications
+ // of this replica.
+ template <typename TWriter>
+ void encode_progress(TWriter &writer) const
+ {
+ writer.StartObject();
+
+ JSON_ENCODE_OBJ(writer, max_prepared_decree, max_prepared_decree());
+ JSON_ENCODE_OBJ(writer, max_plog_decree,
_private_log->max_decree(get_gpid()));
+ JSON_ENCODE_OBJ(writer, max_plog_commit_on_disk,
_private_log->max_commit_on_disk());
+ JSON_ENCODE_OBJ(writer, last_committed_decree,
last_committed_decree());
+ JSON_ENCODE_OBJ(writer, last_applied_decree, last_applied_decree());
+ JSON_ENCODE_OBJ(writer, last_flushed_decree, last_flushed_decree());
+ JSON_ENCODE_OBJ(writer, last_durable_decree, last_durable_decree());
+ JSON_ENCODE_OBJ(writer, max_gc_decree,
_private_log->max_gced_decree(get_gpid()));
+
+ _duplication_mgr->encode_progress(writer);
+
+ writer.EndObject();
+ }
+
const std::string &dir() const { return _dir; }
uint64_t create_time_milliseconds() const { return _create_time_ms; }
const char *name() const { return replica_name(); }
@@ -429,13 +459,6 @@ private:
error_code background_sync_checkpoint();
void catch_up_with_private_logs(partition_status::type s);
void on_checkpoint_completed(error_code err);
- void on_copy_checkpoint_ack(error_code err,
- const std::shared_ptr<replica_configuration>
&req,
- const std::shared_ptr<learn_response> &resp);
- void on_copy_checkpoint_file_completed(error_code err,
- size_t sz,
- std::shared_ptr<learn_response>
resp,
- const std::string &chk_dir);
// Enable/Disable plog garbage collection to be executed. For example, to
duplicate data
// to target cluster, we could firstly disable plog garbage collection,
then do copy_data.
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 3558fb7a4..a12c22efb 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -29,6 +29,7 @@
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
#include <fmt/format.h>
+#include <rapidjson/ostreamwrapper.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
@@ -37,8 +38,8 @@
#include <deque>
#include <iterator>
#include <mutex>
-#include <ostream>
#include <set>
+#include <sstream>
#include <type_traits>
#include <vector>
@@ -47,6 +48,7 @@
#include "bulk_load/replica_bulk_loader.h"
#include "common/backup_common.h"
#include "common/duplication_common.h"
+#include "common/json_helper.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
#include "disk_cleaner.h"
@@ -2335,6 +2337,22 @@ void replica_stub::register_ctrl_command()
});
}));
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.query-progress",
+ "Query the progress of decrees, including both local writes and
duplications for "
+ "replicas specified by comma-separated list of 'app_id' or
'app_id.partition_id', "
+ "or all replicas for empty",
+ "[id1,id2,...]",
+ [this](const std::vector<std::string> &args) {
+ return exec_command_on_replica(args, true, [](const
replica_ptr &rep) {
+ std::ostringstream out;
+ rapidjson::OStreamWrapper wrapper(out);
+ dsn::json::PrettyJsonWriter writer(wrapper);
+ rep->encode_progress(writer);
+ return out.str();
+ });
+ }));
+
#ifdef DSN_ENABLE_GPERF
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
_release_tcmalloc_memory,
diff --git a/src/replica/replication_app_base.h
b/src/replica/replication_app_base.h
index c3559c095..2a88618f6 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -238,7 +238,13 @@ public:
//
// Query methods.
//
+
+ // Get the decree of the last flushed mutation. -1 means failed to get.
+ virtual replication::decree last_flushed_decree() const = 0;
+
+ // Get the decree of the last created checkpoint.
virtual replication::decree last_durable_decree() const = 0;
+
// The return type is generated by storage engine, e.g.
rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;
diff --git a/src/replica/storage/simple_kv/simple_kv.server.impl.h
b/src/replica/storage/simple_kv/simple_kv.server.impl.h
index 240ed8789..b296c1f83 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.impl.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.impl.h
@@ -70,7 +70,9 @@ public:
virtual ::dsn::error_code stop(bool cleanup = false) override;
- virtual int64_t last_durable_decree() const override { return
_last_durable_decree; }
+ int64_t last_flushed_decree() const override { return
_last_durable_decree; }
+
+ int64_t last_durable_decree() const override { return
_last_durable_decree; }
virtual ::dsn::error_code sync_checkpoint() override;
diff --git a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h
b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h
index 1235cdbc6..8b80396a0 100644
--- a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h
+++ b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h
@@ -82,7 +82,9 @@ public:
virtual ::dsn::error_code stop(bool cleanup = false) override;
- virtual int64_t last_durable_decree() const override { return
_last_durable_decree; }
+ int64_t last_flushed_decree() const override { return
_last_durable_decree; }
+
+ int64_t last_durable_decree() const override { return
_last_durable_decree; }
virtual ::dsn::error_code sync_checkpoint() override;
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 6d7725b78..cc631143b 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -83,6 +83,8 @@ public:
// we mock the followings
void update_app_envs(const std::map<std::string, std::string> &envs)
override { _envs = envs; }
void query_app_envs(std::map<std::string, std::string> &out) override {
out = _envs; }
+
+ decree last_flushed_decree() const override { return _last_durable_decree;
}
decree last_durable_decree() const override { return _last_durable_decree;
}
// TODO(heyuchen): implement this function in further pull request
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 35419d8ef..75c95a812 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2168,47 +2168,49 @@ private:
}
LOG_INFO_PREFIX("copy checkpoint to dir({}) succeed", checkpoint_dir);
- if (checkpoint_decree != nullptr) {
- rocksdb::DB *snapshot_db = nullptr;
- std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
- auto cleanup = [&](bool remove_checkpoint) {
- if (remove_checkpoint &&
!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
- LOG_ERROR_PREFIX("remove checkpoint directory {} failed",
checkpoint_dir);
- }
- if (snapshot_db) {
- for (auto handle : handles_opened) {
- if (handle) {
- snapshot_db->DestroyColumnFamilyHandle(handle);
- handle = nullptr;
- }
+ if (checkpoint_decree == nullptr) {
+ return ::dsn::ERR_OK;
+ }
+
+ rocksdb::DB *snapshot_db = nullptr;
+ std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
+ auto cleanup = [&](bool remove_checkpoint) {
+ if (remove_checkpoint &&
!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
+ LOG_ERROR_PREFIX("remove checkpoint directory {} failed",
checkpoint_dir);
+ }
+ if (snapshot_db) {
+ for (auto handle : handles_opened) {
+ if (handle) {
+ snapshot_db->DestroyColumnFamilyHandle(handle);
+ handle = nullptr;
}
- delete snapshot_db;
- snapshot_db = nullptr;
}
- };
-
- // Because of RocksDB's restriction, we have to to open default column
family even though
- // not use it
- std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
- {{meta_store::DATA_COLUMN_FAMILY_NAME,
rocksdb::ColumnFamilyOptions()},
- {meta_store::META_COLUMN_FAMILY_NAME,
rocksdb::ColumnFamilyOptions()}});
- status = rocksdb::DB::OpenForReadOnly(
- _db_opts, checkpoint_dir, column_families, &handles_opened,
&snapshot_db);
- if (!status.ok()) {
- LOG_ERROR_PREFIX(
- "OpenForReadOnly from {} failed, error = {}", checkpoint_dir,
status.ToString());
+ delete snapshot_db;
snapshot_db = nullptr;
- cleanup(true);
- return ::dsn::ERR_LOCAL_APP_FAILURE;
}
- CHECK_EQ_PREFIX(handles_opened.size(), 2);
- CHECK_EQ_PREFIX(handles_opened[1]->GetName(),
meta_store::META_COLUMN_FAMILY_NAME);
- uint64_t last_flushed_decree =
- _meta_store->get_decree_from_readonly_db(snapshot_db,
handles_opened[1]);
- *checkpoint_decree = last_flushed_decree;
+ };
- cleanup(false);
+ // Because of RocksDB's restriction, we have to to open default column
family even though
+ // not use it
+ std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
+ {{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
+ {meta_store::META_COLUMN_FAMILY_NAME,
rocksdb::ColumnFamilyOptions()}});
+ status = rocksdb::DB::OpenForReadOnly(
+ _db_opts, checkpoint_dir, column_families, &handles_opened,
&snapshot_db);
+ if (!status.ok()) {
+ LOG_ERROR_PREFIX(
+ "OpenForReadOnly from {} failed, error = {}", checkpoint_dir,
status.ToString());
+ snapshot_db = nullptr;
+ cleanup(true);
+ return ::dsn::ERR_LOCAL_APP_FAILURE;
}
+ CHECK_EQ_PREFIX(handles_opened.size(), 2);
+ CHECK_EQ_PREFIX(handles_opened[1]->GetName(),
meta_store::META_COLUMN_FAMILY_NAME);
+ uint64_t last_flushed_decree =
+ _meta_store->get_decree_from_readonly_db(snapshot_db,
handles_opened[1]);
+ *checkpoint_decree = last_flushed_decree;
+
+ cleanup(false);
return ::dsn::ERR_OK;
}
@@ -2318,6 +2320,17 @@
pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode,
return ::dsn::ERR_OK;
}
+int64_t pegasus_server_impl::last_flushed_decree() const
+{
+ uint64_t decree = 0;
+ const auto &err = _meta_store->get_last_flushed_decree(&decree);
+ if (dsn_unlikely(err != dsn::ERR_OK)) {
+ return -1;
+ }
+
+ return static_cast<int64_t>(decree);
+}
+
bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type
filter_type,
const ::dsn::blob &filter_pattern,
const ::dsn::blob &value)
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 361d9cbba..d902e647d 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -223,6 +223,8 @@ public:
::dsn::error_code storage_apply_checkpoint(chkpt_apply_mode mode,
const
dsn::replication::learn_state &state) override;
+ int64_t last_flushed_decree() const override;
+
int64_t last_durable_decree() const override { return
_last_durable_decree.load(); }
void update_app_envs(const std::map<std::string, std::string> &envs)
override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]