acelyc111 commented on code in PR #1388:
URL:
https://github.com/apache/incubator-pegasus/pull/1388#discussion_r1136497157
##########
src/runtime/ranger/ranger_resource_policy_manager.h:
##########
@@ -95,5 +134,11 @@ class ranger_resource_policy_manager
FRIEND_TEST(ranger_resource_policy_manager_test,
parse_policies_from_json_for_test);
};
+
+// Try to get the database name of 'app_name'.
+// When using Ranger for ACL, the constraint table naming rule is
+// "{database_name}.{table_name}", use "." to split database name and table
name.
+// Return an empty string if 'app_name' is not a valid Ranger rule table name.
+std::string get_database_name_from_app_name(const std::string &app_name);
Review Comment:
Add a test to check its functionality.
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -17,19 +17,58 @@
#include <ctype.h>
#include <algorithm>
+#include <chrono>
+#include <iosfwd>
+#include <memory>
#include <unordered_set>
#include <utility>
+// Disable class-memaccess warning to facilitate compilation with gcc>7
+// https://github.com/Tencent/rapidjson/issues/1700
+#pragma GCC diagnostic push
+#if defined(__GNUC__) && __GNUC__ >= 8
+#pragma GCC diagnostic ignored "-Wclass-memaccess"
+#endif
+#include <rapidjson/document.h>
+
+#pragma GCC diagnostic pop
+
+#include "common/replica_envs.h"
+#include "common/replication.codes.h"
+#include "common/replication_common.h"
+#include "dsn.layer2_types.h"
+#include "fmt/core.h"
#include "meta/meta_options.h"
#include "meta/meta_service.h"
+#include "meta/meta_state_service.h"
+#include "meta/server_state.h"
+#include "meta_admin_types.h"
#include "ranger_resource_policy_manager.h"
+#include "rapidjson/allocators.h"
#include "runtime/ranger/ranger_resource_policy.h"
+#include "runtime/task/async_calls.h"
+#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
+#include "utils/blob.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/process_utils.h"
+#include "utils/smart_pointers.h"
+#include "utils/strings.h"
namespace dsn {
namespace ranger {
+DSN_DEFINE_uint32(security,
+ update_ranger_policy_interval_sec,
+ 5,
+ "The interval seconds of meta "
+ "server to pull the latest "
+ "access control policy from "
+ "Ranger service.");
+DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service
url.");
+DSN_DEFINE_string(ranger, ranger_service_name, "", "use policy name.");
Review Comment:
The description is not very clear, could you improve it please?
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // for the newly created table, its app envs must be empty. This needs
to be executed
+ // periodically to update the table's app envs, regardless of whether
the Ranger policy is
+ // updated or not.
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies
to app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+ return dsn::ERR_OK;
+ }
+ ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+ start_to_dump_and_sync_policies();
+
+ return dsn::ERR_OK;
+}
+
+dsn::error_code
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+ std::string *ranger_policies) const
+{
+ std::string cmd =
+ fmt::format("curl {}/{}", FLAGS_ranger_service_url,
FLAGS_ranger_service_name);
+ std::stringstream resp;
+ if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+ return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+ }
+
+ *ranger_policies = resp.str();
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const
std::string &data)
+{
+ // The Ranger policy pulled from Ranger service demo.
+ /*
+ {
+ "serviceName": "PEGASUS1",
+ "serviceId": 1069,
+ "policyVersion": 60,
+ "policyUpdateTime": 1673254471000,
+ "policies": [{
+ "id": 5334,
+ "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+ "isEnabled": true,
+ "version": 13,
+ "service": "PEGASUS1",
+ "name": "all - database",
+ "policyType": 0,
+ "policyPriority": 0,
+ "description": "Policy for all - database",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": ["PEGASUS1"],
+ "isExcludes": false,
+ "isRecursive": true
+ }
+ },
+ "policyItems": [{
+ "accesses": [{
+ "type": "create",
+ "isAllowed": true
+ }, {
+ "type": "drop",
+ "isAllowed": true
+ }, {
+ "type": "control",
+ "isAllowed": true
+ }, {
+ "type": "metadata",
+ "isAllowed": true
+ }, {
+ "type": "list",
+ "isAllowed": true
+ }],
+ "users": ["PEGASUS1"],
+ "groups": [],
+ "roles": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "serviceType": "pegasus",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [],
+ "zoneName": "",
+ "isDenyAllElse": false
+ }],
+ "auditMode": "audit-default",
+ "serviceConfig": {}
+ }
+ */
+ rapidjson::Document doc;
+ doc.Parse(data.c_str());
+
+ // Check if it is needed to update policies.
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+ int remote_policy_version = doc["policyVersion"].GetInt();
+ if (_local_policy_version == remote_policy_version) {
+ LOG_DEBUG("Ranger policy version: {}, no need to update.",
_local_policy_version);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version > remote_policy_version) {
+ LOG_WARNING("Local Ranger policy version ({}) is larger than remote
version ({}), please "
+ "check Ranger services ({}).",
+ _local_policy_version,
+ remote_policy_version,
+ FLAGS_ranger_service_name);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version == 0) {
+ _local_policy_version = remote_policy_version;
+ }
+
+ // Update policies.
+ _all_resource_policies.clear();
+
+ // TODO(wanghao): it's optional
+ // Provide a DATABASE default policy for legacy tables.
+ // ranger_resource_policy default_database_policy;
+ //
ranger_resource_policy::create_default_database_policy(default_database_policy);
+ // _all_resource_policies[enum_to_string(resource_type::kDatabase)] =
{default_database_policy};
+
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+ const rapidjson::Value &policies = doc["policies"];
+ RETURN_ERR_IF_NOT_ARRAY(policies);
+ for (const auto &policy : policies.GetArray()) {
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+ // 1. Check if the policy is enabled or not.
+ if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+ continue;
+ }
+
+ // 2. Parse resource type.
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+ std::map<std::string, std::unordered_set<std::string>>
values_of_resource_type;
+ for (const auto &resource : policy["resources"].GetObject()) {
+ RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+ RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+ std::unordered_set<std::string> values;
+ for (const auto &v : (resource.value)["values"].GetArray()) {
+ values.insert(v.GetString());
+ }
+
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(),
values));
+ }
+
+ // 3. Construct ACL policy.
+ ranger_resource_policy resource_policy;
+ CONTINUE_IF_MISSING_MEMBER(policy, "name");
+ resource_policy.name = policy["name"].GetString();
+
+ resource_type rt = resource_type::kUnknown;
+ do {
+ // TODO(wanghao): refactor the following code
+ // parse Ranger policies json string into
`values_of_resource_type`, distinguish
+ // resource types by `values_of_resource_type.size()`
+ if (values_of_resource_type.size() == 1) {
+ auto iter = values_of_resource_type.find("global");
+ if (iter != values_of_resource_type.end()) {
+ rt = resource_type::kGlobal;
+ break;
+ }
+ iter = values_of_resource_type.find("database");
+ if (iter != values_of_resource_type.end()) {
+ resource_policy.database_names = iter->second;
+ rt = resource_type::kDatabase;
+ break;
+ }
+ } else if (values_of_resource_type.size() == 2) {
+ auto iter1 = values_of_resource_type.find("database");
+ auto iter2 = values_of_resource_type.find("table");
+ if (iter1 != values_of_resource_type.end() &&
+ iter2 != values_of_resource_type.end()) {
+ resource_policy.database_names = iter1->second;
+ resource_policy.table_names = iter2->second;
+ rt = resource_type::kDatabaseTable;
+ break;
+ }
+ }
+ return dsn::ERR_RANGER_PARSE_ACL;
+ } while (false);
+
+ parse_policies_from_json(policy["policyItems"],
resource_policy.policies.allow_policies);
+ parse_policies_from_json(policy["denyPolicyItems"],
resource_policy.policies.deny_policies);
+ parse_policies_from_json(policy["allowExceptions"],
+
resource_policy.policies.allow_policies_exclude);
+ parse_policies_from_json(policy["denyExceptions"],
+
resource_policy.policies.deny_policies_exclude);
+
+ // 4. Add the ACL policy.
+ auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+
resource_policies({resource_policy}));
+ if (!ret.second) {
+ ret.first->second.emplace_back(resource_policy);
+ }
+ }
+
+ return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+ dsn::task_ptr sync_task = dsn::tasking::create_task(
+ LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() {
dump_and_sync_policies(); });
+ _meta_svc->get_remote_storage()->create_node(
+ _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this,
sync_task](dsn::error_code err) {
+ if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+ LOG_DEBUG("Create Ranger policy meta root succeed.");
+ sync_task->enqueue();
+ return;
+ }
+ CHECK_EQ(err, dsn::ERR_TIMEOUT);
+ LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
+ dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+ &_tracker,
+ [this]() {
start_to_dump_and_sync_policies(); },
+ 0,
+ load_ranger_policy_retry_delay_ms);
+ });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+ dump_policies_to_remote_storage();
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+ update_cached_policies();
+ LOG_DEBUG("Update using resources policies succeed.");
+
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to
app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+ dsn::blob value =
json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+ _meta_svc->get_remote_storage()->set_data(
+ _ranger_policy_meta_root, value, LPC_CM_GET_RANGER_POLICY,
[this](dsn::error_code e) {
+ if (e == dsn::ERR_OK) {
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+ return;
+ }
+ CHECK_EQ_MSG(e, dsn::ERR_TIMEOUT, "Dump Ranger policies to remote
storage failed.");
+ LOG_ERROR("Dump Ranger policies to remote storage timeout, retry
later.");
+ dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+ &_tracker,
+ [this]() {
dump_policies_to_remote_storage(); },
+ 0,
+ load_ranger_policy_retry_delay_ms);
+ });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+ {
+ utils::auto_write_lock l(_global_policies_lock);
+
_global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
+ // TODO(wanghao): provide a query method
+ }
+ {
+ utils::auto_write_lock l(_database_policies_lock);
+ _database_policies_cache.swap(
+ _all_resource_policies[enum_to_string(resource_type::kDatabase)]);
+ // TODO(wanghao): provide a query method
+ }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+ const auto &table_policies =
+
_all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+ if (table_policies == _all_resource_policies.end()) {
+ LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app
envs.");
+ return dsn::ERR_OK;
+ }
+
+ dsn::replication::configuration_list_apps_response list_resp;
+ dsn::replication::configuration_list_apps_request list_req;
+ list_req.status = dsn::app_status::AS_AVAILABLE;
+ _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+ ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+ for (const auto &app : list_resp.infos) {
+ std::string database_name =
get_database_name_from_app_name(app.app_name);
+ std::string table_name;
+ if (database_name.empty()) {
+ database_name = "*";
+ table_name = app.app_name;
+ } else {
+ table_name = app.app_name.substr(database_name.size());
+ }
+
+ auto req =
dsn::make_unique<dsn::replication::configuration_update_app_env_request>();
+ req->__set_app_name(app.app_name);
+ req->__set_keys(
+
{dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});
+ bool has_match_policy = false;
Review Comment:
Rename to `is_policy_matched`?
##########
src/runtime/ranger/ranger_resource_policy_manager.h:
##########
@@ -73,20 +76,56 @@ class ranger_resource_policy_manager
static void parse_policies_from_json(const rapidjson::Value &data,
std::vector<policy_item> &policies);
+ // Update policies from Ranger service.
+ dsn::error_code update_policies_from_ranger_service();
+
+ // Pull policies in JSON format from Ranger service.
+ dsn::error_code pull_policies_from_ranger_service(std::string
*ranger_policies) const;
Review Comment:
I guess most of the newly added functions can be declared as private, right?
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // for the newly created table, its app envs must be empty. This needs
to be executed
+ // periodically to update the table's app envs, regardless of whether
the Ranger policy is
+ // updated or not.
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies
to app envs failed.");
Review Comment:
The server will crash if the sync fails. Is it too strict to assert that the
result must be OK?
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // for the newly created table, its app envs must be empty. This needs
to be executed
+ // periodically to update the table's app envs, regardless of whether
the Ranger policy is
+ // updated or not.
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies
to app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+ return dsn::ERR_OK;
+ }
+ ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+ start_to_dump_and_sync_policies();
+
+ return dsn::ERR_OK;
+}
+
+dsn::error_code
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+ std::string *ranger_policies) const
+{
+ std::string cmd =
+ fmt::format("curl {}/{}", FLAGS_ranger_service_url,
FLAGS_ranger_service_name);
+ std::stringstream resp;
+ if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+ return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+ }
+
+ *ranger_policies = resp.str();
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const
std::string &data)
+{
+ // The Ranger policy pulled from Ranger service demo.
+ /*
+ {
+ "serviceName": "PEGASUS1",
+ "serviceId": 1069,
+ "policyVersion": 60,
+ "policyUpdateTime": 1673254471000,
+ "policies": [{
+ "id": 5334,
+ "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+ "isEnabled": true,
+ "version": 13,
+ "service": "PEGASUS1",
+ "name": "all - database",
+ "policyType": 0,
+ "policyPriority": 0,
+ "description": "Policy for all - database",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": ["PEGASUS1"],
+ "isExcludes": false,
+ "isRecursive": true
+ }
+ },
+ "policyItems": [{
+ "accesses": [{
+ "type": "create",
+ "isAllowed": true
+ }, {
+ "type": "drop",
+ "isAllowed": true
+ }, {
+ "type": "control",
+ "isAllowed": true
+ }, {
+ "type": "metadata",
+ "isAllowed": true
+ }, {
+ "type": "list",
+ "isAllowed": true
+ }],
+ "users": ["PEGASUS1"],
+ "groups": [],
+ "roles": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "serviceType": "pegasus",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [],
+ "zoneName": "",
+ "isDenyAllElse": false
+ }],
+ "auditMode": "audit-default",
+ "serviceConfig": {}
+ }
+ */
+ rapidjson::Document doc;
+ doc.Parse(data.c_str());
+
+ // Check if it is needed to update policies.
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+ int remote_policy_version = doc["policyVersion"].GetInt();
+ if (_local_policy_version == remote_policy_version) {
+ LOG_DEBUG("Ranger policy version: {}, no need to update.",
_local_policy_version);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version > remote_policy_version) {
+ LOG_WARNING("Local Ranger policy version ({}) is larger than remote
version ({}), please "
+ "check Ranger services ({}).",
+ _local_policy_version,
+ remote_policy_version,
+ FLAGS_ranger_service_name);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version == 0) {
+ _local_policy_version = remote_policy_version;
+ }
+
+ // Update policies.
+ _all_resource_policies.clear();
+
+ // TODO(wanghao): it's optional
+ // Provide a DATABASE default policy for legacy tables.
+ // ranger_resource_policy default_database_policy;
+ //
ranger_resource_policy::create_default_database_policy(default_database_policy);
+ // _all_resource_policies[enum_to_string(resource_type::kDatabase)] =
{default_database_policy};
+
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+ const rapidjson::Value &policies = doc["policies"];
+ RETURN_ERR_IF_NOT_ARRAY(policies);
+ for (const auto &policy : policies.GetArray()) {
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+ // 1. Check if the policy is enabled or not.
+ if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+ continue;
+ }
+
+ // 2. Parse resource type.
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+ std::map<std::string, std::unordered_set<std::string>>
values_of_resource_type;
+ for (const auto &resource : policy["resources"].GetObject()) {
+ RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+ RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+ std::unordered_set<std::string> values;
+ for (const auto &v : (resource.value)["values"].GetArray()) {
+ values.insert(v.GetString());
+ }
+
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(),
values));
+ }
+
+ // 3. Construct ACL policy.
+ ranger_resource_policy resource_policy;
+ CONTINUE_IF_MISSING_MEMBER(policy, "name");
+ resource_policy.name = policy["name"].GetString();
+
+ resource_type rt = resource_type::kUnknown;
+ do {
+ // TODO(wanghao): refactor the following code
+ // parse Ranger policies json string into
`values_of_resource_type`, distinguish
+ // resource types by `values_of_resource_type.size()`
+ if (values_of_resource_type.size() == 1) {
+ auto iter = values_of_resource_type.find("global");
+ if (iter != values_of_resource_type.end()) {
+ rt = resource_type::kGlobal;
+ break;
+ }
+ iter = values_of_resource_type.find("database");
+ if (iter != values_of_resource_type.end()) {
+ resource_policy.database_names = iter->second;
+ rt = resource_type::kDatabase;
+ break;
+ }
+ } else if (values_of_resource_type.size() == 2) {
+ auto iter1 = values_of_resource_type.find("database");
+ auto iter2 = values_of_resource_type.find("table");
+ if (iter1 != values_of_resource_type.end() &&
+ iter2 != values_of_resource_type.end()) {
+ resource_policy.database_names = iter1->second;
+ resource_policy.table_names = iter2->second;
+ rt = resource_type::kDatabaseTable;
+ break;
+ }
+ }
+ return dsn::ERR_RANGER_PARSE_ACL;
+ } while (false);
+
+ parse_policies_from_json(policy["policyItems"],
resource_policy.policies.allow_policies);
+ parse_policies_from_json(policy["denyPolicyItems"],
resource_policy.policies.deny_policies);
+ parse_policies_from_json(policy["allowExceptions"],
+
resource_policy.policies.allow_policies_exclude);
+ parse_policies_from_json(policy["denyExceptions"],
+
resource_policy.policies.deny_policies_exclude);
+
+ // 4. Add the ACL policy.
+ auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+
resource_policies({resource_policy}));
+ if (!ret.second) {
+ ret.first->second.emplace_back(resource_policy);
+ }
+ }
+
+ return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+ dsn::task_ptr sync_task = dsn::tasking::create_task(
+ LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() {
dump_and_sync_policies(); });
+ _meta_svc->get_remote_storage()->create_node(
+ _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this,
sync_task](dsn::error_code err) {
+ if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+ LOG_DEBUG("Create Ranger policy meta root succeed.");
+ sync_task->enqueue();
+ return;
+ }
+ CHECK_EQ(err, dsn::ERR_TIMEOUT);
+ LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
Review Comment:
```suggestion
LOG_ERROR("Create Ranger policy meta root timeout, retry
later.");
```
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -83,9 +122,11 @@ const std::map<std::string, access_type>
kAccessTypeMaping({{"READ", access_type
{"CONTROL",
access_type::kControl}});
} // anonymous namespace
+std::chrono::milliseconds load_ranger_policy_retry_delay_ms(10000);
Review Comment:
How about declaring it as a const variable and renaming it to
`kLoadRangerPolicyRetryDelayMs`?
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // for the newly created table, its app envs must be empty. This needs
to be executed
+ // periodically to update the table's app envs, regardless of whether
the Ranger policy is
+ // updated or not.
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies
to app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+ return dsn::ERR_OK;
+ }
+ ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+ start_to_dump_and_sync_policies();
+
+ return dsn::ERR_OK;
+}
+
+dsn::error_code
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+ std::string *ranger_policies) const
+{
+ std::string cmd =
+ fmt::format("curl {}/{}", FLAGS_ranger_service_url,
FLAGS_ranger_service_name);
+ std::stringstream resp;
+ if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+ return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+ }
+
+ *ranger_policies = resp.str();
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const
std::string &data)
+{
+ // The Ranger policy pulled from Ranger service demo.
+ /*
+ {
+ "serviceName": "PEGASUS1",
+ "serviceId": 1069,
+ "policyVersion": 60,
+ "policyUpdateTime": 1673254471000,
+ "policies": [{
+ "id": 5334,
+ "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+ "isEnabled": true,
+ "version": 13,
+ "service": "PEGASUS1",
+ "name": "all - database",
+ "policyType": 0,
+ "policyPriority": 0,
+ "description": "Policy for all - database",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": ["PEGASUS1"],
+ "isExcludes": false,
+ "isRecursive": true
+ }
+ },
+ "policyItems": [{
+ "accesses": [{
+ "type": "create",
+ "isAllowed": true
+ }, {
+ "type": "drop",
+ "isAllowed": true
+ }, {
+ "type": "control",
+ "isAllowed": true
+ }, {
+ "type": "metadata",
+ "isAllowed": true
+ }, {
+ "type": "list",
+ "isAllowed": true
+ }],
+ "users": ["PEGASUS1"],
+ "groups": [],
+ "roles": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "serviceType": "pegasus",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [],
+ "zoneName": "",
+ "isDenyAllElse": false
+ }],
+ "auditMode": "audit-default",
+ "serviceConfig": {}
+ }
+ */
+ rapidjson::Document doc;
+ doc.Parse(data.c_str());
+
+ // Check if it is needed to update policies.
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+ int remote_policy_version = doc["policyVersion"].GetInt();
+ if (_local_policy_version == remote_policy_version) {
+ LOG_DEBUG("Ranger policy version: {}, no need to update.",
_local_policy_version);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version > remote_policy_version) {
+ LOG_WARNING("Local Ranger policy version ({}) is larger than remote
version ({}), please "
+ "check Ranger services ({}).",
+ _local_policy_version,
+ remote_policy_version,
+ FLAGS_ranger_service_name);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version == 0) {
+ _local_policy_version = remote_policy_version;
+ }
+
+ // Update policies.
+ _all_resource_policies.clear();
+
+ // TODO(wanghao): it's optional
+ // Provide a DATABASE default policy for legacy tables.
+ // ranger_resource_policy default_database_policy;
+ //
ranger_resource_policy::create_default_database_policy(default_database_policy);
+ // _all_resource_policies[enum_to_string(resource_type::kDatabase)] =
{default_database_policy};
+
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+ const rapidjson::Value &policies = doc["policies"];
+ RETURN_ERR_IF_NOT_ARRAY(policies);
+ for (const auto &policy : policies.GetArray()) {
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+ // 1. Check if the policy is enabled or not.
+ if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+ continue;
+ }
+
+ // 2. Parse resource type.
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+ std::map<std::string, std::unordered_set<std::string>>
values_of_resource_type;
+ for (const auto &resource : policy["resources"].GetObject()) {
+ RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+ RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+ std::unordered_set<std::string> values;
+ for (const auto &v : (resource.value)["values"].GetArray()) {
+ values.insert(v.GetString());
+ }
+
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(),
values));
+ }
+
+ // 3. Construct ACL policy.
+ ranger_resource_policy resource_policy;
+ CONTINUE_IF_MISSING_MEMBER(policy, "name");
+ resource_policy.name = policy["name"].GetString();
+
+ resource_type rt = resource_type::kUnknown;
+ do {
+ // TODO(wanghao): refactor the following code
+ // parse Ranger policies json string into
`values_of_resource_type`, distinguish
+ // resource types by `values_of_resource_type.size()`
+ if (values_of_resource_type.size() == 1) {
+ auto iter = values_of_resource_type.find("global");
+ if (iter != values_of_resource_type.end()) {
+ rt = resource_type::kGlobal;
+ break;
+ }
+ iter = values_of_resource_type.find("database");
+ if (iter != values_of_resource_type.end()) {
+ resource_policy.database_names = iter->second;
+ rt = resource_type::kDatabase;
+ break;
+ }
+ } else if (values_of_resource_type.size() == 2) {
+ auto iter1 = values_of_resource_type.find("database");
+ auto iter2 = values_of_resource_type.find("table");
+ if (iter1 != values_of_resource_type.end() &&
+ iter2 != values_of_resource_type.end()) {
+ resource_policy.database_names = iter1->second;
+ resource_policy.table_names = iter2->second;
+ rt = resource_type::kDatabaseTable;
+ break;
+ }
+ }
+ return dsn::ERR_RANGER_PARSE_ACL;
+ } while (false);
+
+ parse_policies_from_json(policy["policyItems"],
resource_policy.policies.allow_policies);
+ parse_policies_from_json(policy["denyPolicyItems"],
resource_policy.policies.deny_policies);
+ parse_policies_from_json(policy["allowExceptions"],
+
resource_policy.policies.allow_policies_exclude);
+ parse_policies_from_json(policy["denyExceptions"],
+
resource_policy.policies.deny_policies_exclude);
+
+ // 4. Add the ACL policy.
+ auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+
resource_policies({resource_policy}));
+ if (!ret.second) {
+ ret.first->second.emplace_back(resource_policy);
+ }
+ }
+
+ return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+ dsn::task_ptr sync_task = dsn::tasking::create_task(
+ LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() {
dump_and_sync_policies(); });
+ _meta_svc->get_remote_storage()->create_node(
+ _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this,
sync_task](dsn::error_code err) {
Review Comment:
I'm a little curious why 'GET' is used in `LPC_CM_GET_RANGER_POLICY`. Isn't it
actually a 'put' operation?
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // for the newly created table, its app envs must be empty. This needs
to be executed
+ // periodically to update the table's app envs, regardless of whether
the Ranger policy is
+ // updated or not.
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies
to app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+ return dsn::ERR_OK;
+ }
+ ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+ start_to_dump_and_sync_policies();
+
+ return dsn::ERR_OK;
+}
+
+dsn::error_code
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+ std::string *ranger_policies) const
+{
+ std::string cmd =
+ fmt::format("curl {}/{}", FLAGS_ranger_service_url,
FLAGS_ranger_service_name);
+ std::stringstream resp;
+ if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+ return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+ }
+
+ *ranger_policies = resp.str();
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const
std::string &data)
+{
+ // The Ranger policy pulled from Ranger service demo.
+ /*
+ {
+ "serviceName": "PEGASUS1",
+ "serviceId": 1069,
+ "policyVersion": 60,
+ "policyUpdateTime": 1673254471000,
+ "policies": [{
+ "id": 5334,
+ "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+ "isEnabled": true,
+ "version": 13,
+ "service": "PEGASUS1",
+ "name": "all - database",
+ "policyType": 0,
+ "policyPriority": 0,
+ "description": "Policy for all - database",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": ["PEGASUS1"],
+ "isExcludes": false,
+ "isRecursive": true
+ }
+ },
+ "policyItems": [{
+ "accesses": [{
+ "type": "create",
+ "isAllowed": true
+ }, {
+ "type": "drop",
+ "isAllowed": true
+ }, {
+ "type": "control",
+ "isAllowed": true
+ }, {
+ "type": "metadata",
+ "isAllowed": true
+ }, {
+ "type": "list",
+ "isAllowed": true
+ }],
+ "users": ["PEGASUS1"],
+ "groups": [],
+ "roles": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "serviceType": "pegasus",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [],
+ "zoneName": "",
+ "isDenyAllElse": false
+ }],
+ "auditMode": "audit-default",
+ "serviceConfig": {}
+ }
+ */
+ rapidjson::Document doc;
+ doc.Parse(data.c_str());
+
+ // Check if it is needed to update policies.
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+ int remote_policy_version = doc["policyVersion"].GetInt();
+ if (_local_policy_version == remote_policy_version) {
+ LOG_DEBUG("Ranger policy version: {}, no need to update.",
_local_policy_version);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version > remote_policy_version) {
+ LOG_WARNING("Local Ranger policy version ({}) is larger than remote
version ({}), please "
+ "check Ranger services ({}).",
+ _local_policy_version,
+ remote_policy_version,
+ FLAGS_ranger_service_name);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version == 0) {
+ _local_policy_version = remote_policy_version;
+ }
+
+ // Update policies.
+ _all_resource_policies.clear();
+
+ // TODO(wanghao): it's optional
+ // Provide a DATABASE default policy for legacy tables.
+ // ranger_resource_policy default_database_policy;
+ //
ranger_resource_policy::create_default_database_policy(default_database_policy);
+ // _all_resource_policies[enum_to_string(resource_type::kDatabase)] =
{default_database_policy};
+
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+ const rapidjson::Value &policies = doc["policies"];
+ RETURN_ERR_IF_NOT_ARRAY(policies);
+ for (const auto &policy : policies.GetArray()) {
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+ // 1. Check if the policy is enabled or not.
+ if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+ continue;
+ }
+
+ // 2. Parse resource type.
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+ std::map<std::string, std::unordered_set<std::string>>
values_of_resource_type;
+ for (const auto &resource : policy["resources"].GetObject()) {
+ RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+ RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+ std::unordered_set<std::string> values;
+ for (const auto &v : (resource.value)["values"].GetArray()) {
+ values.insert(v.GetString());
+ }
+
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(),
values));
+ }
+
+ // 3. Construct ACL policy.
+ ranger_resource_policy resource_policy;
+ CONTINUE_IF_MISSING_MEMBER(policy, "name");
+ resource_policy.name = policy["name"].GetString();
+
+ resource_type rt = resource_type::kUnknown;
+ do {
+ // TODO(wanghao): refactor the following code
+ // parse Ranger policies json string into
`values_of_resource_type`, distinguish
+ // resource types by `values_of_resource_type.size()`
+ if (values_of_resource_type.size() == 1) {
+ auto iter = values_of_resource_type.find("global");
+ if (iter != values_of_resource_type.end()) {
+ rt = resource_type::kGlobal;
+ break;
+ }
+ iter = values_of_resource_type.find("database");
+ if (iter != values_of_resource_type.end()) {
+ resource_policy.database_names = iter->second;
+ rt = resource_type::kDatabase;
+ break;
+ }
+ } else if (values_of_resource_type.size() == 2) {
+ auto iter1 = values_of_resource_type.find("database");
+ auto iter2 = values_of_resource_type.find("table");
+ if (iter1 != values_of_resource_type.end() &&
+ iter2 != values_of_resource_type.end()) {
+ resource_policy.database_names = iter1->second;
+ resource_policy.table_names = iter2->second;
+ rt = resource_type::kDatabaseTable;
+ break;
+ }
+ }
+ return dsn::ERR_RANGER_PARSE_ACL;
+ } while (false);
+
+ parse_policies_from_json(policy["policyItems"],
resource_policy.policies.allow_policies);
+ parse_policies_from_json(policy["denyPolicyItems"],
resource_policy.policies.deny_policies);
+ parse_policies_from_json(policy["allowExceptions"],
+
resource_policy.policies.allow_policies_exclude);
+ parse_policies_from_json(policy["denyExceptions"],
+
resource_policy.policies.deny_policies_exclude);
+
+ // 4. Add the ACL policy.
+ auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+
resource_policies({resource_policy}));
+ if (!ret.second) {
+ ret.first->second.emplace_back(resource_policy);
+ }
+ }
+
+ return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+ dsn::task_ptr sync_task = dsn::tasking::create_task(
+ LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() {
dump_and_sync_policies(); });
+ _meta_svc->get_remote_storage()->create_node(
+ _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this,
sync_task](dsn::error_code err) {
+ if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+ LOG_DEBUG("Create Ranger policy meta root succeed.");
+ sync_task->enqueue();
+ return;
+ }
+ CHECK_EQ(err, dsn::ERR_TIMEOUT);
+ LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
+ dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+ &_tracker,
+ [this]() {
start_to_dump_and_sync_policies(); },
+ 0,
+ load_ranger_policy_retry_delay_ms);
+ });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+ dump_policies_to_remote_storage();
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+ update_cached_policies();
+ LOG_DEBUG("Update using resources policies succeed.");
+
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to
app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+ dsn::blob value =
json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+ _meta_svc->get_remote_storage()->set_data(
+ _ranger_policy_meta_root, value, LPC_CM_GET_RANGER_POLICY,
[this](dsn::error_code e) {
+ if (e == dsn::ERR_OK) {
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+ return;
+ }
+ CHECK_EQ_MSG(e, dsn::ERR_TIMEOUT, "Dump Ranger policies to remote
storage failed.");
+ LOG_ERROR("Dump Ranger policies to remote storage timeout, retry
later.");
+ dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+ &_tracker,
+ [this]() {
dump_policies_to_remote_storage(); },
+ 0,
+ load_ranger_policy_retry_delay_ms);
+ });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+ {
+ utils::auto_write_lock l(_global_policies_lock);
+
_global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
+ // TODO(wanghao): provide a query method
+ }
+ {
+ utils::auto_write_lock l(_database_policies_lock);
+ _database_policies_cache.swap(
+ _all_resource_policies[enum_to_string(resource_type::kDatabase)]);
+ // TODO(wanghao): provide a query method
+ }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+ const auto &table_policies =
+
_all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+ if (table_policies == _all_resource_policies.end()) {
+ LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app
envs.");
+ return dsn::ERR_OK;
+ }
+
+ dsn::replication::configuration_list_apps_response list_resp;
+ dsn::replication::configuration_list_apps_request list_req;
+ list_req.status = dsn::app_status::AS_AVAILABLE;
+ _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+ ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+ for (const auto &app : list_resp.infos) {
+ std::string database_name =
get_database_name_from_app_name(app.app_name);
+ std::string table_name;
+ if (database_name.empty()) {
+ database_name = "*";
+ table_name = app.app_name;
+ } else {
+ table_name = app.app_name.substr(database_name.size());
+ }
Review Comment:
Add a new function like `get_table_name_from_app_name`?
##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
policies.emplace_back(pi);
}
}
+
+dsn::error_code
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+ std::string ranger_policies;
+
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+ "Pull Ranger policies failed.");
+ LOG_DEBUG("Pull Ranger policies success.");
+
+ auto err_code = load_policies_from_json(ranger_policies);
+ if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+ LOG_DEBUG("Skip to update local policies.");
+ // for the newly created table, its app envs must be empty. This needs
to be executed
+ // periodically to update the table's app envs, regardless of whether
the Ranger policy is
+ // updated or not.
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies
to app envs failed.");
+ LOG_DEBUG("Sync policies to app envs succeeded.");
+ return dsn::ERR_OK;
+ }
+ ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+ start_to_dump_and_sync_policies();
+
+ return dsn::ERR_OK;
+}
+
+dsn::error_code
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+ std::string *ranger_policies) const
+{
+ std::string cmd =
+ fmt::format("curl {}/{}", FLAGS_ranger_service_url,
FLAGS_ranger_service_name);
+ std::stringstream resp;
+ if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+ return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+ }
+
+ *ranger_policies = resp.str();
+ return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const
std::string &data)
+{
+ // The Ranger policy pulled from Ranger service demo.
+ /*
+ {
+ "serviceName": "PEGASUS1",
+ "serviceId": 1069,
+ "policyVersion": 60,
+ "policyUpdateTime": 1673254471000,
+ "policies": [{
+ "id": 5334,
+ "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+ "isEnabled": true,
+ "version": 13,
+ "service": "PEGASUS1",
+ "name": "all - database",
+ "policyType": 0,
+ "policyPriority": 0,
+ "description": "Policy for all - database",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": ["PEGASUS1"],
+ "isExcludes": false,
+ "isRecursive": true
+ }
+ },
+ "policyItems": [{
+ "accesses": [{
+ "type": "create",
+ "isAllowed": true
+ }, {
+ "type": "drop",
+ "isAllowed": true
+ }, {
+ "type": "control",
+ "isAllowed": true
+ }, {
+ "type": "metadata",
+ "isAllowed": true
+ }, {
+ "type": "list",
+ "isAllowed": true
+ }],
+ "users": ["PEGASUS1"],
+ "groups": [],
+ "roles": [],
+ "conditions": [],
+ "delegateAdmin": true
+ }],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "serviceType": "pegasus",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [],
+ "zoneName": "",
+ "isDenyAllElse": false
+ }],
+ "auditMode": "audit-default",
+ "serviceConfig": {}
+ }
+ */
+ rapidjson::Document doc;
+ doc.Parse(data.c_str());
+
+ // Check if it is needed to update policies.
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+ int remote_policy_version = doc["policyVersion"].GetInt();
+ if (_local_policy_version == remote_policy_version) {
+ LOG_DEBUG("Ranger policy version: {}, no need to update.",
_local_policy_version);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version > remote_policy_version) {
+ LOG_WARNING("Local Ranger policy version ({}) is larger than remote
version ({}), please "
+ "check Ranger services ({}).",
+ _local_policy_version,
+ remote_policy_version,
+ FLAGS_ranger_service_name);
+ return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+ }
+
+ if (_local_policy_version == 0) {
+ _local_policy_version = remote_policy_version;
+ }
+
+ // Update policies.
+ _all_resource_policies.clear();
+
+ // TODO(wanghao): it's optional
+ // Provide a DATABASE default policy for legacy tables.
+ // ranger_resource_policy default_database_policy;
+ //
ranger_resource_policy::create_default_database_policy(default_database_policy);
+ // _all_resource_policies[enum_to_string(resource_type::kDatabase)] =
{default_database_policy};
+
+ RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+ const rapidjson::Value &policies = doc["policies"];
+ RETURN_ERR_IF_NOT_ARRAY(policies);
+ for (const auto &policy : policies.GetArray()) {
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+ // 1. Check if the policy is enabled or not.
+ if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+ continue;
+ }
+
+ // 2. Parse resource type.
+ RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+ std::map<std::string, std::unordered_set<std::string>>
values_of_resource_type;
+ for (const auto &resource : policy["resources"].GetObject()) {
+ RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+ RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+ std::unordered_set<std::string> values;
+ for (const auto &v : (resource.value)["values"].GetArray()) {
+ values.insert(v.GetString());
+ }
+
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(),
values));
+ }
+
+ // 3. Construct ACL policy.
+ ranger_resource_policy resource_policy;
+ CONTINUE_IF_MISSING_MEMBER(policy, "name");
+ resource_policy.name = policy["name"].GetString();
+
+ resource_type rt = resource_type::kUnknown;
+ do {
+ // TODO(wanghao): refactor the following code
+ // parse Ranger policies json string into
`values_of_resource_type`, distinguish
+ // resource types by `values_of_resource_type.size()`
+ if (values_of_resource_type.size() == 1) {
+ auto iter = values_of_resource_type.find("global");
+ if (iter != values_of_resource_type.end()) {
+ rt = resource_type::kGlobal;
+ break;
+ }
+ iter = values_of_resource_type.find("database");
+ if (iter != values_of_resource_type.end()) {
+ resource_policy.database_names = iter->second;
+ rt = resource_type::kDatabase;
+ break;
+ }
+ } else if (values_of_resource_type.size() == 2) {
+ auto iter1 = values_of_resource_type.find("database");
+ auto iter2 = values_of_resource_type.find("table");
+ if (iter1 != values_of_resource_type.end() &&
+ iter2 != values_of_resource_type.end()) {
+ resource_policy.database_names = iter1->second;
+ resource_policy.table_names = iter2->second;
+ rt = resource_type::kDatabaseTable;
+ break;
+ }
+ }
+ return dsn::ERR_RANGER_PARSE_ACL;
+ } while (false);
+
+ parse_policies_from_json(policy["policyItems"],
resource_policy.policies.allow_policies);
+ parse_policies_from_json(policy["denyPolicyItems"],
resource_policy.policies.deny_policies);
+ parse_policies_from_json(policy["allowExceptions"],
+
resource_policy.policies.allow_policies_exclude);
+ parse_policies_from_json(policy["denyExceptions"],
+
resource_policy.policies.deny_policies_exclude);
+
+ // 4. Add the ACL policy.
+ auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+
resource_policies({resource_policy}));
+ if (!ret.second) {
+ ret.first->second.emplace_back(resource_policy);
+ }
+ }
+
+ return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+ dsn::task_ptr sync_task = dsn::tasking::create_task(
+ LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() {
dump_and_sync_policies(); });
+ _meta_svc->get_remote_storage()->create_node(
+ _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this,
sync_task](dsn::error_code err) {
+ if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+ LOG_DEBUG("Create Ranger policy meta root succeed.");
+ sync_task->enqueue();
+ return;
+ }
+ CHECK_EQ(err, dsn::ERR_TIMEOUT);
+ LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
+ dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+ &_tracker,
+ [this]() {
start_to_dump_and_sync_policies(); },
+ 0,
+ load_ranger_policy_retry_delay_ms);
+ });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+ LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+ dump_policies_to_remote_storage();
+ LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+ update_cached_policies();
+ LOG_DEBUG("Update using resources policies succeed.");
+
+ CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to
app envs failed.");
Review Comment:
Same. The server will crash if the sync fails; is it too strict to assert that the
result must be OK?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]