- add create and set (if previous value matches) functions to the KeyValue class
- add Consensus::MonitorTakeoverRequest() function for use by RDE to answer
  takeover requests
- add Consensus::CreateTakeoverRequest() - before an SC is promoted to active,
  it will create a takeover request in the KV store. An existing SC can reject
  the lock takeover
---
src/osaf/consensus/consensus.cc | 435 ++++++++++++++++++++++++++++++++++------
src/osaf/consensus/consensus.h | 55 ++++-
src/osaf/consensus/key_value.cc | 105 +++++++---
src/osaf/consensus/key_value.h | 19 +-
4 files changed, 511 insertions(+), 103 deletions(-)
diff --git a/src/osaf/consensus/consensus.cc b/src/osaf/consensus/consensus.cc
index cc04f3518..f2245dd01 100644
--- a/src/osaf/consensus/consensus.cc
+++ b/src/osaf/consensus/consensus.cc
@@ -15,13 +15,17 @@
#include "osaf/consensus/consensus.h"
#include <unistd.h>
#include <climits>
+#include <sstream>
#include <thread>
#include "base/conf.h"
#include "base/getenv.h"
#include "base/logtrace.h"
#include "base/ncssysf_def.h"
-SaAisErrorT Consensus::PromoteThisNode() {
+const std::string Consensus::kTakeoverRequestKeyname = "takeover_request";
+
+SaAisErrorT Consensus::PromoteThisNode(const bool graceful_takeover,
+ const uint64_t cluster_size) {
TRACE_ENTER();
SaAisErrorT rc;
@@ -29,6 +33,10 @@ SaAisErrorT Consensus::PromoteThisNode() {
return SA_AIS_OK;
}
+ // check if there is an existing takeover requests, we cannot
+ // attempt to lock until that is complete
+ CheckForExistingTakeoverRequest();
+
uint32_t retries = 0;
rc = KeyValue::Lock(base::Conf::NodeName(), kLockTimeout);
while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) {
@@ -39,33 +47,30 @@ SaAisErrorT Consensus::PromoteThisNode() {
}
if (rc == SA_AIS_ERR_EXIST) {
+ // there's a chance the lock has been released since the lock attempt
// get the current active controller
- std::string current_active("");
- retries = 0;
- rc = KeyValue::LockOwner(current_active);
- while (rc != SA_AIS_OK && retries < kMaxRetry) {
- ++retries;
- std::this_thread::sleep_for(kSleepInterval);
- rc = KeyValue::LockOwner(current_active);
- }
- if (rc != SA_AIS_OK) {
- LOG_ER("Failed to get current lock owner. Will attempt to lock anyway");
+ std::string current_active = CurrentActive();
+
+ if (current_active.empty() == true) {
+ LOG_WA("Failed to get current lock owner. Will attempt to lock anyway");
}
+ bool take_over_request_created = false;
LOG_NO("Current active controller is %s", current_active.c_str());
- // there's a chance the lock has been released since the lock attempt
if (current_active.empty() == false) {
- // remove current active controller's lock and fence it
- retries = 0;
- rc = KeyValue::Unlock(current_active);
- while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) {
- LOG_IN("Trying to unlock");
- ++retries;
- std::this_thread::sleep_for(kSleepInterval);
- rc = KeyValue::Unlock(current_active);
+ if (graceful_takeover == true) {
+ rc = CreateTakeoverRequest(current_active, base::Conf::NodeName(),
+ cluster_size);
+ if (rc != SA_AIS_OK) {
+ LOG_WA("Takeover request failed (%d)", rc);
+ return rc;
+ }
+ take_over_request_created = true;
}
+ // remove current active controller's lock and fence it
+ rc = Demote(current_active);
if (rc == SA_AIS_OK) {
FenceNode(current_active);
} else {
@@ -82,6 +87,23 @@ SaAisErrorT Consensus::PromoteThisNode() {
std::this_thread::sleep_for(kSleepInterval);
rc = KeyValue::Lock(base::Conf::NodeName(), kLockTimeout);
}
+
+ if (take_over_request_created == true) {
+ SaAisErrorT rc1;
+
+ // remove takeover request
+ rc1 = KeyValue::Erase(kTakeoverRequestKeyname);
+ retries = 0;
+ while (rc1 != SA_AIS_OK && retries < kMaxRetry) {
+ ++retries;
+ std::this_thread::sleep_for(kSleepInterval);
+ rc1 = KeyValue::Erase(kTakeoverRequestKeyname);
+ }
+
+ if (rc1 != SA_AIS_OK) {
+ LOG_WA("Could not remove takeover request");
+ }
+ }
}
if (rc == SA_AIS_OK) {
@@ -93,43 +115,23 @@ SaAisErrorT Consensus::PromoteThisNode() {
return rc;
}
-SaAisErrorT Consensus::Demote(const std::string& node = "") {
+SaAisErrorT Consensus::Demote(const std::string& node) {
TRACE_ENTER();
if (use_consensus_ == false) {
return SA_AIS_OK;
}
- SaAisErrorT rc = SA_AIS_ERR_FAILED_OPERATION;
- uint32_t retries = 0;
-
- // check current active node
- std::string current_active;
- rc = KeyValue::LockOwner(current_active);
- while (rc != SA_AIS_OK && retries < kMaxRetry) {
- ++retries;
- std::this_thread::sleep_for(kSleepInterval);
- rc = KeyValue::LockOwner(current_active);
- }
-
- if (rc != SA_AIS_OK) {
- LOG_ER("Failed to get lock owner");
- return rc;
- }
-
- LOG_NO("Demoting %s as active controller", current_active.c_str());
+ osafassert(node.empty() == false);
- if (node.empty() == false && node != current_active) {
- // node is not the current active controller!
- osafassert(false);
- }
+ SaAisErrorT rc;
+ uint32_t retries = 0;
- retries = 0;
- rc = KeyValue::Unlock(current_active);
+ rc = KeyValue::Unlock(node);
while (rc == SA_AIS_ERR_TRY_AGAIN && retries < kMaxRetry) {
LOG_IN("Trying to unlock");
++retries;
std::this_thread::sleep_for(kSleepInterval);
- rc = KeyValue::Unlock(current_active);
+ rc = KeyValue::Unlock(node);
}
if (rc != SA_AIS_OK) {
@@ -143,7 +145,17 @@ SaAisErrorT Consensus::Demote(const std::string& node =
"") {
SaAisErrorT Consensus::DemoteCurrentActive() {
TRACE_ENTER();
- return Demote();
+
+ // check current active node
+ std::string current_active = CurrentActive();
+ if (current_active.empty() == true) {
+ LOG_ER("Failed to get lock owner");
+ return SA_AIS_ERR_FAILED_OPERATION;
+ }
+
+ LOG_NO("Demoting %s as active controller", current_active.c_str());
+
+ return Demote(current_active);
}
SaAisErrorT Consensus::DemoteThisNode() {
@@ -151,9 +163,7 @@ SaAisErrorT Consensus::DemoteThisNode() {
return Demote(base::Conf::NodeName());
}
-bool Consensus::IsEnabled() const {
- return use_consensus_;
-}
+bool Consensus::IsEnabled() const { return use_consensus_; }
bool Consensus::IsWritable() const {
TRACE_ENTER();
@@ -178,9 +188,7 @@ bool Consensus::IsWritable() const {
}
}
-bool Consensus::IsRemoteFencingEnabled() const {
- return use_remote_fencing_;
-}
+bool Consensus::IsRemoteFencingEnabled() const { return use_remote_fencing_; }
std::string Consensus::CurrentActive() const {
TRACE_ENTER();
@@ -188,7 +196,7 @@ std::string Consensus::CurrentActive() const {
return "";
}
- SaAisErrorT rc = SA_AIS_ERR_FAILED_OPERATION;
+ SaAisErrorT rc;
uint32_t retries = 0;
std::string owner;
@@ -212,7 +220,7 @@ Consensus::Consensus() {
uint32_t split_brain_enable = base::GetEnv("FMS_SPLIT_BRAIN_PREVENTION", 0);
std::string kv_store_cmd = base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- uint32_t use_remote_fencing = base::GetEnv("FMS_USE_REMOTE_FENCING" , 0);
+ uint32_t use_remote_fencing = base::GetEnv("FMS_USE_REMOTE_FENCING", 0);
if (split_brain_enable == 1 && kv_store_cmd.empty() == false) {
use_consensus_ = true;
@@ -228,16 +236,14 @@ Consensus::Consensus() {
base::Conf::InitNodeName();
}
-Consensus::~Consensus() {
-}
+Consensus::~Consensus() {}
bool Consensus::FenceNode(const std::string& node) {
if (use_remote_fencing_ == true) {
LOG_WA("Fencing remote node %s", node.c_str());
// @todo currently passing UINT_MAX as node ID, since
// we can't always obtain a valid node ID?
- opensaf_reboot(UINT_MAX, node.c_str(),
- "Fencing remote node");
+ opensaf_reboot(UINT_MAX, node.c_str(), "Fencing remote node");
return true;
} else {
@@ -247,7 +253,7 @@ bool Consensus::FenceNode(const std::string& node) {
}
void Consensus::MonitorLock(ConsensusCallback callback,
- const uint32_t user_defined) {
+ const uint32_t user_defined) {
TRACE_ENTER();
if (use_consensus_ == false) {
return;
@@ -255,3 +261,312 @@ void Consensus::MonitorLock(ConsensusCallback callback,
KeyValue::WatchLock(callback, user_defined);
}
+
+// Ask the key-value layer to watch the takeover-request key and invoke
+// callback (with user_defined passed through). Does nothing when the
+// consensus service is disabled.
+void Consensus::MonitorTakeoverRequest(ConsensusCallback callback,
+                                       const uint32_t user_defined) {
+  TRACE_ENTER();
+
+  if (use_consensus_ == true) {
+    KeyValue::Watch(kTakeoverRequestKeyname, callback, user_defined);
+  }
+}
+
+// If a takeover request can currently be read from the KV store, block until
+// its expiration timestamp has passed. Returns immediately when no
+// well-formed request is readable or when the stored expiration is outside
+// the (0, kTakeoverValidTime] window.
+void Consensus::CheckForExistingTakeoverRequest() {
+  std::vector<std::string> fields;
+  if (ReadTakeoverRequest(fields) != SA_AIS_OK) {
+    return;
+  }
+
+  // the TIMESTAMP field holds the request's expiration time
+  const std::string& expiry_field =
+      fields[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)];
+  const uint64_t expiration_timestamp = strtoull(expiry_field.c_str(), 0, 10);
+
+  // seconds left before the request expires
+  const int64_t remaining = expiration_timestamp - CurrentTime();
+  const bool still_pending = remaining > 0 && remaining <= kTakeoverValidTime;
+  if (still_pending == false) {
+    LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp);
+    return;
+  }
+
+  // @todo check if the takeover request has been deleted already?
+  LOG_NO("A takeover request is in progress"
+         " (expiring in %" PRId64 " seconds)",
+         remaining);
+  std::this_thread::sleep_for(std::chrono::seconds(remaining));
+}
+
+// Publish a takeover request in the KV store and wait for it to be answered.
+//
+// current_owner:  node recorded in the request as the current lock owner
+// proposed_owner: node recorded in the request as the one asking to take over
+// cluster_size:   cluster size recorded in the request for the proposed owner
+//
+// If a request already exists, this waits for it to expire (when its
+// expiration looks valid), erases it and tries again.
+//
+// Returns SA_AIS_OK when the request is seen as ACCEPTED, or is still NEW
+// when polling ends; SA_AIS_ERR_EXIST when it is REJECTED, when the stored
+// request names a different proposed owner, or when an existing request
+// could not be erased; otherwise the result of the last KeyValue::Create().
+SaAisErrorT Consensus::CreateTakeoverRequest(const std::string& current_owner,
+                                             const std::string& proposed_owner,
+                                             const uint64_t cluster_size) {
+  TRACE_ENTER();
+
+  // Format of takeover request:
+  // "expiration_time<space>current_owner<space>proposed_owner<space>
+  // proposed_owner_cluster_size<space>status"
+  // status := [UNDEFINED, NEW, REJECTED, ACCEPTED]
+
+  std::string takeover_request;
+  // request expires kTakeoverValidTime seconds from now
+  uint64_t timestamp = CurrentTime() + kTakeoverValidTime;
+
+  // Store the proposed_owner argument itself, so that the ownership check in
+  // the polling loop below compares against the value that was written.
+  takeover_request =
+      std::to_string(timestamp) + " " + current_owner + " " + proposed_owner +
+      " " + std::to_string(cluster_size) + " " +
+      TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)];
+
+  TRACE("Takeover request: \"%s\"", takeover_request.c_str());
+
+  SaAisErrorT rc;
+  uint32_t retries = 0;
+  rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request);
+  while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
+    ++retries;
+    std::this_thread::sleep_for(kSleepInterval);
+    rc = KeyValue::Create(kTakeoverRequestKeyname, takeover_request);
+  }
+
+  if (rc == SA_AIS_ERR_EXIST) {
+    LOG_NO("Existing takeover request found");
+
+    // retrieve takeover request
+    std::vector<std::string> tokens;
+    retries = 0;
+    rc = ReadTakeoverRequest(tokens);
+    while (rc == SA_AIS_ERR_FAILED_OPERATION && retries < kMaxRetry) {
+      ++retries;
+      std::this_thread::sleep_for(kSleepInterval);
+      rc = ReadTakeoverRequest(tokens);
+    }
+
+    if (rc == SA_AIS_OK) {
+      // get expiration
+      const uint64_t expiration_timestamp = strtoull(
+          tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)]
+              .c_str(),
+          0, 10);
+
+      // wait until the existing request has expired
+      int64_t expiration = expiration_timestamp - CurrentTime();
+      if (expiration > 0 && expiration <= kTakeoverValidTime) {
+        LOG_NO("A takeover request is in progress"
+               " (expiring in %" PRId64 " seconds)",
+               expiration);
+        std::chrono::seconds sleep_duration(expiration);
+        std::this_thread::sleep_for(sleep_duration);
+      } else {
+        LOG_WA("Invalid expiration time (%" PRIu64 ")", expiration_timestamp);
+      }
+    }  // else remove it anyway
+
+    LOG_NO("Remove expired takeover request");
+
+    // remove expired request
+    retries = 0;
+    rc = KeyValue::Erase(kTakeoverRequestKeyname);
+    while (rc != SA_AIS_OK && retries < kMaxRetry) {
+      ++retries;
+      std::this_thread::sleep_for(kSleepInterval);
+      rc = KeyValue::Erase(kTakeoverRequestKeyname);
+    }
+
+    if (rc == SA_AIS_OK) {
+      // start over now that the old request is gone
+      return CreateTakeoverRequest(current_owner, proposed_owner,
+                                   cluster_size);
+    } else {
+      LOG_ER("Could not remove existing takeover request");
+      return SA_AIS_ERR_EXIST;
+    }
+  }
+
+  // wait up to 10s for request to be answered
+  retries = 0;
+  while (retries < kMaxTakeoverRetry) {
+    std::vector<std::string> tokens;
+    if (ReadTakeoverRequest(tokens) == SA_AIS_OK) {
+      const std::string state =
+          tokens[static_cast<std::uint8_t>(TakeoverElements::STATE)];
+      const std::string _proposed_owner =
+          tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)];
+
+      if (proposed_owner != _proposed_owner) {
+        LOG_ER("Takeover request was not created by us! (%s)",
+               _proposed_owner.c_str());
+        rc = SA_AIS_ERR_EXIST;
+        break;
+      }
+
+      if (state == TakeoverStateStr[static_cast<std::uint8_t>(
+                       TakeoverState::REJECTED)]) {
+        LOG_NO("Takeover request rejected");
+        rc = SA_AIS_ERR_EXIST;
+        break;
+      } else if (state == TakeoverStateStr[static_cast<std::uint8_t>(
+                              TakeoverState::ACCEPTED)]) {
+        LOG_NO("Takeover request accepted");
+        rc = SA_AIS_OK;
+        break;
+      } else if (state == TakeoverStateStr[static_cast<std::uint8_t>(
+                              TakeoverState::NEW)]) {
+        TRACE("Waiting for response to takeover request");
+        // set result to OK, in case we do not get a reply
+        // within the allocated period. This will allow the lock to be
+        // removed by this node. Note: do not break out of the loop here!
+        rc = SA_AIS_OK;
+      }
+    }
+    ++retries;
+    std::this_thread::sleep_for(kSleepInterval);
+  }
+
+  LOG_NO("Result: %d", rc);
+  return rc;
+}
+
+// Replace the stored takeover request with the same request carrying the
+// given result state. The write is conditional: it is issued through the
+// "previous value must match" form of KeyValue::Set(), with the expected
+// previous value being the request in state NEW.
+//
+// Returns whatever KeyValue::Set() returns.
+SaAisErrorT Consensus::WriteTakeoverResult(
+    const std::string& timestamp, const std::string& current_owner,
+    const std::string& proposed_owner,
+    const std::string& proposed_cluster_size, const TakeoverState result) {
+  TRACE_ENTER();
+
+  // everything except the trailing state token is common to both strings
+  const std::string common_fields = timestamp + " " + current_owner + " " +
+                                    proposed_owner + " " +
+                                    proposed_cluster_size + " ";
+
+  const std::string expected_previous =
+      common_fields +
+      TakeoverStateStr[static_cast<std::uint8_t>(TakeoverState::NEW)];
+  const std::string takeover_result =
+      common_fields + TakeoverStateStr[static_cast<std::uint8_t>(result)];
+
+  LOG_NO("TakeoverResult: %s", takeover_result.c_str());
+
+  // previous value must match
+  return KeyValue::Set(kTakeoverRequestKeyname, takeover_result,
+                       expected_previous);
+}
+
+// Fetch the takeover request from the KV store and split it into tokens.
+//
+// Returns SA_AIS_OK when exactly 5 tokens were produced,
+// SA_AIS_ERR_FAILED_OPERATION when the key could not be read,
+// SA_AIS_ERR_UNAVAILABLE when the stored value is empty, and
+// SA_AIS_ERR_LIBRARY when the token count is not 5.
+SaAisErrorT Consensus::ReadTakeoverRequest(std::vector<std::string>& tokens) {
+  TRACE_ENTER();
+
+  std::string stored_value;
+  const SaAisErrorT get_rc =
+      KeyValue::Get(kTakeoverRequestKeyname, stored_value);
+  if (get_rc != SA_AIS_OK) {
+    // it doesn't always exist, don't log an error
+    LOG_NO("Could not read takeover request (%d)", get_rc);
+    return SA_AIS_ERR_FAILED_OPERATION;
+  }
+
+  // on node shutdown, this could be empty
+  if (stored_value.empty() == true) {
+    return SA_AIS_ERR_UNAVAILABLE;
+  }
+
+  Split(stored_value, tokens);
+  if (tokens.size() == 5) {
+    return SA_AIS_OK;
+  }
+
+  LOG_ER("Invalid takeover request");
+  return SA_AIS_ERR_LIBRARY;
+}
+
+// Answer a pending takeover request addressed to this node.
+//
+// cluster_size: this node's cluster size, compared against the size stored
+//               in the request.
+//
+// The request is ACCEPTED only when it has not expired and the proposed
+// cluster size is strictly larger than cluster_size; otherwise it is
+// REJECTED. Returns UNDEFINED when consensus is disabled, the request cannot
+// be read, this node is not the recorded current owner, or the answer could
+// not be written back.
+Consensus::TakeoverState Consensus::HandleTakeoverRequest(
+    const uint64_t cluster_size) {
+  TRACE_ENTER();
+
+  if (use_consensus_ == false) {
+    return TakeoverState::UNDEFINED;
+  }
+
+  // get request from KV store, retrying while the read itself fails
+  std::vector<std::string> tokens;
+  SaAisErrorT rc = ReadTakeoverRequest(tokens);
+  for (uint32_t attempt = 0;
+       rc == SA_AIS_ERR_FAILED_OPERATION && attempt < kMaxRetry; ++attempt) {
+    std::this_thread::sleep_for(kSleepInterval);
+    rc = ReadTakeoverRequest(tokens);
+  }
+
+  if (rc != SA_AIS_OK) {
+    return TakeoverState::UNDEFINED;
+  }
+
+  // request is a space delimited string with 5 elements
+  osafassert(tokens.size() == 5);
+
+  const std::string& expiry_field =
+      tokens[static_cast<std::uint8_t>(TakeoverElements::TIMESTAMP)];
+  const std::string& owner_field =
+      tokens[static_cast<std::uint8_t>(TakeoverElements::CURRENT_OWNER)];
+  const std::string& proposer_field =
+      tokens[static_cast<std::uint8_t>(TakeoverElements::PROPOSED_OWNER)];
+  const std::string& size_field = tokens[static_cast<std::uint8_t>(
+      TakeoverElements::PROPOSED_NETWORK_SIZE)];
+
+  // check the owner is this node
+  if (owner_field != base::Conf::NodeName()) {
+    LOG_ER("We do not own the lock. Ignoring takeover request");
+    return TakeoverState::UNDEFINED;
+  }
+
+  // expiration timestamp
+  const uint64_t expiration = strtoull(expiry_field.c_str(), 0, 10);
+
+  // size of the other network partition
+  const uint64_t proposed_cluster_size = strtoull(size_field.c_str(), 0, 10);
+
+  LOG_NO("Other network size: %" PRIu64 ", our network size: %" PRIu64,
+         proposed_cluster_size, cluster_size);
+
+  const bool accept =
+      CurrentTime() <= expiration && proposed_cluster_size > cluster_size;
+  const TakeoverState result =
+      accept ? TakeoverState::ACCEPTED : TakeoverState::REJECTED;
+
+  rc = WriteTakeoverResult(expiry_field, owner_field, proposer_field,
+                           size_field, result);
+  if (rc != SA_AIS_OK) {
+    LOG_ER("Unable to write takeover result (%d)", rc);
+    return TakeoverState::UNDEFINED;
+  }
+
+  return result;
+}
+
+// Break a whitespace-delimited string into words and append each word to
+// tokens. Entries already present in tokens are left in place.
+void Consensus::Split(const std::string& str,
+                      std::vector<std::string>& tokens) const {
+  std::stringstream words(str);
+  for (std::string word; words >> word;) {
+    tokens.push_back(word);
+  }
+}
+
+// Current std::chrono::system_clock time, as whole seconds since the clock's
+// epoch.
+uint64_t Consensus::CurrentTime() const {
+  const auto since_epoch =
+      std::chrono::system_clock::now().time_since_epoch();
+  return std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();
+}
diff --git a/src/osaf/consensus/consensus.h b/src/osaf/consensus/consensus.h
index 9a8aa7e9b..abd17612c 100644
--- a/src/osaf/consensus/consensus.h
+++ b/src/osaf/consensus/consensus.h
@@ -17,14 +17,16 @@
#include <chrono>
#include <string>
-#include "saAis.h"
+#include <vector>
#include "base/macros.h"
#include "osaf/consensus/key_value.h"
+#include "saAis.h"
class Consensus {
public:
// Set active controller to this node
- SaAisErrorT PromoteThisNode();
+ SaAisErrorT PromoteThisNode(const bool graceful_takeover,
+ const uint64_t cluster_size);
// Clear current active controller by releasing lock
SaAisErrorT DemoteCurrentActive();
@@ -40,6 +42,9 @@ class Consensus {
// in the callback
void MonitorLock(ConsensusCallback callback, const uint32_t user_defined);
+ void MonitorTakeoverRequest(ConsensusCallback callback,
+ const uint32_t user_defined);
+
// Is consensus service enabled?
bool IsEnabled() const;
@@ -52,17 +57,59 @@ class Consensus {
Consensus();
virtual ~Consensus();
+ static const std::string kTakeoverRequestKeyname;
+
+ enum class TakeoverState : std::uint8_t {
+ UNDEFINED = 0,
+ NEW = 1,
+ ACCEPTED = 2,
+ REJECTED = 3,
+ };
+
+ enum class TakeoverElements : std::uint8_t {
+ TIMESTAMP = 0,
+ CURRENT_OWNER = 1,
+ PROPOSED_OWNER = 2,
+ PROPOSED_NETWORK_SIZE = 3,
+ STATE = 4
+ };
+
+ const std::string TakeoverStateStr[4] = {"UNDEFINED", "NEW", "ACCEPTED",
+ "REJECTED"};
+
+ TakeoverState HandleTakeoverRequest(const uint64_t cluster_size);
+
private:
bool use_consensus_ = false;
bool use_remote_fencing_ = false;
const std::string kTestKeyname = "opensaf_write_test";
const std::chrono::milliseconds kSleepInterval =
- std::chrono::milliseconds(100); // in ms
+ std::chrono::milliseconds(500); // in ms
static constexpr uint32_t kLockTimeout = 0; // lock is persistent by default
- static constexpr uint32_t kMaxRetry = 600;
+ static constexpr uint32_t kMaxTakeoverRetry = 20;
+ static constexpr uint32_t kMaxRetry = 60;
+ static constexpr uint32_t kTakeoverValidTime = 20; // in seconds
+
+ void CheckForExistingTakeoverRequest();
+
+ SaAisErrorT CreateTakeoverRequest(const std::string& current_owner,
+ const std::string& proposed_owner,
+ const uint64_t cluster_size);
+
+ SaAisErrorT ReadTakeoverRequest(std::vector<std::string>& tokens);
+
+ SaAisErrorT WriteTakeoverResult(const std::string& timestamp,
+ const std::string& current_owner,
+ const std::string& proposed_owner,
+ const std::string& proposed_cluster_size,
+ const TakeoverState result);
+
SaAisErrorT Demote(const std::string& node);
bool FenceNode(const std::string& node);
+ void Split(const std::string& str, std::vector<std::string>& tokens) const;
+ uint64_t CurrentTime() const;
+
DELETE_COPY_AND_MOVE_OPERATORS(Consensus);
};
diff --git a/src/osaf/consensus/key_value.cc b/src/osaf/consensus/key_value.cc
index dbf07520b..80950e7cb 100644
--- a/src/osaf/consensus/key_value.cc
+++ b/src/osaf/consensus/key_value.cc
@@ -45,9 +45,9 @@ int KeyValue::Execute(const std::string& command,
std::string& output) {
SaAisErrorT KeyValue::Get(const std::string& key, std::string& value) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- const std::string command(kv_store_cmd + " get " + key);
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string command(kv_store_cmd + " get \"" + key + "\"");
int rc = KeyValue::Execute(command, value);
TRACE("Read '%s'", value.c_str());
@@ -61,9 +61,10 @@ SaAisErrorT KeyValue::Get(const std::string& key,
std::string& value) {
SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- const std::string command(kv_store_cmd + " set " + key + " " + value);
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string command(kv_store_cmd + " set \"" + key + "\" \"" + value +
+ "\"");
std::string output;
int rc = KeyValue::Execute(command, output);
@@ -74,12 +75,47 @@ SaAisErrorT KeyValue::Set(const std::string& key, const
std::string& value) {
}
}
+// Set key to value, but only if its current value equals prev_value. The
+// check is delegated to the key-value store plugin through its
+// "set_if_prev" sub-command.
+//
+// Returns SA_AIS_OK when Execute() reports 0, otherwise
+// SA_AIS_ERR_FAILED_OPERATION.
+SaAisErrorT KeyValue::Set(const std::string& key, const std::string& value,
+                          const std::string& prev_value) {
+  // traced like the other KeyValue operations
+  TRACE_ENTER();
+
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " set_if_prev \"" + key + "\" \"" +
+                            value + "\" \"" + prev_value + "\"");
+  std::string output;
+  int rc = KeyValue::Execute(command, output);
+
+  if (rc == 0) {
+    return SA_AIS_OK;
+  } else {
+    return SA_AIS_ERR_FAILED_OPERATION;
+  }
+}
+
+
+// Create key with the given value through the key-value store plugin's
+// "create" sub-command.
+//
+// Returns SA_AIS_OK when Execute() reports 0, SA_AIS_ERR_EXIST when it
+// reports 1, and SA_AIS_ERR_FAILED_OPERATION for anything else.
+SaAisErrorT KeyValue::Create(const std::string& key,
+                             const std::string& value) {
+  TRACE_ENTER();
+
+  const std::string kv_store_cmd =
+      base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+  const std::string command(kv_store_cmd + " create \"" + key + "\" \"" +
+                            value + "\"");
+  std::string output;
+
+  switch (KeyValue::Execute(command, output)) {
+    case 0:
+      return SA_AIS_OK;
+    case 1:
+      return SA_AIS_ERR_EXIST;
+    default:
+      return SA_AIS_ERR_FAILED_OPERATION;
+  }
+}
+
+
SaAisErrorT KeyValue::Erase(const std::string& key) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- const std::string command(kv_store_cmd + " erase " + key);
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string command(kv_store_cmd + " erase \"" + key + "\"");
std::string output;
int rc = KeyValue::Execute(command, output);
@@ -91,13 +127,13 @@ SaAisErrorT KeyValue::Erase(const std::string& key) {
}
SaAisErrorT KeyValue::Lock(const std::string& owner,
- const unsigned int timeout) {
+ const unsigned int timeout) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- const std::string command(kv_store_cmd + " lock " + owner + " " +
- std::to_string(timeout));
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string command(kv_store_cmd + " lock \"" + owner + "\" " +
+ std::to_string(timeout));
std::string output;
int rc = KeyValue::Execute(command, output);
@@ -105,8 +141,10 @@ SaAisErrorT KeyValue::Lock(const std::string& owner,
return SA_AIS_OK;
} else if (rc == 1) {
// already locked
+ LOG_NO("Locked failed: %s", output.c_str());
return SA_AIS_ERR_EXIST;
} else {
+ LOG_NO("Locked failed: %s", output.c_str());
return SA_AIS_ERR_TRY_AGAIN;
}
}
@@ -114,15 +152,16 @@ SaAisErrorT KeyValue::Lock(const std::string& owner,
SaAisErrorT KeyValue::Unlock(const std::string& owner) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- const std::string command(kv_store_cmd + " unlock " + owner);
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string command(kv_store_cmd + " unlock \"" + owner + "\"");
std::string output;
int rc = Execute(command, output);
if (rc == 0) {
return SA_AIS_OK;
} else if (rc == 1) {
+ LOG_NO("Unlock failed: %s", output.c_str());
LOG_ER("Lock is owned by another node");
return SA_AIS_ERR_INVALID_PARAM;
} else {
@@ -133,8 +172,8 @@ SaAisErrorT KeyValue::Unlock(const std::string& owner) {
SaAisErrorT KeyValue::LockOwner(std::string& owner) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
const std::string command(kv_store_cmd + " lock_owner");
std::string output;
int rc = KeyValue::Execute(command, output);
@@ -145,23 +184,24 @@ SaAisErrorT KeyValue::LockOwner(std::string& owner) {
return SA_AIS_OK;
}
+ // put output in owner, for debugging purposes
+ owner = output;
return SA_AIS_ERR_FAILED_OPERATION;
}
namespace {
static constexpr std::chrono::milliseconds kSleepInterval =
- std::chrono::milliseconds(100); // in ms
+ std::chrono::milliseconds(100); // in ms
static constexpr uint32_t kMaxRetry = 100;
-void WatchKeyFunction(const std::string& key,
- const ConsensusCallback& callback,
- const uint32_t user_defined) {
+void WatchKeyFunction(const std::string& key, const ConsensusCallback&
callback,
+ const uint32_t user_defined) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
- const std::string command(kv_store_cmd + " watch " + key);
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string command(kv_store_cmd + " watch \"" + key + "\"");
std::string value;
uint32_t retries = 0;
int rc;
@@ -183,11 +223,11 @@ void WatchKeyFunction(const std::string& key,
}
void WatchLockFunction(const ConsensusCallback& callback,
- const uint32_t user_defined) {
+ const uint32_t user_defined) {
TRACE_ENTER();
- const std::string kv_store_cmd = base::GetEnv(
- "FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
+ const std::string kv_store_cmd =
+ base::GetEnv("FMS_KEYVALUE_STORE_PLUGIN_CMD", "");
const std::string command(kv_store_cmd + " watch_lock");
std::string value;
uint32_t retries = 0;
@@ -211,16 +251,15 @@ void WatchLockFunction(const ConsensusCallback& callback,
} // namespace
-void KeyValue::Watch(const std::string& key,
- const ConsensusCallback callback,
- const uint32_t user_defined) {
+void KeyValue::Watch(const std::string& key, const ConsensusCallback callback,
+ const uint32_t user_defined) {
std::thread t(WatchKeyFunction, key, callback, user_defined);
t.detach();
return;
}
void KeyValue::WatchLock(const ConsensusCallback callback,
- const uint32_t user_defined) {
+ const uint32_t user_defined) {
std::thread t(WatchLockFunction, callback, user_defined);
t.detach();
return;
diff --git a/src/osaf/consensus/key_value.h b/src/osaf/consensus/key_value.h
index 853f303ff..b003629c0 100644
--- a/src/osaf/consensus/key_value.h
+++ b/src/osaf/consensus/key_value.h
@@ -20,9 +20,9 @@
#include <thread>
#include "saAis.h"
-typedef std::function<void(const std::string& key,
- const std::string& new_value,
- const uint32_t user_defined)> ConsensusCallback;
+typedef std::function<void(const std::string& key, const std::string&
new_value,
+ const uint32_t user_defined)>
+ ConsensusCallback;
class KeyValue {
public:
@@ -32,13 +32,20 @@ class KeyValue {
// Set key to value
static SaAisErrorT Set(const std::string& key, const std::string& value);
+ // Set key to value only if prev value matches
+ static SaAisErrorT Set(const std::string& key, const std::string& value,
+ const std::string& prev_value);
+
+ // Create key, and set to value. Fails if key already exists.
+ static SaAisErrorT Create(const std::string& key, const std::string& value);
+
// Erase key
static SaAisErrorT Erase(const std::string& key);
// Obtain lock, default timeout is 0 seconds (indefinite). If lock
// is called when already locked, the timeout is extended
static SaAisErrorT Lock(const std::string& owner,
- const unsigned int timeout = 0);
+ const unsigned int timeout = 0);
// Release lock
static SaAisErrorT Unlock(const std::string& owner);
@@ -48,11 +55,11 @@ class KeyValue {
// starts a thread to watch key and call callback if values changes
static void Watch(const std::string& key, ConsensusCallback callback,
- const uint32_t user_defined);
+ const uint32_t user_defined);
// starts a thread to watch the lock and call callback if is modified
static void WatchLock(ConsensusCallback callback,
- const uint32_t user_defined);
+ const uint32_t user_defined);
// internal use
static int Execute(const std::string& command, std::string& output);
--
2.14.1
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel