This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 76cd79896 feat(make_idempotent): introduce new table-level APIs to
meta servers to list/control idempotence for atomic writes (#2205)
76cd79896 is described below
commit 76cd7989690ec17e790283d8f3ba5b1e985e9e56
Author: Dan Wang <[email protected]>
AuthorDate: Tue Mar 18 17:56:36 2025 +0800
feat(make_idempotent): introduce new table-level APIs to meta servers to
list/control idempotence for atomic writes (#2205)
https://github.com/apache/incubator-pegasus/issues/2197
We support enabling the idempotence of atomic write operations at the **meta** level.
First, we introduce `atomic_idempotent` as **an attribute** of the `app_info` structure,
which represents the basic properties of a table. This attribute is also persisted to
remote storage and will be loaded from remote storage when the meta server starts.
As for the **interface**, we implement the following on the meta server:
- Support specifying this attribute when creating a table.
- The list API will return the entire `app_info` object, including `atomic_idempotent`,
for each table.
- Support getting/setting this attribute for any table.
---
idl/dsn.layer2.thrift | 5 +
idl/meta_admin.thrift | 35 ++++
src/client/replication_ddl_client.cpp | 19 ++
src/client/replication_ddl_client.h | 6 +
src/common/json_helper.h | 91 ++++++++--
src/common/replication.codes.h | 2 +
src/meta/meta_rpc_types.h | 91 +++++-----
src/meta/meta_service.cpp | 32 ++++
src/meta/meta_service.h | 6 +
src/meta/server_state.cpp | 246 +++++++++++++++++++++++---
src/meta/server_state.h | 32 ++--
src/meta/test/meta_app_operation_test.cpp | 242 +++++++++++++++++--------
src/meta/test/state_sync_test.cpp | 30 +++-
src/ranger/ranger_resource_policy_manager.cpp | 4 +-
14 files changed, 669 insertions(+), 172 deletions(-)
diff --git a/idl/dsn.layer2.thrift b/idl/dsn.layer2.thrift
index 47d736cb8..5a016f9bb 100644
--- a/idl/dsn.layer2.thrift
+++ b/idl/dsn.layer2.thrift
@@ -113,4 +113,9 @@ struct app_info
// New fields for bulk load
// Whether this app is executing bulk load
14:optional bool is_bulk_loading = false;
+
+ // Whether all atomic writes to this table are made idempotent:
+ // - true: made idempotent.
+ // - false: kept non-idempotent as their respective client requests.
+ 15:optional bool atomic_idempotent = false;
}
diff --git a/idl/meta_admin.thrift b/idl/meta_admin.thrift
index b5621c30b..457a21784 100644
--- a/idl/meta_admin.thrift
+++ b/idl/meta_admin.thrift
@@ -139,6 +139,11 @@ struct create_app_options
4:string app_type;
5:bool is_stateful;
6:map<string, string> envs;
+
+ // Whether all atomic writes to this table are made idempotent:
+ // - true: made idempotent.
+ // - false: kept non-idempotent as their respective client requests.
+ 7:optional bool atomic_idempotent = false;
}
struct configuration_create_app_request
@@ -437,6 +442,36 @@ struct configuration_set_max_replica_count_response
3:string hint_message;
}
+// Get the idempotence (see app_info::atomic_idempotent) of given table for
atomic writes.
+struct configuration_get_atomic_idempotent_request
+{
+ 1:string app_name;
+}
+
+struct configuration_get_atomic_idempotent_response
+{
+ 1:dsn.error_code err;
+ 2:bool atomic_idempotent;
+ 3:string hint_message;
+}
+
+// Change the idempotence (see app_info::atomic_idempotent) of given table for
atomic writes.
+struct configuration_set_atomic_idempotent_request
+{
+ 1:string app_name;
+ 2:bool atomic_idempotent;
+}
+
+struct configuration_set_atomic_idempotent_response
+{
+ 1:dsn.error_code err;
+
+ // Previous value of atomic_idempotent before it was updated.
+ 2:bool old_atomic_idempotent;
+
+ 3:string hint_message;
+}
+
// ONLY FOR GO
// A client to MetaServer's administration API.
service admin_client
diff --git a/src/client/replication_ddl_client.cpp
b/src/client/replication_ddl_client.cpp
index a5928389a..1e12b9f11 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -1674,6 +1674,25 @@ replication_ddl_client::set_max_replica_count(const
std::string &app_name,
configuration_set_max_replica_count_rpc(std::move(req),
RPC_CM_SET_MAX_REPLICA_COUNT));
}
+error_with<configuration_get_atomic_idempotent_response>
+replication_ddl_client::get_atomic_idempotent(const std::string &app_name)
+{
+ auto req = std::make_unique<configuration_get_atomic_idempotent_request>();
+ req->__set_app_name(app_name);
+ return call_rpc_sync(
+ configuration_get_atomic_idempotent_rpc(std::move(req),
RPC_CM_GET_ATOMIC_IDEMPOTENT));
+}
+
+error_with<configuration_set_atomic_idempotent_response>
+replication_ddl_client::set_atomic_idempotent(const std::string &app_name,
bool atomic_idempotent)
+{
+ auto req = std::make_unique<configuration_set_atomic_idempotent_request>();
+ req->__set_app_name(app_name);
+ req->__set_atomic_idempotent(atomic_idempotent);
+ return call_rpc_sync(
+ configuration_set_atomic_idempotent_rpc(std::move(req),
RPC_CM_SET_ATOMIC_IDEMPOTENT));
+}
+
error_with<configuration_rename_app_response>
replication_ddl_client::rename_app(const std::string &old_app_name, const
std::string &new_app_name)
{
diff --git a/src/client/replication_ddl_client.h
b/src/client/replication_ddl_client.h
index 0ddd58144..e83530481 100644
--- a/src/client/replication_ddl_client.h
+++ b/src/client/replication_ddl_client.h
@@ -299,6 +299,12 @@ public:
error_with<configuration_set_max_replica_count_response>
set_max_replica_count(const std::string &app_name, int32_t
max_replica_count);
+ error_with<configuration_get_atomic_idempotent_response>
+ get_atomic_idempotent(const std::string &app_name);
+
+ error_with<configuration_set_atomic_idempotent_response>
+ set_atomic_idempotent(const std::string &app_name, bool atomic_idempotent);
+
void set_max_wait_app_ready_secs(uint32_t max_wait_secs) { _max_wait_secs
= max_wait_secs; }
void set_meta_servers_leader();
diff --git a/src/common/json_helper.h b/src/common/json_helper.h
index d56bcecba..ace421433 100644
--- a/src/common/json_helper.h
+++ b/src/common/json_helper.h
@@ -104,6 +104,26 @@
out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)
\
JSON_ENCODE_ENTRIES13(out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13); \
JSON_ENCODE_ENTRY(out, prefix, T14)
+#define JSON_ENCODE_ENTRIES15(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15) \
+ JSON_ENCODE_ENTRIES14(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14); \
+ JSON_ENCODE_ENTRY(out, prefix, T15)
+#define JSON_ENCODE_ENTRIES16(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15, T16) \
+ JSON_ENCODE_ENTRIES15(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14, T15); \
+ JSON_ENCODE_ENTRY(out, prefix, T16)
+#define JSON_ENCODE_ENTRIES17(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15, T16, T17) \
+ JSON_ENCODE_ENTRIES16(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14, T15, T16); \
+ JSON_ENCODE_ENTRY(out, prefix, T17)
+#define JSON_ENCODE_ENTRIES18(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15, T16, T17, T18) \
+ JSON_ENCODE_ENTRIES17(
\
+ out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14, T15, T16, T17); \
+ JSON_ENCODE_ENTRY(out, prefix, T18)
#define JSON_DECODE_ENTRY(in, prefix, T)
\
do {
\
@@ -162,9 +182,47 @@
in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)
\
JSON_DECODE_ENTRIES13(in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10,
T11, T12, T13); \
JSON_TRY_DECODE_ENTRY(in, prefix, T14)
-
-#define JSON_ENTRIES_GET_MACRO(
\
- ph1, ph2, ph3, ph4, ph5, ph6, ph7, ph8, ph9, ph10, ph11, ph12, ph13, ph14,
NAME, ...) \
+#define JSON_DECODE_ENTRIES15(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15) \
+ JSON_DECODE_ENTRIES14(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14); \
+ JSON_TRY_DECODE_ENTRY(in, prefix, T15)
+#define JSON_DECODE_ENTRIES16(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15, T16) \
+ JSON_DECODE_ENTRIES15(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14, T15); \
+ JSON_TRY_DECODE_ENTRY(in, prefix, T16)
+#define JSON_DECODE_ENTRIES17(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15, T16, T17) \
+ JSON_DECODE_ENTRIES16(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14, T15, T16); \
+ JSON_TRY_DECODE_ENTRY(in, prefix, T17)
+#define JSON_DECODE_ENTRIES18(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14,
T15, T16, T17, T18) \
+ JSON_DECODE_ENTRIES17(
\
+ in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13,
T14, T15, T16, T17); \
+ JSON_TRY_DECODE_ENTRY(in, prefix, T18)
+
+#define JSON_ENTRIES_GET_MACRO(ph1,
\
+ ph2,
\
+ ph3,
\
+ ph4,
\
+ ph5,
\
+ ph6,
\
+ ph7,
\
+ ph8,
\
+ ph9,
\
+ ph10,
\
+ ph11,
\
+ ph12,
\
+ ph13,
\
+ ph14,
\
+ ph15,
\
+ ph16,
\
+ ph17,
\
+ ph18,
\
+ NAME,
\
+ ...)
\
NAME
// workaround due to the way VC handles "..."
#define JSON_ENTRIES_GET_MACRO_(tuple) JSON_ENTRIES_GET_MACRO tuple
@@ -172,6 +230,10 @@
#define JSON_ENCODE_ENTRIES(out, prefix, ...)
\
out.StartObject();
\
JSON_ENTRIES_GET_MACRO_((__VA_ARGS__,
\
+ JSON_ENCODE_ENTRIES18,
\
+ JSON_ENCODE_ENTRIES17,
\
+ JSON_ENCODE_ENTRIES16,
\
+ JSON_ENCODE_ENTRIES15,
\
JSON_ENCODE_ENTRIES14,
\
JSON_ENCODE_ENTRIES13,
\
JSON_ENCODE_ENTRIES12,
\
@@ -194,6 +256,10 @@
int arguments_count = 0;
\
int parsed_count = 0;
\
JSON_ENTRIES_GET_MACRO_((__VA_ARGS__,
\
+ JSON_DECODE_ENTRIES18,
\
+ JSON_DECODE_ENTRIES17,
\
+ JSON_DECODE_ENTRIES16,
\
+ JSON_DECODE_ENTRIES15,
\
JSON_DECODE_ENTRIES14,
\
JSON_DECODE_ENTRIES13,
\
JSON_DECODE_ENTRIES12,
\
@@ -448,14 +514,14 @@ inline bool json_decode(const dsn::json::JsonObject &in,
dsn::host_port &hp)
return static_cast<bool>(hp);
}
-inline void json_encode(JsonWriter &out, const dsn::partition_configuration
&pc);
-inline bool json_decode(const JsonObject &in, dsn::partition_configuration
&pc);
-inline void json_encode(JsonWriter &out, const dsn::app_info &info);
-inline bool json_decode(const JsonObject &in, dsn::app_info &info);
-inline void json_encode(JsonWriter &out, const dsn::replication::file_meta
&f_meta);
-inline bool json_decode(const JsonObject &in, dsn::replication::file_meta
&f_meta);
-inline void json_encode(JsonWriter &out, const
dsn::replication::bulk_load_metadata &metadata);
-inline bool json_decode(const JsonObject &in,
dsn::replication::bulk_load_metadata &metadata);
+inline void json_encode(JsonWriter &, const dsn::partition_configuration &);
+inline bool json_decode(const JsonObject &, dsn::partition_configuration &);
+inline void json_encode(JsonWriter &, const dsn::app_info &);
+inline bool json_decode(const JsonObject &, dsn::app_info &);
+inline void json_encode(JsonWriter &, const dsn::replication::file_meta &);
+inline bool json_decode(const JsonObject &, dsn::replication::file_meta &);
+inline void json_encode(JsonWriter &, const
dsn::replication::bulk_load_metadata &);
+inline bool json_decode(const JsonObject &in,
dsn::replication::bulk_load_metadata &);
template <typename T>
inline void json_encode_iterable(JsonWriter &out, const T &t)
@@ -740,7 +806,8 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::app_info,
drop_second,
duplicating,
init_partition_count,
- is_bulk_loading)
+ is_bulk_loading,
+ atomic_idempotent)
NON_MEMBER_JSON_SERIALIZATION(dsn::replication::file_meta, name, size, md5)
diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h
index 42ae695f7..28d846eea 100644
--- a/src/common/replication.codes.h
+++ b/src/common/replication.codes.h
@@ -131,6 +131,8 @@ MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT,
TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
+MAKE_EVENT_CODE_RPC(RPC_CM_GET_ATOMIC_IDEMPOTENT, TASK_PRIORITY_COMMON)
+MAKE_EVENT_CODE_RPC(RPC_CM_SET_ATOMIC_IDEMPOTENT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_USE_RANGER_ACCESS_CONTROL, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL
diff --git a/src/meta/meta_rpc_types.h b/src/meta/meta_rpc_types.h
index 200b409e1..b6002ac83 100644
--- a/src/meta/meta_rpc_types.h
+++ b/src/meta/meta_rpc_types.h
@@ -26,49 +26,52 @@
#include "replica_admin_types.h"
#include "rpc/rpc_holder.h"
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
-typedef rpc_holder<configuration_update_app_env_request,
configuration_update_app_env_response>
- app_env_rpc;
-typedef rpc_holder<ddd_diagnose_request, ddd_diagnose_response>
ddd_diagnose_rpc;
-typedef rpc_holder<configuration_query_by_node_request,
configuration_query_by_node_response>
- configuration_query_by_node_rpc;
-typedef rpc_holder<query_cfg_request, query_cfg_response>
configuration_query_by_index_rpc;
-typedef rpc_holder<configuration_list_apps_request,
configuration_list_apps_response>
- configuration_list_apps_rpc;
-typedef rpc_holder<configuration_list_nodes_request,
configuration_list_nodes_response>
- configuration_list_nodes_rpc;
-typedef rpc_holder<configuration_cluster_info_request,
configuration_cluster_info_response>
- configuration_cluster_info_rpc;
-typedef rpc_holder<configuration_balancer_request,
configuration_balancer_response>
- configuration_balancer_rpc;
-typedef rpc_holder<configuration_meta_control_request,
configuration_meta_control_response>
- configuration_meta_control_rpc;
-typedef rpc_holder<configuration_recovery_request,
configuration_recovery_response>
- configuration_recovery_rpc;
-typedef rpc_holder<configuration_report_restore_status_request,
- configuration_report_restore_status_response>
- configuration_report_restore_status_rpc;
-typedef rpc_holder<configuration_query_restore_request,
configuration_query_restore_response>
- configuration_query_restore_rpc;
-typedef rpc_holder<configuration_query_backup_policy_request,
- configuration_query_backup_policy_response>
- query_backup_policy_rpc;
-typedef rpc_holder<configuration_modify_backup_policy_request,
- configuration_modify_backup_policy_response>
- configuration_modify_backup_policy_rpc;
-typedef rpc_holder<start_backup_app_request, start_backup_app_response>
start_backup_app_rpc;
-typedef rpc_holder<query_backup_status_request, query_backup_status_response>
- query_backup_status_rpc;
-typedef rpc_holder<configuration_get_max_replica_count_request,
- configuration_get_max_replica_count_response>
- configuration_get_max_replica_count_rpc;
-typedef rpc_holder<configuration_set_max_replica_count_request,
- configuration_set_max_replica_count_response>
- configuration_set_max_replica_count_rpc;
-typedef rpc_holder<configuration_rename_app_request,
configuration_rename_app_response>
- configuration_rename_app_rpc;
+using app_env_rpc =
+ rpc_holder<configuration_update_app_env_request,
configuration_update_app_env_response>;
+using ddd_diagnose_rpc = rpc_holder<ddd_diagnose_request,
ddd_diagnose_response>;
+using configuration_query_by_node_rpc =
+ rpc_holder<configuration_query_by_node_request,
configuration_query_by_node_response>;
+using configuration_query_by_index_rpc = rpc_holder<query_cfg_request,
query_cfg_response>;
+using configuration_list_apps_rpc =
+ rpc_holder<configuration_list_apps_request,
configuration_list_apps_response>;
+using configuration_list_nodes_rpc =
+ rpc_holder<configuration_list_nodes_request,
configuration_list_nodes_response>;
+using configuration_cluster_info_rpc =
+ rpc_holder<configuration_cluster_info_request,
configuration_cluster_info_response>;
+using configuration_balancer_rpc =
+ rpc_holder<configuration_balancer_request,
configuration_balancer_response>;
+using configuration_meta_control_rpc =
+ rpc_holder<configuration_meta_control_request,
configuration_meta_control_response>;
+using configuration_recovery_rpc =
+ rpc_holder<configuration_recovery_request,
configuration_recovery_response>;
+using configuration_report_restore_status_rpc =
+ rpc_holder<configuration_report_restore_status_request,
+ configuration_report_restore_status_response>;
+using configuration_query_restore_rpc =
+ rpc_holder<configuration_query_restore_request,
configuration_query_restore_response>;
+using query_backup_policy_rpc =
rpc_holder<configuration_query_backup_policy_request,
+
configuration_query_backup_policy_response>;
+using configuration_modify_backup_policy_rpc =
+ rpc_holder<configuration_modify_backup_policy_request,
+ configuration_modify_backup_policy_response>;
+using start_backup_app_rpc = rpc_holder<start_backup_app_request,
start_backup_app_response>;
+using query_backup_status_rpc =
+ rpc_holder<query_backup_status_request, query_backup_status_response>;
+using configuration_get_max_replica_count_rpc =
+ rpc_holder<configuration_get_max_replica_count_request,
+ configuration_get_max_replica_count_response>;
+using configuration_set_max_replica_count_rpc =
+ rpc_holder<configuration_set_max_replica_count_request,
+ configuration_set_max_replica_count_response>;
+using configuration_get_atomic_idempotent_rpc =
+ rpc_holder<configuration_get_atomic_idempotent_request,
+ configuration_get_atomic_idempotent_response>;
+using configuration_set_atomic_idempotent_rpc =
+ rpc_holder<configuration_set_atomic_idempotent_request,
+ configuration_set_atomic_idempotent_response>;
+using configuration_rename_app_rpc =
+ rpc_holder<configuration_rename_app_request,
configuration_rename_app_response>;
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 6ec19c007..4be52d90e 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -563,6 +563,12 @@ void meta_service::register_rpc_handlers()
register_rpc_handler_with_rpc_holder(RPC_CM_SET_MAX_REPLICA_COUNT,
"set_max_replica_count",
&meta_service::on_set_max_replica_count);
+ register_rpc_handler_with_rpc_holder(RPC_CM_GET_ATOMIC_IDEMPOTENT,
+ "get_atomic_idempotent",
+
&meta_service::on_get_atomic_idempotent);
+ register_rpc_handler_with_rpc_holder(RPC_CM_SET_ATOMIC_IDEMPOTENT,
+ "set_atomic_idempotent",
+
&meta_service::on_set_atomic_idempotent);
}
meta_leader_state meta_service::check_leader(dsn::message_ex *req,
dsn::host_port *forward_address)
@@ -1353,5 +1359,31 @@ void
meta_service::on_set_max_replica_count(configuration_set_max_replica_count_
server_state::sStateHash);
}
+// ThreadPool: THREAD_POOL_META_SERVER
+void
meta_service::on_get_atomic_idempotent(configuration_get_atomic_idempotent_rpc
rpc)
+{
+ if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) {
+ return;
+ }
+
+ tasking::enqueue(LPC_META_STATE_NORMAL,
+ tracker(),
+ std::bind(&server_state::get_atomic_idempotent,
_state.get(), rpc),
+ server_state::sStateHash);
+}
+
+// ThreadPool: THREAD_POOL_META_SERVER
+void
meta_service::on_set_atomic_idempotent(configuration_set_atomic_idempotent_rpc
rpc)
+{
+ if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) {
+ return;
+ }
+
+ tasking::enqueue(LPC_META_STATE_NORMAL,
+ tracker(),
+ std::bind(&server_state::set_atomic_idempotent,
_state.get(), rpc),
+ server_state::sStateHash);
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index 5230fbfd4..9b9488481 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -289,6 +289,12 @@ private:
void on_get_max_replica_count(configuration_get_max_replica_count_rpc rpc);
void on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc);
+ // Get `atomic_idempotent` of a table.
+ void on_get_atomic_idempotent(configuration_get_atomic_idempotent_rpc rpc);
+
+ // Set `atomic_idempotent` of a table.
+ void on_set_atomic_idempotent(configuration_set_atomic_idempotent_rpc rpc);
+
// if return 'kNotLeaderAndCannotForwardRpc' and 'forward_address' !=
nullptr, then return
// leader by 'forward_address'.
meta_leader_state check_leader(dsn::message_ex *req, dsn::host_port
*forward_address);
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 72557a8c5..823e4fe28 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -51,6 +51,7 @@
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
+#include "dsn.layer2_types.h"
#include "dump_file.h"
#include "meta/app_env_validator.h"
#include "meta/meta_data.h"
@@ -1213,6 +1214,10 @@ void server_state::create_app(dsn::message_ex *msg)
info.create_second = static_cast<int64_t>(dsn_now_s());
info.init_partition_count = request.options.partition_count;
+ // No need to check `request.options.__isset.atomic_idempotent`, since by
default
+ // it is true (because `request.options.atomic_idempotent` has default
value false).
+ info.__set_atomic_idempotent(request.options.atomic_idempotent);
+
app = app_state::create(info);
app->helpers->pending_response = msg;
app->helpers->partitions_in_progress.store(info.partition_count);
@@ -2358,20 +2363,50 @@ void server_state::on_propose_balancer(const
configuration_balancer_request &req
_meta_svc->get_balancer()->register_proposals({&_all_apps, &_nodes},
request, response);
}
+namespace {
+
+bool app_info_compatible_equal(const app_info &l, const app_info &r)
+{
+ // Some fields like `app_type`, `app_id` and `create_second` are
initialized and
+ // persisted into .app-info file when the replica is created, and will
NEVER be
+ // changed during their lifetime even if the table is dropped or recalled.
Their
+ // consistency must be checked.
+ //
+ // Some fields may be updated during their lifetime, but will NEVER be
persisted
+ // into .app-info, such as most environments in `envs`. Their consistency
does not
+ // need to be checked.
+ //
+ // Some fields may be updated during their lifetime, and will also be
persisted into
+ // .app-info file:
+ // - For the fields such as `app_name`, `max_replica_count` and
`atomic_idempotent`
+ // without compatibility problems, their consistency should be checked.
+ // - For the fields such as `duplicating` whose compatibility varies
between primary
+ // and secondaries in 2.1.x, 2.2.x and 2.3.x release, their consistency
is not
+ // checked.
+ return l.status == r.status && l.app_type == r.app_type && l.app_name ==
r.app_name &&
+ l.app_id == r.app_id && l.partition_count == r.partition_count &&
+ l.is_stateful == r.is_stateful && l.max_replica_count ==
r.max_replica_count &&
+ l.expire_second == r.expire_second && l.create_second ==
r.create_second &&
+ l.drop_second == r.drop_second && l.atomic_idempotent ==
r.atomic_idempotent;
+}
+
+} // anonymous namespace
+
error_code
server_state::construct_apps(const std::vector<query_app_info_response>
&query_app_responses,
const std::vector<dsn::host_port> &replica_nodes,
std::string &hint_message)
{
int max_app_id = 0;
- for (unsigned int i = 0; i < query_app_responses.size(); ++i) {
- query_app_info_response query_resp = query_app_responses[i];
- if (query_resp.err != dsn::ERR_OK)
+ for (size_t i = 0; i < query_app_responses.size(); ++i) {
+ const auto &query_resp = query_app_responses[i];
+ if (query_resp.err != dsn::ERR_OK) {
continue;
+ }
for (const app_info &info : query_resp.apps) {
- CHECK_GE_MSG(info.app_id, 1, "invalid app id");
- auto iter = _all_apps.find(info.app_id);
+ CHECK_GT_MSG(info.app_id, 0, "invalid app id");
+ const auto iter = std::as_const(_all_apps).find(info.app_id);
if (iter == _all_apps.end()) {
std::shared_ptr<app_state> app = app_state::create(info);
LOG_INFO("create app info from ({}) for id({}): {}",
@@ -2380,23 +2415,20 @@ server_state::construct_apps(const
std::vector<query_app_info_response> &query_a
boost::lexical_cast<std::string>(info));
_all_apps.emplace(app->app_id, app);
max_app_id = std::max(app->app_id, max_app_id);
- } else {
- app_info *old_info = iter->second.get();
- // all info in all replica servers should be the same
- // coz the app info is only initialized when the replica is
- // created, and it will NEVER change even if the app is
dropped/recalled...
- if (info != *old_info) // app_info::operator !=
- {
- // compatible for app.duplicating different between
primary and secondaries in
- // 2.1.x, 2.2.x and 2.3.x release
- CHECK(app_info_compatible_equal(info, *old_info),
- "conflict app info from ({}) for id({}):
new_info({}), old_info({})",
- replica_nodes[i],
- info.app_id,
- boost::lexical_cast<std::string>(info),
- boost::lexical_cast<std::string>(*old_info));
- }
+ continue;
}
+
+ app_info *old_info = iter->second.get();
+ if (info == *old_info) {
+ continue;
+ }
+
+ CHECK(app_info_compatible_equal(info, *old_info),
+ "conflict app info from ({}) for id({}): new_info({}),
old_info({})",
+ replica_nodes[i],
+ info.app_id,
+ boost::lexical_cast<std::string>(info),
+ boost::lexical_cast<std::string>(*old_info));
}
}
@@ -3648,7 +3680,7 @@ std::shared_ptr<app_state>
server_state::get_app_and_check_exist(const std::stri
Response
&response) const
{
auto app = get_app(app_name);
- if (app == nullptr) {
+ if (!app) {
response.err = ERR_APP_NOT_EXIST;
response.hint_message = fmt::format("app({}) does not exist",
app_name);
}
@@ -3756,7 +3788,7 @@ void
server_state::set_max_replica_count(configuration_set_max_replica_count_rpc
response.old_max_replica_count = app->max_replica_count;
if (app->status != app_status::AS_AVAILABLE) {
- response.err = ERR_INVALID_PARAMETERS;
+ response.err = ERR_INVALID_STATE;
response.hint_message = fmt::format("app({}) is not in available
status", app_name);
LOG_ERROR("failed to set max_replica_count: app_name={},
app_id={}, error_code={}, "
"hint_message={}",
@@ -4458,6 +4490,174 @@ void
server_state::recover_app_max_replica_count(std::shared_ptr<app_state> &app
&tracker);
}
+// ThreadPool: THREAD_POOL_META_STATE
+void
server_state::get_atomic_idempotent(configuration_get_atomic_idempotent_rpc
rpc) const
+{
+ const auto &app_name = rpc.request().app_name;
+ auto &response = rpc.response();
+
+ zauto_read_lock l(_lock);
+
+ auto app = get_app_and_check_exist(app_name, response);
+ if (!app) {
+ response.atomic_idempotent = false;
+ LOG_WARNING("failed to get atomic_idempotent: app_name={}, "
+ "error_code={}, hint_message={}",
+ app_name,
+ response.err,
+ response.hint_message);
+ return;
+ }
+
+ response.err = ERR_OK;
+
+ // No need to check `app->__isset.atomic_idempotent`, since by default it
is true
+ // (because `app->atomic_idempotent` has default value false).
+ response.atomic_idempotent = app->atomic_idempotent;
+
+ LOG_INFO("get atomic_idempotent successfully: app_name={}, app_id={}, "
+ "atomic_idempotent={}",
+ app_name,
+ app->app_id,
+ response.atomic_idempotent);
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void
server_state::set_atomic_idempotent(configuration_set_atomic_idempotent_rpc rpc)
+{
+ const auto &app_name = rpc.request().app_name;
+ const auto new_atomic_idempotent = rpc.request().atomic_idempotent;
+ auto &response = rpc.response();
+
+ int32_t app_id = 0;
+ std::shared_ptr<app_state> app;
+
+ {
+ zauto_read_lock l(_lock);
+
+ app = get_app_and_check_exist(app_name, response);
+ if (!app) {
+ response.old_atomic_idempotent = false;
+ LOG_WARNING("failed to set atomic_idempotent: app_name={}, "
+ "error_code={}, hint_message={}",
+ app_name,
+ response.err,
+ response.hint_message);
+ return;
+ }
+
+ app_id = app->app_id;
+
+ // No need to check `app->__isset.atomic_idempotent`, since by default
it is true
+ // (because `app->atomic_idempotent` has default value false).
+ response.old_atomic_idempotent = app->atomic_idempotent;
+
+ if (app->status != app_status::AS_AVAILABLE) {
+ response.err = ERR_INVALID_STATE;
+ response.hint_message = fmt::format("app({}) is not in available
status", app_name);
+ LOG_ERROR("failed to set atomic_idempotent: app_name={},
app_id={}, "
+ "error_code={}, hint_message={}",
+ app_name,
+ app_id,
+ response.err,
+ response.hint_message);
+ return;
+ }
+ }
+
+ const auto level = _meta_svc->get_function_level();
+ if (level <= meta_function_level::fl_freezed) {
+ response.err = ERR_STATE_FREEZED;
+ response.hint_message =
+ "current meta function level is freezed, since there are too few
alive nodes";
+ LOG_ERROR("failed to set atomic_idempotent: app_name={}, app_id={}, "
+ "error_code={}, message={}",
+ app_name,
+ app_id,
+ response.err,
+ response.hint_message);
+ return;
+ }
+
+ if (new_atomic_idempotent == response.old_atomic_idempotent) {
+ response.err = ERR_OK;
+ response.hint_message = "no need to update atomic_idempotent since
it's unchanged";
+ LOG_WARNING("{}: app_name={}, app_id={}", response.hint_message,
app_name, app_id);
+ return;
+ }
+
+ LOG_INFO("request for updating atomic_idempotent: app_name={}, app_id={}, "
+ "old_atomic_idempotent={}, new_atomic_idempotent={}",
+ app_name,
+ app_id,
+ response.old_atomic_idempotent,
+ new_atomic_idempotent);
+
+ update_app_atomic_idempotent_on_remote(app, rpc);
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void server_state::update_app_atomic_idempotent_on_remote(
+ std::shared_ptr<app_state> &app, configuration_set_atomic_idempotent_rpc
rpc)
+{
+ app_info ainfo = *app;
+ ainfo.atomic_idempotent = rpc.request().atomic_idempotent;
+ do_update_app_info(get_app_path(*app), ainfo, [this, app, rpc](error_code
ec) mutable {
+ const auto new_atomic_idempotent = rpc.request().atomic_idempotent;
+ const auto old_atomic_idempotent =
rpc.response().old_atomic_idempotent;
+
+ zauto_write_lock l(_lock);
+
+ CHECK_EQ_MSG(ec,
+ ERR_OK,
+ "An error that cannot be handled occurred while updating
atomic_idempotent "
+ "on remote: error_code={}, app_name={}, app_id={}, "
+ "old_atomic_idempotent={}, new_atomic_idempotent={}",
+ ec,
+ app->app_name,
+ app->app_id,
+ old_atomic_idempotent,
+ new_atomic_idempotent);
+
+ CHECK_EQ_MSG(rpc.request().app_name,
+ app->app_name,
+ "atomic_idempotent was updated to remote storage, however
app_name "
+ "has been changed since then: old_app_name={},
new_app_name={}, "
+ "app_id={}, old_atomic_idempotent={},
new_atomic_idempotent={}",
+ rpc.request().app_name,
+ app->app_name,
+ app->app_id,
+ old_atomic_idempotent,
+ new_atomic_idempotent);
+
+ // No need to check `app->__isset.atomic_idempotent`, since by default
it is true
+ // (because `app->atomic_idempotent` has default value false).
+ CHECK_EQ_MSG(old_atomic_idempotent,
+ app->atomic_idempotent,
+ "atomic_idempotent has been updated to remote storage,
however "
+ "old_atomic_idempotent from response is not consistent
with current local "
+ "atomic_idempotent: app_name={}, app_id={},
old_atomic_idempotent={}, "
+ "local_atomic_idempotent={}, new_atomic_idempotent={}",
+ app->app_name,
+ app->app_id,
+ old_atomic_idempotent,
+ app->atomic_idempotent,
+ new_atomic_idempotent);
+
+ app->__set_atomic_idempotent(new_atomic_idempotent);
+ LOG_INFO("both remote and local app-level atomic_idempotent have been
updated "
+ "successfully: app_name={}, app_id={},
old_atomic_idempotent={}, "
+ "new_atomic_idempotent={}",
+ app->app_name,
+ app->app_id,
+ old_atomic_idempotent,
+ new_atomic_idempotent);
+
+ auto &response = rpc.response();
+ response.err = ERR_OK;
+ });
+}
+
#undef SUCC_CREATE_APP_RESPONSE
#undef FAIL_CREATE_APP_RESPONSE
#undef INIT_CREATE_APP_RESPONSE_WITH_OK
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index 56c7d6c03..141c57aa8 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -41,7 +41,6 @@
#include "app_env_validator.h"
#include "common/gpid.h"
#include "common/manual_compact.h"
-#include "dsn.layer2_types.h"
#include "gutil/map_util.h"
#include "meta/meta_rpc_types.h"
#include "meta_data.h"
@@ -52,10 +51,14 @@
#include "utils/zlocks.h"
namespace dsn {
+class app_info;
class blob;
class command_deregister;
class host_port;
class message_ex;
+class partition_configuration;
+class query_cfg_request;
+class query_cfg_response;
namespace replication {
class configuration_balancer_request;
@@ -200,6 +203,12 @@ public:
void set_max_replica_count(configuration_set_max_replica_count_rpc rpc);
void recover_from_max_replica_count_env();
+ // Get `atomic_idempotent` of a table.
+ void get_atomic_idempotent(configuration_get_atomic_idempotent_rpc rpc) const;
+
+ // Set `atomic_idempotent` of a table.
+ void set_atomic_idempotent(configuration_set_atomic_idempotent_rpc rpc);
+
// return true if no need to do any actions
bool check_all_partitions();
void get_cluster_balance_score(double &primary_stddev /*out*/, double &total_stddev /*out*/);
@@ -377,6 +386,14 @@ private:
int32_t max_replica_count,
dsn::task_tracker &tracker);
+ // Update `atomic_idempotent` of given table on remote storage.
+ //
+ // Parameters:
+ // - app: the given table.
+ // - rpc: RPC request/response to change `atomic_idempotent`.
+ void update_app_atomic_idempotent_on_remote(std::shared_ptr<app_state> &app,
+ configuration_set_atomic_idempotent_rpc rpc);
+
// Used for `on_start_manual_compaction`
bool parse_compaction_envs(start_manual_compact_rpc rpc,
std::vector<std::string> &keys,
@@ -385,19 +402,6 @@ private:
const std::vector<std::string> &keys,
const std::vector<std::string> &values);
- bool app_info_compatible_equal(const app_info &l, const app_info &r) const
- {
- if (l.status != r.status || l.app_type != r.app_type || l.app_name != r.app_name ||
- l.app_id != r.app_id || l.partition_count != r.partition_count ||
- l.is_stateful != r.is_stateful || l.max_replica_count != r.max_replica_count ||
- l.expire_second != r.expire_second || l.create_second != r.create_second ||
- l.drop_second != r.drop_second) {
- return false;
- }
- return true;
- }
-
-private:
friend class bulk_load_service;
friend class bulk_load_service_test;
friend class meta_app_operation_test;
diff --git a/src/meta/test/meta_app_operation_test.cpp b/src/meta/test/meta_app_operation_test.cpp
index 9956ec31e..13f7f4291 100644
--- a/src/meta/test/meta_app_operation_test.cpp
+++ b/src/meta/test/meta_app_operation_test.cpp
@@ -34,6 +34,7 @@
#include "common/replication.codes.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
+#include "gutil/map_util.h"
#include "meta/meta_data.h"
#include "meta/meta_rpc_types.h"
#include "meta/meta_service.h"
@@ -66,11 +67,12 @@ class meta_app_operation_test : public meta_test_base
public:
meta_app_operation_test() = default;
- error_code create_app_test(int32_t partition_count,
+ error_code create_app_test(const std::string &app_name,
+ int32_t partition_count,
int32_t replica_count,
bool success_if_exist,
- const std::string &app_name,
- const std::map<std::string, std::string> &envs = {})
+ const std::map<std::string, std::string> &envs,
+ bool atomic_idempotent)
{
configuration_create_app_request create_request;
configuration_create_app_response create_response;
@@ -81,6 +83,7 @@ public:
create_request.options.success_if_exist = success_if_exist;
create_request.options.is_stateful = true;
create_request.options.envs = envs;
+ create_request.options.__set_atomic_idempotent(atomic_idempotent);
auto result = fake_create_app(_ss.get(), create_request);
fake_wait_rpc(result, create_response);
@@ -171,16 +174,13 @@ public:
app->envs[replica_envs::UPDATE_MAX_REPLICA_COUNT] = env;
}
- // set remote env of app
- auto app_path = _ss->get_app_path(*app);
- auto ainfo = *(reinterpret_cast<app_info *>(app.get()));
- auto json_config = dsn::json::json_forwarder<app_info>::encode(ainfo);
+ // Set env on remote storage.
dsn::task_tracker tracker;
_ms->get_remote_storage()->set_data(
- app_path,
- json_config,
+ _ss->get_app_path(*app),
+ json::json_forwarder<app_info>::encode(*app),
LPC_META_STATE_HIGH,
- [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); },
+ [](error_code ec) { ASSERT_EQ(ERR_OK, ec); },
&tracker);
tracker.wait_outstanding_tasks();
}
@@ -219,38 +219,31 @@ public:
auto app = find_app(app_name);
CHECK(app, "app({}) does not exist", app_name);
- auto partition_size = static_cast<int>(app->pcs.size());
- for (int i = 0; i < partition_size; ++i) {
- // set local max_replica_count of each partition
- auto &pc = app->pcs[i];
+ for (auto &pc : app->pcs) {
+ // Set `max_replica_count` of each partition locally.
pc.max_replica_count = max_replica_count;
- // set remote max_replica_count of each partition
- auto partition_path = _ss->get_partition_path(pc.pid);
- auto json_config = dsn::json::json_forwarder<partition_configuration>::encode(pc);
+ // Set `max_replica_count` of each partition on remote storage.
dsn::task_tracker tracker;
_ms->get_remote_storage()->set_data(
- partition_path,
- json_config,
+ _ss->get_partition_path(pc.pid),
+ json::json_forwarder<partition_configuration>::encode(pc),
LPC_META_STATE_HIGH,
- [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); },
+ [](error_code ec) { ASSERT_EQ(ec, ERR_OK); },
&tracker);
tracker.wait_outstanding_tasks();
}
- // set local max_replica_count of app
+ // Set `max_replica_count` of the table locally.
app->max_replica_count = max_replica_count;
- // set remote max_replica_count of app
- auto app_path = _ss->get_app_path(*app);
- auto ainfo = *(reinterpret_cast<app_info *>(app.get()));
- auto json_config = dsn::json::json_forwarder<app_info>::encode(ainfo);
+ // Set `max_replica_count` of the table on remote storage.
dsn::task_tracker tracker;
_ms->get_remote_storage()->set_data(
- app_path,
- json_config,
+ _ss->get_app_path(*app),
+ json::json_forwarder<app_info>::encode(*app),
LPC_META_STATE_HIGH,
- [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); },
+ [](error_code ec) { ASSERT_EQ(ec, ERR_OK); },
&tracker);
tracker.wait_outstanding_tasks();
}
@@ -261,27 +254,24 @@ public:
auto app = find_app(app_name);
CHECK(app, "app({}) does not exist", app_name);
- auto partition_size = static_cast<int>(app->pcs.size());
- for (int i = 0; i < partition_size; ++i) {
- // verify local max_replica_count of each partition
- auto &pc = app->pcs[i];
- ASSERT_EQ(pc.max_replica_count, expected_max_replica_count);
+ for (auto &pc : app->pcs) {
+ // Verify `max_replica_count` of each partition locally.
+ ASSERT_EQ(expected_max_replica_count, pc.max_replica_count);
- // verify remote max_replica_count of each partition
- auto partition_path = _ss->get_partition_path(pc.pid);
+ // Verify `max_replica_count` of each partition on remote storage.
dsn::task_tracker tracker;
_ms->get_remote_storage()->get_data(
- partition_path,
+ _ss->get_partition_path(pc.pid),
LPC_META_CALLBACK,
[expected_pid = pc.pid, expected_max_replica_count](error_code ec,
const blob &value) {
- ASSERT_EQ(ec, ERR_OK);
+ ASSERT_EQ(ERR_OK, ec);
partition_configuration pc;
dsn::json::json_forwarder<partition_configuration>::decode(value, pc);
- ASSERT_EQ(pc.pid, expected_pid);
- ASSERT_EQ(pc.max_replica_count, expected_max_replica_count);
+ ASSERT_EQ(expected_pid, pc.pid);
+ ASSERT_EQ(expected_max_replica_count, pc.max_replica_count);
},
&tracker);
tracker.wait_outstanding_tasks();
@@ -294,29 +284,30 @@ public:
auto app = find_app(app_name);
CHECK(app, "app({}) does not exist", app_name);
- // verify local max_replica_count of the app
- ASSERT_EQ(app->max_replica_count, expected_max_replica_count);
- // env of max_replica_count should have been removed under normal circumstances
- ASSERT_EQ(app->envs.find(replica_envs::UPDATE_MAX_REPLICA_COUNT), app->envs.end());
+ // Verify `max_replica_count` of the table locally.
+ ASSERT_EQ(expected_max_replica_count, app->max_replica_count);
+ // The env of `max_replica_count` should have been removed under normal circumstances.
+ ASSERT_FALSE(gutil::ContainsKey(app->envs, replica_envs::UPDATE_MAX_REPLICA_COUNT));
- // verify remote max_replica_count of the app
- auto app_path = _ss->get_app_path(*app);
+ // Verify `max_replica_count` of the table on remote storage.
dsn::task_tracker tracker;
_ms->get_remote_storage()->get_data(
- app_path,
+ _ss->get_app_path(*app),
LPC_META_CALLBACK,
[app, expected_max_replica_count](error_code ec, const blob &value) {
ASSERT_EQ(ec, ERR_OK);
app_info ainfo;
- dsn::json::json_forwarder<app_info>::decode(value, ainfo);
+ json::json_forwarder<app_info>::decode(value, ainfo);
+
+ ASSERT_EQ(app->app_name, ainfo.app_name);
+ ASSERT_EQ(app->app_id, ainfo.app_id);
+ ASSERT_EQ(expected_max_replica_count, ainfo.max_replica_count);
- ASSERT_EQ(ainfo.app_name, app->app_name);
- ASSERT_EQ(ainfo.app_id, app->app_id);
- ASSERT_EQ(ainfo.max_replica_count, expected_max_replica_count);
- // env of max_replica_count should have been removed under normal circumstances
- ASSERT_EQ(ainfo.envs.find(replica_envs::UPDATE_MAX_REPLICA_COUNT),
- ainfo.envs.end());
+ // The env of `max_replica_count` should have been removed under normal
+ // circumstances.
+ ASSERT_FALSE(
+ gutil::ContainsKey(ainfo.envs, replica_envs::UPDATE_MAX_REPLICA_COUNT));
},
&tracker);
tracker.wait_outstanding_tasks();
@@ -328,17 +319,15 @@ public:
auto app = find_app(app_name);
CHECK(app, "app({}) does not exist", app_name);
- auto app_path = _ss->get_app_path(*app);
-
dsn::task_tracker tracker;
_ms->get_remote_storage()->get_data(
- app_path,
+ _ss->get_app_path(*app),
LPC_META_CALLBACK,
[app_name, expected_envs, app](error_code ec, const blob &value) {
ASSERT_EQ(ERR_OK, ec);
app_info ainfo;
- dsn::json::json_forwarder<app_info>::decode(value, ainfo);
+ json::json_forwarder<app_info>::decode(value, ainfo);
ASSERT_EQ(app_name, app->app_name);
ASSERT_EQ(app_name, ainfo.app_name);
@@ -350,10 +339,92 @@ public:
tracker.wait_outstanding_tasks();
}
+ void verify_app_atomic_idempotent(const std::string &app_name, bool expected_atomic_idempotent)
+ {
+ const auto app = find_app(app_name);
+ CHECK(app, "app({}) does not exist", app_name);
+
+ // `app->__isset.atomic_idempotent` must be true since by default it is true
+ // (because `app->atomic_idempotent` has default value false).
+ ASSERT_TRUE(app->__isset.atomic_idempotent);
+
+ // Verify `atomic_idempotent` of the table locally.
+ ASSERT_EQ(expected_atomic_idempotent, app->atomic_idempotent);
+
+ dsn::task_tracker tracker;
+ _ms->get_remote_storage()->get_data(
+ _ss->get_app_path(*app),
+ LPC_META_CALLBACK,
+ [app, expected_atomic_idempotent](error_code ec, const blob &value) {
+ ASSERT_EQ(ERR_OK, ec);
+
+ app_info ainfo;
+ dsn::json::json_forwarder<app_info>::decode(value, ainfo);
+
+ ASSERT_EQ(app->app_name, ainfo.app_name);
+ ASSERT_EQ(app->app_id, ainfo.app_id);
+
+ // `ainfo.__isset.atomic_idempotent` must be true since by default it
+ // is true (because `ainfo.atomic_idempotent` has default value false).
+ ASSERT_TRUE(ainfo.__isset.atomic_idempotent);
+
+ // Verify `atomic_idempotent` on remote storage.
+ ASSERT_EQ(expected_atomic_idempotent, ainfo.atomic_idempotent);
+ },
+ &tracker);
+ tracker.wait_outstanding_tasks();
+ }
+
+ configuration_get_atomic_idempotent_response get_atomic_idempotent(const std::string &app_name)
+ {
+ auto req = std::make_unique<configuration_get_atomic_idempotent_request>();
+ req->__set_app_name(app_name);
+
+ configuration_get_atomic_idempotent_rpc rpc(std::move(req), RPC_CM_GET_ATOMIC_IDEMPOTENT);
+ _ss->get_atomic_idempotent(rpc);
+ _ss->wait_all_task();
+
+ return rpc.response();
+ }
+
+ void test_get_atomic_idempotent(const std::string &app_name, bool expected_atomic_idempotent)
+ {
+ const auto resp = get_atomic_idempotent(app_name);
+ ASSERT_EQ(ERR_OK, resp.err);
+ ASSERT_EQ(expected_atomic_idempotent, resp.atomic_idempotent);
+ }
+
+ configuration_set_atomic_idempotent_response set_atomic_idempotent(const std::string &app_name,
+ bool atomic_idempotent)
+ {
+ auto req = std::make_unique<configuration_set_atomic_idempotent_request>();
+ req->__set_app_name(app_name);
+ req->__set_atomic_idempotent(atomic_idempotent);
+
+ configuration_set_atomic_idempotent_rpc rpc(std::move(req), RPC_CM_SET_ATOMIC_IDEMPOTENT);
+ _ss->set_atomic_idempotent(rpc);
+ _ss->wait_all_task();
+
+ return rpc.response();
+ }
+
+ void test_set_atomic_idempotent(const std::string &app_name,
+ bool expected_new_atomic_idempotent,
+ bool expected_old_atomic_idempotent)
+ {
+ const auto resp = set_atomic_idempotent(app_name, expected_new_atomic_idempotent);
+ ASSERT_EQ(ERR_OK, resp.err);
+ ASSERT_EQ(expected_old_atomic_idempotent, resp.old_atomic_idempotent);
+
+ // Ensure that new `atomic_idempotent` has been updated to `server_state`.
+ test_get_atomic_idempotent(app_name, expected_new_atomic_idempotent);
+ }
+
const std::string APP_NAME = "app_operation_test";
const std::string OLD_APP_NAME = "old_app_operation";
const std::string DUP_MASTER_APP_NAME = "dup_master_test";
const std::string DUP_FOLLOWER_APP_NAME = "dup_follower_test";
+ const std::string ATOMIC_IDEMPOTENT_APP_NAME = "atomic_idempotent_test";
};
TEST_F(meta_app_operation_test, create_app)
@@ -371,6 +442,7 @@ TEST_F(meta_app_operation_test, create_app)
app_status::type before_status;
error_code expected_err;
std::map<std::string, std::string> envs = {};
+ bool atomic_idempotent = false;
} tests[] = {
// Wrong partition_count (< 0).
{APP_NAME, -1, 3, 2, 3, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS},
@@ -661,6 +733,18 @@ TEST_F(meta_app_operation_test, create_app)
{duplication_constants::kEnvMasterMetasKey, "10.1.2.3:34601"},
{duplication_constants::kEnvMasterAppNameKey, DUP_MASTER_APP_NAME},
{duplication_constants::kEnvFollowerAppStatusKey, "invalid_creating_status"}}},
+ // Create a table with idempotence enabled for atomic writes.
+ {ATOMIC_IDEMPOTENT_APP_NAME,
+ 4,
+ 3,
+ 2,
+ 3,
+ 3,
+ false,
+ app_status::AS_INVALID,
+ ERR_OK,
+ {},
+ true},
};
clear_nodes();
@@ -706,15 +790,17 @@ TEST_F(meta_app_operation_test, create_app)
update_app_status(test.before_status);
}
- auto err = create_app_test(test.partition_count,
- test.replica_count,
- test.success_if_exist,
- test.app_name,
- test.envs);
+ const auto err = create_app_test(test.app_name,
+ test.partition_count,
+ test.replica_count,
+ test.success_if_exist,
+ test.envs,
+ test.atomic_idempotent);
ASSERT_EQ(test.expected_err, err);
if (test.expected_err == ERR_OK) {
verify_app_envs(test.app_name, test.envs);
+ verify_app_atomic_idempotent(test.app_name, test.atomic_idempotent);
}
_ms->set_node_state(nodes, true);
@@ -728,10 +814,10 @@ TEST_F(meta_app_operation_test, create_app)
all_test_envs.insert(test.envs.begin(), test.envs.end());
}
for (const auto &option : replica_envs::ROCKSDB_DYNAMIC_OPTIONS) {
- ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end());
+ ASSERT_TRUE(gutil::ContainsKey(all_test_envs, option));
}
for (const auto &option : replica_envs::ROCKSDB_STATIC_OPTIONS) {
- ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end());
+ ASSERT_TRUE(gutil::ContainsKey(all_test_envs, option));
}
}
@@ -897,8 +983,8 @@ TEST_F(meta_app_operation_test, get_max_replica_count)
}
const auto resp = get_max_replica_count(test.app_name);
- ASSERT_EQ(resp.err, test.expected_err);
- ASSERT_EQ(resp.max_replica_count, test.expected_max_replica_count);
+ ASSERT_EQ(test.expected_err, resp.err);
+ ASSERT_EQ(test.expected_max_replica_count, resp.max_replica_count);
recover_partition_max_replica_count();
}
@@ -1037,8 +1123,8 @@ TEST_F(meta_app_operation_test, set_max_replica_count)
test.initial_max_replica_count);
const auto resp = get_max_replica_count(test.app_name);
- ASSERT_EQ(resp.err, ERR_OK);
- ASSERT_EQ(resp.max_replica_count, test.initial_max_replica_count);
+ ASSERT_EQ(ERR_OK, resp.err);
+ ASSERT_EQ(test.initial_max_replica_count, resp.max_replica_count);
}
// recover automatically the original FLAGS_min_live_node_count_for_unfreeze,
@@ -1078,13 +1164,13 @@ TEST_F(meta_app_operation_test, set_max_replica_count)
const auto get_resp = get_max_replica_count(test.app_name);
if (test.expected_err == ERR_APP_NOT_EXIST || test.expected_err == ERR_INCONSISTENT_STATE) {
- ASSERT_EQ(get_resp.err, test.expected_err);
+ ASSERT_EQ(test.expected_err, get_resp.err);
} else if (test.expected_err != ERR_OK) {
- ASSERT_EQ(get_resp.err, ERR_OK);
+ ASSERT_EQ(ERR_OK, get_resp.err);
}
if (test.expected_err != ERR_OK) {
- ASSERT_EQ(get_resp.max_replica_count, test.expected_old_max_replica_count);
+ ASSERT_EQ(test.expected_old_max_replica_count, get_resp.max_replica_count);
}
_ms->set_node_state(nodes, true);
@@ -1106,6 +1192,20 @@ TEST_F(meta_app_operation_test, recover_from_max_replica_count_env)
verify_app_max_replica_count(APP_NAME, new_max_replica_count);
}
+TEST_F(meta_app_operation_test, change_atomic_idempotent)
+{
+ create_app(APP_NAME, 4);
+
+ // Initial `atomic_idempotent` should be false by default.
+ test_get_atomic_idempotent(APP_NAME, false);
+
+ // Enable `atomic_idempotent` and its previous value should be false.
+ test_set_atomic_idempotent(APP_NAME, true, false);
+
+ // Disable `atomic_idempotent` and its previous value should be true.
+ test_set_atomic_idempotent(APP_NAME, false, true);
+}
+
TEST_F(meta_app_operation_test, rename_app)
{
const std::string app_name_1 = APP_NAME + "_rename_1";
diff --git a/src/meta/test/state_sync_test.cpp b/src/meta/test/state_sync_test.cpp
index d440c00dc..289d01a9c 100644
--- a/src/meta/test/state_sync_test.cpp
+++ b/src/meta/test/state_sync_test.cpp
@@ -25,6 +25,8 @@
*/
#include <boost/lexical_cast.hpp>
+#include <fmt/core.h>
+
#include <algorithm>
#include <cstdint>
#include <fstream> // IWYU pragma: keep
@@ -151,10 +153,15 @@ void meta_service_test_app::state_sync_test()
info.is_stateful = true;
info.app_id = i;
info.app_type = "simple_kv";
- info.app_name = "test_app" + boost::lexical_cast<std::string>(i);
+ info.app_name = fmt::format("test_app{}", i);
info.max_replica_count = 3;
- info.partition_count = random32(100, 10000);
+ info.partition_count = static_cast<int32_t>(random32(100, 10000));
info.status = dsn::app_status::AS_CREATING;
+
+ // `atomic_idempotent` will be set true for the table with even index,
+ // otherwise false.
+ info.atomic_idempotent = (static_cast<uint32_t>(i) & 1U) == 0;
+
std::shared_ptr<app_state> app = app_state::create(info);
ss->_all_apps.emplace(app->app_id, app);
@@ -174,8 +181,8 @@ void meta_service_test_app::state_sync_test()
}
}
- dsn::error_code ec = ss->sync_apps_to_remote_storage();
- ASSERT_EQ(ec, dsn::ERR_OK);
+ error_code ec = ss->sync_apps_to_remote_storage();
+ ASSERT_EQ(ERR_OK, ec);
ss->spin_wait_staging();
}
@@ -184,11 +191,20 @@ void meta_service_test_app::state_sync_test()
{
std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
ss2->initialize(svc, apps_root);
- dsn::error_code ec = ss2->sync_apps_from_remote_storage();
- ASSERT_EQ(ec, dsn::ERR_OK);
+ error_code ec = ss2->sync_apps_from_remote_storage();
+ ASSERT_EQ(ERR_OK, ec);
for (int i = 1; i <= apps_count; ++i) {
std::shared_ptr<app_state> app = ss2->get_app(i);
+
+ // `app->__isset.atomic_idempotent` must be true since by default it is true
+ // (because `app->atomic_idempotent` has default value false).
+ ASSERT_TRUE(app->__isset.atomic_idempotent);
+
+ // Recovered `app->atomic_idempotent` will be true for the table with even
+ // index, otherwise false.
+ ASSERT_EQ((static_cast<uint32_t>(i) & 1U) == 0, app->atomic_idempotent);
+
for (int j = 0; j < app->partition_count; ++j) {
config_context &cc = app->helpers->contexts[j];
ASSERT_EQ(1, cc.dropped.size());
@@ -196,7 +212,7 @@ void meta_service_test_app::state_sync_test()
}
}
ec = ss2->dump_from_remote_storage("meta_state.dump1", false);
- ASSERT_EQ(ec, dsn::ERR_OK);
+ ASSERT_EQ(ERR_OK, ec);
}
// dump another way
diff --git a/src/ranger/ranger_resource_policy_manager.cpp b/src/ranger/ranger_resource_policy_manager.cpp
index d0ac9362e..61d733949 100644
--- a/src/ranger/ranger_resource_policy_manager.cpp
+++ b/src/ranger/ranger_resource_policy_manager.cpp
@@ -180,7 +180,8 @@ ranger_resource_policy_manager::ranger_resource_policy_manager(
"RPC_CM_QUERY_PARTITION_SPLIT",
"RPC_CM_QUERY_BULK_LOAD_STATUS",
"RPC_CM_QUERY_MANUAL_COMPACT_STATUS",
- "RPC_CM_GET_MAX_REPLICA_COUNT"},
+ "RPC_CM_GET_MAX_REPLICA_COUNT",
+ "RPC_CM_GET_ATOMIC_IDEMPOTENT"},
_ac_type_of_database_rpcs);
// DATABASE - kControl
register_rpc_access_type(access_type::kControl,
@@ -198,6 +199,7 @@ ranger_resource_policy_manager::ranger_resource_policy_manager(
"RPC_CM_CLEAR_BULK_LOAD",
"RPC_CM_START_MANUAL_COMPACT",
"RPC_CM_SET_MAX_REPLICA_COUNT",
+ "RPC_CM_SET_ATOMIC_IDEMPOTENT",
"RPC_CM_RENAME_APP"},
_ac_type_of_database_rpcs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]