This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 76cd79896 feat(make_idempotent): introduce new table-level APIs to 
meta servers to list/control idempotence for atomic writes (#2205)
76cd79896 is described below

commit 76cd7989690ec17e790283d8f3ba5b1e985e9e56
Author: Dan Wang <[email protected]>
AuthorDate: Tue Mar 18 17:56:36 2025 +0800

    feat(make_idempotent): introduce new table-level APIs to meta servers to 
list/control idempotence for atomic writes (#2205)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    This change supports enabling idempotence for atomic write operations at
    the **meta** level.
    
    First, we introduce `atomic_idempotent` as **an attribute** of the `app_info`
    structure, which represents the basic properties of a table. This attribute is
    also persisted to remote storage and is loaded from it when the meta server
    starts.
    
    As for the **interface**, we implement the following on the meta server:
    - Support specifying this attribute when creating a table.
    - Make the list API return the entire `app_info` object, including
      `atomic_idempotent`, for each table.
    - Support getting/setting this attribute for any table.
---
 idl/dsn.layer2.thrift                         |   5 +
 idl/meta_admin.thrift                         |  35 ++++
 src/client/replication_ddl_client.cpp         |  19 ++
 src/client/replication_ddl_client.h           |   6 +
 src/common/json_helper.h                      |  91 ++++++++--
 src/common/replication.codes.h                |   2 +
 src/meta/meta_rpc_types.h                     |  91 +++++-----
 src/meta/meta_service.cpp                     |  32 ++++
 src/meta/meta_service.h                       |   6 +
 src/meta/server_state.cpp                     | 246 +++++++++++++++++++++++---
 src/meta/server_state.h                       |  32 ++--
 src/meta/test/meta_app_operation_test.cpp     | 242 +++++++++++++++++--------
 src/meta/test/state_sync_test.cpp             |  30 +++-
 src/ranger/ranger_resource_policy_manager.cpp |   4 +-
 14 files changed, 669 insertions(+), 172 deletions(-)

diff --git a/idl/dsn.layer2.thrift b/idl/dsn.layer2.thrift
index 47d736cb8..5a016f9bb 100644
--- a/idl/dsn.layer2.thrift
+++ b/idl/dsn.layer2.thrift
@@ -113,4 +113,9 @@ struct app_info
     // New fields for bulk load
     // Whether this app is executing bulk load
     14:optional bool    is_bulk_loading = false;
+
+    // Whether all atomic writes to this table are made idempotent:
+    // - true: made idempotent.
+    // - false: kept non-idempotent as their respective client requests. 
+    15:optional bool    atomic_idempotent = false;
 }
diff --git a/idl/meta_admin.thrift b/idl/meta_admin.thrift
index b5621c30b..457a21784 100644
--- a/idl/meta_admin.thrift
+++ b/idl/meta_admin.thrift
@@ -139,6 +139,11 @@ struct create_app_options
     4:string           app_type;
     5:bool             is_stateful;
     6:map<string, string>  envs;
+
+    // Whether all atomic writes to this table are made idempotent:
+    // - true: made idempotent.
+    // - false: kept non-idempotent as their respective client requests. 
+    7:optional bool    atomic_idempotent = false;
 }
 
 struct configuration_create_app_request
@@ -437,6 +442,36 @@ struct configuration_set_max_replica_count_response
     3:string                    hint_message;
 }
 
+// Get the idempotence (see app_info::atomic_idempotent) of given table for 
atomic writes.
+struct configuration_get_atomic_idempotent_request
+{
+    1:string                    app_name;
+}
+
+struct configuration_get_atomic_idempotent_response
+{
+    1:dsn.error_code            err;
+    2:bool                      atomic_idempotent;
+    3:string                    hint_message;
+}
+
+// Change the idempotence (see app_info::atomic_idempotent) of given table for 
atomic writes.
+struct configuration_set_atomic_idempotent_request
+{
+    1:string                    app_name;
+    2:bool                      atomic_idempotent;
+}
+
+struct configuration_set_atomic_idempotent_response
+{
+    1:dsn.error_code            err;
+
+    // Previous atomic_idempotent before updated.
+    2:bool                      old_atomic_idempotent;
+
+    3:string                    hint_message;
+}
+
 // ONLY FOR GO
 // A client to MetaServer's administration API.
 service admin_client
diff --git a/src/client/replication_ddl_client.cpp 
b/src/client/replication_ddl_client.cpp
index a5928389a..1e12b9f11 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -1674,6 +1674,25 @@ replication_ddl_client::set_max_replica_count(const 
std::string &app_name,
         configuration_set_max_replica_count_rpc(std::move(req), 
RPC_CM_SET_MAX_REPLICA_COUNT));
 }
 
+error_with<configuration_get_atomic_idempotent_response>
+replication_ddl_client::get_atomic_idempotent(const std::string &app_name)
+{
+    auto req = std::make_unique<configuration_get_atomic_idempotent_request>();
+    req->__set_app_name(app_name);
+    return call_rpc_sync(
+        configuration_get_atomic_idempotent_rpc(std::move(req), 
RPC_CM_GET_ATOMIC_IDEMPOTENT));
+}
+
+error_with<configuration_set_atomic_idempotent_response>
+replication_ddl_client::set_atomic_idempotent(const std::string &app_name, 
bool atomic_idempotent)
+{
+    auto req = std::make_unique<configuration_set_atomic_idempotent_request>();
+    req->__set_app_name(app_name);
+    req->__set_atomic_idempotent(atomic_idempotent);
+    return call_rpc_sync(
+        configuration_set_atomic_idempotent_rpc(std::move(req), 
RPC_CM_SET_ATOMIC_IDEMPOTENT));
+}
+
 error_with<configuration_rename_app_response>
 replication_ddl_client::rename_app(const std::string &old_app_name, const 
std::string &new_app_name)
 {
diff --git a/src/client/replication_ddl_client.h 
b/src/client/replication_ddl_client.h
index 0ddd58144..e83530481 100644
--- a/src/client/replication_ddl_client.h
+++ b/src/client/replication_ddl_client.h
@@ -299,6 +299,12 @@ public:
     error_with<configuration_set_max_replica_count_response>
     set_max_replica_count(const std::string &app_name, int32_t 
max_replica_count);
 
+    error_with<configuration_get_atomic_idempotent_response>
+    get_atomic_idempotent(const std::string &app_name);
+
+    error_with<configuration_set_atomic_idempotent_response>
+    set_atomic_idempotent(const std::string &app_name, bool atomic_idempotent);
+
     void set_max_wait_app_ready_secs(uint32_t max_wait_secs) { _max_wait_secs 
= max_wait_secs; }
     void set_meta_servers_leader();
 
diff --git a/src/common/json_helper.h b/src/common/json_helper.h
index d56bcecba..ace421433 100644
--- a/src/common/json_helper.h
+++ b/src/common/json_helper.h
@@ -104,6 +104,26 @@
     out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)  
                    \
     JSON_ENCODE_ENTRIES13(out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13);    \
     JSON_ENCODE_ENTRY(out, prefix, T14)
+#define JSON_ENCODE_ENTRIES15(                                                 
                    \
+    out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15)                 \
+    JSON_ENCODE_ENTRIES14(                                                     
                    \
+        out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14);                 \
+    JSON_ENCODE_ENTRY(out, prefix, T15)
+#define JSON_ENCODE_ENTRIES16(                                                 
                    \
+    out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16)            \
+    JSON_ENCODE_ENTRIES15(                                                     
                    \
+        out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15);            \
+    JSON_ENCODE_ENTRY(out, prefix, T16)
+#define JSON_ENCODE_ENTRIES17(                                                 
                    \
+    out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16, T17)       \
+    JSON_ENCODE_ENTRIES16(                                                     
                    \
+        out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16);       \
+    JSON_ENCODE_ENTRY(out, prefix, T17)
+#define JSON_ENCODE_ENTRIES18(                                                 
                    \
+    out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16, T17, T18)  \
+    JSON_ENCODE_ENTRIES17(                                                     
                    \
+        out, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17);  \
+    JSON_ENCODE_ENTRY(out, prefix, T18)
 
 #define JSON_DECODE_ENTRY(in, prefix, T)                                       
                    \
     do {                                                                       
                    \
@@ -162,9 +182,47 @@
     in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14)   
                    \
     JSON_DECODE_ENTRIES13(in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, 
T11, T12, T13);     \
     JSON_TRY_DECODE_ENTRY(in, prefix, T14)
-
-#define JSON_ENTRIES_GET_MACRO(                                                
                    \
-    ph1, ph2, ph3, ph4, ph5, ph6, ph7, ph8, ph9, ph10, ph11, ph12, ph13, ph14, 
NAME, ...)          \
+#define JSON_DECODE_ENTRIES15(                                                 
                    \
+    in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15)                  \
+    JSON_DECODE_ENTRIES14(                                                     
                    \
+        in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14);                  \
+    JSON_TRY_DECODE_ENTRY(in, prefix, T15)
+#define JSON_DECODE_ENTRIES16(                                                 
                    \
+    in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16)             \
+    JSON_DECODE_ENTRIES15(                                                     
                    \
+        in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15);             \
+    JSON_TRY_DECODE_ENTRY(in, prefix, T16)
+#define JSON_DECODE_ENTRIES17(                                                 
                    \
+    in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16, T17)        \
+    JSON_DECODE_ENTRIES16(                                                     
                    \
+        in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16);        \
+    JSON_TRY_DECODE_ENTRY(in, prefix, T17)
+#define JSON_DECODE_ENTRIES18(                                                 
                    \
+    in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, 
T15, T16, T17, T18)   \
+    JSON_DECODE_ENTRIES17(                                                     
                    \
+        in, prefix, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, 
T14, T15, T16, T17);   \
+    JSON_TRY_DECODE_ENTRY(in, prefix, T18)
+
+#define JSON_ENTRIES_GET_MACRO(ph1,                                            
                    \
+                               ph2,                                            
                    \
+                               ph3,                                            
                    \
+                               ph4,                                            
                    \
+                               ph5,                                            
                    \
+                               ph6,                                            
                    \
+                               ph7,                                            
                    \
+                               ph8,                                            
                    \
+                               ph9,                                            
                    \
+                               ph10,                                           
                    \
+                               ph11,                                           
                    \
+                               ph12,                                           
                    \
+                               ph13,                                           
                    \
+                               ph14,                                           
                    \
+                               ph15,                                           
                    \
+                               ph16,                                           
                    \
+                               ph17,                                           
                    \
+                               ph18,                                           
                    \
+                               NAME,                                           
                    \
+                               ...)                                            
                    \
     NAME
 // workaround due to the way VC handles "..."
 #define JSON_ENTRIES_GET_MACRO_(tuple) JSON_ENTRIES_GET_MACRO tuple
@@ -172,6 +230,10 @@
 #define JSON_ENCODE_ENTRIES(out, prefix, ...)                                  
                    \
     out.StartObject();                                                         
                    \
     JSON_ENTRIES_GET_MACRO_((__VA_ARGS__,                                      
                    \
+                             JSON_ENCODE_ENTRIES18,                            
                    \
+                             JSON_ENCODE_ENTRIES17,                            
                    \
+                             JSON_ENCODE_ENTRIES16,                            
                    \
+                             JSON_ENCODE_ENTRIES15,                            
                    \
                              JSON_ENCODE_ENTRIES14,                            
                    \
                              JSON_ENCODE_ENTRIES13,                            
                    \
                              JSON_ENCODE_ENTRIES12,                            
                    \
@@ -194,6 +256,10 @@
     int arguments_count = 0;                                                   
                    \
     int parsed_count = 0;                                                      
                    \
     JSON_ENTRIES_GET_MACRO_((__VA_ARGS__,                                      
                    \
+                             JSON_DECODE_ENTRIES18,                            
                    \
+                             JSON_DECODE_ENTRIES17,                            
                    \
+                             JSON_DECODE_ENTRIES16,                            
                    \
+                             JSON_DECODE_ENTRIES15,                            
                    \
                              JSON_DECODE_ENTRIES14,                            
                    \
                              JSON_DECODE_ENTRIES13,                            
                    \
                              JSON_DECODE_ENTRIES12,                            
                    \
@@ -448,14 +514,14 @@ inline bool json_decode(const dsn::json::JsonObject &in, 
dsn::host_port &hp)
     return static_cast<bool>(hp);
 }
 
-inline void json_encode(JsonWriter &out, const dsn::partition_configuration 
&pc);
-inline bool json_decode(const JsonObject &in, dsn::partition_configuration 
&pc);
-inline void json_encode(JsonWriter &out, const dsn::app_info &info);
-inline bool json_decode(const JsonObject &in, dsn::app_info &info);
-inline void json_encode(JsonWriter &out, const dsn::replication::file_meta 
&f_meta);
-inline bool json_decode(const JsonObject &in, dsn::replication::file_meta 
&f_meta);
-inline void json_encode(JsonWriter &out, const 
dsn::replication::bulk_load_metadata &metadata);
-inline bool json_decode(const JsonObject &in, 
dsn::replication::bulk_load_metadata &metadata);
+inline void json_encode(JsonWriter &, const dsn::partition_configuration &);
+inline bool json_decode(const JsonObject &, dsn::partition_configuration &);
+inline void json_encode(JsonWriter &, const dsn::app_info &);
+inline bool json_decode(const JsonObject &, dsn::app_info &);
+inline void json_encode(JsonWriter &, const dsn::replication::file_meta &);
+inline bool json_decode(const JsonObject &, dsn::replication::file_meta &);
+inline void json_encode(JsonWriter &, const 
dsn::replication::bulk_load_metadata &);
+inline bool json_decode(const JsonObject &in, 
dsn::replication::bulk_load_metadata &);
 
 template <typename T>
 inline void json_encode_iterable(JsonWriter &out, const T &t)
@@ -740,7 +806,8 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::app_info,
                               drop_second,
                               duplicating,
                               init_partition_count,
-                              is_bulk_loading)
+                              is_bulk_loading,
+                              atomic_idempotent)
 
 NON_MEMBER_JSON_SERIALIZATION(dsn::replication::file_meta, name, size, md5)
 
diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h
index 42ae695f7..28d846eea 100644
--- a/src/common/replication.codes.h
+++ b/src/common/replication.codes.h
@@ -131,6 +131,8 @@ MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, 
TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
+MAKE_EVENT_CODE_RPC(RPC_CM_GET_ATOMIC_IDEMPOTENT, TASK_PRIORITY_COMMON)
+MAKE_EVENT_CODE_RPC(RPC_CM_SET_ATOMIC_IDEMPOTENT, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE(LPC_USE_RANGER_ACCESS_CONTROL, TASK_PRIORITY_COMMON)
 #undef CURRENT_THREAD_POOL
 
diff --git a/src/meta/meta_rpc_types.h b/src/meta/meta_rpc_types.h
index 200b409e1..b6002ac83 100644
--- a/src/meta/meta_rpc_types.h
+++ b/src/meta/meta_rpc_types.h
@@ -26,49 +26,52 @@
 #include "replica_admin_types.h"
 #include "rpc/rpc_holder.h"
 
-namespace dsn {
-namespace replication {
+namespace dsn::replication {
 
-typedef rpc_holder<configuration_update_app_env_request, 
configuration_update_app_env_response>
-    app_env_rpc;
-typedef rpc_holder<ddd_diagnose_request, ddd_diagnose_response> 
ddd_diagnose_rpc;
-typedef rpc_holder<configuration_query_by_node_request, 
configuration_query_by_node_response>
-    configuration_query_by_node_rpc;
-typedef rpc_holder<query_cfg_request, query_cfg_response> 
configuration_query_by_index_rpc;
-typedef rpc_holder<configuration_list_apps_request, 
configuration_list_apps_response>
-    configuration_list_apps_rpc;
-typedef rpc_holder<configuration_list_nodes_request, 
configuration_list_nodes_response>
-    configuration_list_nodes_rpc;
-typedef rpc_holder<configuration_cluster_info_request, 
configuration_cluster_info_response>
-    configuration_cluster_info_rpc;
-typedef rpc_holder<configuration_balancer_request, 
configuration_balancer_response>
-    configuration_balancer_rpc;
-typedef rpc_holder<configuration_meta_control_request, 
configuration_meta_control_response>
-    configuration_meta_control_rpc;
-typedef rpc_holder<configuration_recovery_request, 
configuration_recovery_response>
-    configuration_recovery_rpc;
-typedef rpc_holder<configuration_report_restore_status_request,
-                   configuration_report_restore_status_response>
-    configuration_report_restore_status_rpc;
-typedef rpc_holder<configuration_query_restore_request, 
configuration_query_restore_response>
-    configuration_query_restore_rpc;
-typedef rpc_holder<configuration_query_backup_policy_request,
-                   configuration_query_backup_policy_response>
-    query_backup_policy_rpc;
-typedef rpc_holder<configuration_modify_backup_policy_request,
-                   configuration_modify_backup_policy_response>
-    configuration_modify_backup_policy_rpc;
-typedef rpc_holder<start_backup_app_request, start_backup_app_response> 
start_backup_app_rpc;
-typedef rpc_holder<query_backup_status_request, query_backup_status_response>
-    query_backup_status_rpc;
-typedef rpc_holder<configuration_get_max_replica_count_request,
-                   configuration_get_max_replica_count_response>
-    configuration_get_max_replica_count_rpc;
-typedef rpc_holder<configuration_set_max_replica_count_request,
-                   configuration_set_max_replica_count_response>
-    configuration_set_max_replica_count_rpc;
-typedef rpc_holder<configuration_rename_app_request, 
configuration_rename_app_response>
-    configuration_rename_app_rpc;
+using app_env_rpc =
+    rpc_holder<configuration_update_app_env_request, 
configuration_update_app_env_response>;
+using ddd_diagnose_rpc = rpc_holder<ddd_diagnose_request, 
ddd_diagnose_response>;
+using configuration_query_by_node_rpc =
+    rpc_holder<configuration_query_by_node_request, 
configuration_query_by_node_response>;
+using configuration_query_by_index_rpc = rpc_holder<query_cfg_request, 
query_cfg_response>;
+using configuration_list_apps_rpc =
+    rpc_holder<configuration_list_apps_request, 
configuration_list_apps_response>;
+using configuration_list_nodes_rpc =
+    rpc_holder<configuration_list_nodes_request, 
configuration_list_nodes_response>;
+using configuration_cluster_info_rpc =
+    rpc_holder<configuration_cluster_info_request, 
configuration_cluster_info_response>;
+using configuration_balancer_rpc =
+    rpc_holder<configuration_balancer_request, 
configuration_balancer_response>;
+using configuration_meta_control_rpc =
+    rpc_holder<configuration_meta_control_request, 
configuration_meta_control_response>;
+using configuration_recovery_rpc =
+    rpc_holder<configuration_recovery_request, 
configuration_recovery_response>;
+using configuration_report_restore_status_rpc =
+    rpc_holder<configuration_report_restore_status_request,
+               configuration_report_restore_status_response>;
+using configuration_query_restore_rpc =
+    rpc_holder<configuration_query_restore_request, 
configuration_query_restore_response>;
+using query_backup_policy_rpc = 
rpc_holder<configuration_query_backup_policy_request,
+                                           
configuration_query_backup_policy_response>;
+using configuration_modify_backup_policy_rpc =
+    rpc_holder<configuration_modify_backup_policy_request,
+               configuration_modify_backup_policy_response>;
+using start_backup_app_rpc = rpc_holder<start_backup_app_request, 
start_backup_app_response>;
+using query_backup_status_rpc =
+    rpc_holder<query_backup_status_request, query_backup_status_response>;
+using configuration_get_max_replica_count_rpc =
+    rpc_holder<configuration_get_max_replica_count_request,
+               configuration_get_max_replica_count_response>;
+using configuration_set_max_replica_count_rpc =
+    rpc_holder<configuration_set_max_replica_count_request,
+               configuration_set_max_replica_count_response>;
+using configuration_get_atomic_idempotent_rpc =
+    rpc_holder<configuration_get_atomic_idempotent_request,
+               configuration_get_atomic_idempotent_response>;
+using configuration_set_atomic_idempotent_rpc =
+    rpc_holder<configuration_set_atomic_idempotent_request,
+               configuration_set_atomic_idempotent_response>;
+using configuration_rename_app_rpc =
+    rpc_holder<configuration_rename_app_request, 
configuration_rename_app_response>;
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 6ec19c007..4be52d90e 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -563,6 +563,12 @@ void meta_service::register_rpc_handlers()
     register_rpc_handler_with_rpc_holder(RPC_CM_SET_MAX_REPLICA_COUNT,
                                          "set_max_replica_count",
                                          
&meta_service::on_set_max_replica_count);
+    register_rpc_handler_with_rpc_holder(RPC_CM_GET_ATOMIC_IDEMPOTENT,
+                                         "get_atomic_idempotent",
+                                         
&meta_service::on_get_atomic_idempotent);
+    register_rpc_handler_with_rpc_holder(RPC_CM_SET_ATOMIC_IDEMPOTENT,
+                                         "set_atomic_idempotent",
+                                         
&meta_service::on_set_atomic_idempotent);
 }
 
 meta_leader_state meta_service::check_leader(dsn::message_ex *req, 
dsn::host_port *forward_address)
@@ -1353,5 +1359,31 @@ void 
meta_service::on_set_max_replica_count(configuration_set_max_replica_count_
                      server_state::sStateHash);
 }
 
+// ThreadPool: THREAD_POOL_META_SERVER
+void 
meta_service::on_get_atomic_idempotent(configuration_get_atomic_idempotent_rpc 
rpc)
+{
+    if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) {
+        return;
+    }
+
+    tasking::enqueue(LPC_META_STATE_NORMAL,
+                     tracker(),
+                     std::bind(&server_state::get_atomic_idempotent, 
_state.get(), rpc),
+                     server_state::sStateHash);
+}
+
+// ThreadPool: THREAD_POOL_META_SERVER
+void 
meta_service::on_set_atomic_idempotent(configuration_set_atomic_idempotent_rpc 
rpc)
+{
+    if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) {
+        return;
+    }
+
+    tasking::enqueue(LPC_META_STATE_NORMAL,
+                     tracker(),
+                     std::bind(&server_state::set_atomic_idempotent, 
_state.get(), rpc),
+                     server_state::sStateHash);
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index 5230fbfd4..9b9488481 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -289,6 +289,12 @@ private:
     void on_get_max_replica_count(configuration_get_max_replica_count_rpc rpc);
     void on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc);
 
+    // Get `atomic_idempotent` of a table.
+    void on_get_atomic_idempotent(configuration_get_atomic_idempotent_rpc rpc);
+
+    // Set `atomic_idempotent` of a table.
+    void on_set_atomic_idempotent(configuration_set_atomic_idempotent_rpc rpc);
+
     // if return 'kNotLeaderAndCannotForwardRpc' and 'forward_address' != 
nullptr, then return
     // leader by 'forward_address'.
     meta_leader_state check_leader(dsn::message_ex *req, dsn::host_port 
*forward_address);
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 72557a8c5..823e4fe28 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -51,6 +51,7 @@
 #include "common/replication_common.h"
 #include "common/replication_enums.h"
 #include "common/replication_other_types.h"
+#include "dsn.layer2_types.h"
 #include "dump_file.h"
 #include "meta/app_env_validator.h"
 #include "meta/meta_data.h"
@@ -1213,6 +1214,10 @@ void server_state::create_app(dsn::message_ex *msg)
     info.create_second = static_cast<int64_t>(dsn_now_s());
     info.init_partition_count = request.options.partition_count;
 
+    // No need to check `request.options.__isset.atomic_idempotent`, since that 
flag is
+    // true by default (because `request.options.atomic_idempotent` has the default 
value false).
+    info.__set_atomic_idempotent(request.options.atomic_idempotent);
+
     app = app_state::create(info);
     app->helpers->pending_response = msg;
     app->helpers->partitions_in_progress.store(info.partition_count);
@@ -2358,20 +2363,50 @@ void server_state::on_propose_balancer(const 
configuration_balancer_request &req
     _meta_svc->get_balancer()->register_proposals({&_all_apps, &_nodes}, 
request, response);
 }
 
+namespace {
+
+bool app_info_compatible_equal(const app_info &l, const app_info &r)
+{
+    // Some fields like `app_type`, `app_id` and `create_second` are 
initialized and
+    // persisted into .app-info file when the replica is created, and will 
NEVER be
+    // changed during their lifetime even if the table is dropped or recalled. 
Their
+    // consistency must be checked.
+    //
+    // Some fields may be updated during their lifetime, but will NEVER be 
persisted
+    // into .app-info, such as most environments in `envs`. Their consistency 
does not
+    // need to be checked.
+    //
+    // Some fields may be updated during their lifetime, and will also be 
persisted into
+    // .app-info file:
+    // - For the fields such as `app_name`, `max_replica_count` and 
`atomic_idempotent`
+    // without compatibility problems, their consistency should be checked.
+    // - For the fields such as `duplicating` whose compatibility varies 
between primary
+    // and secondaries in 2.1.x, 2.2.x and 2.3.x releases, their consistency 
is not
+    // checked.
+    return l.status == r.status && l.app_type == r.app_type && l.app_name == 
r.app_name &&
+           l.app_id == r.app_id && l.partition_count == r.partition_count &&
+           l.is_stateful == r.is_stateful && l.max_replica_count == 
r.max_replica_count &&
+           l.expire_second == r.expire_second && l.create_second == 
r.create_second &&
+           l.drop_second == r.drop_second && l.atomic_idempotent == 
r.atomic_idempotent;
+}
+
+} // anonymous namespace
+
 error_code
 server_state::construct_apps(const std::vector<query_app_info_response> 
&query_app_responses,
                              const std::vector<dsn::host_port> &replica_nodes,
                              std::string &hint_message)
 {
     int max_app_id = 0;
-    for (unsigned int i = 0; i < query_app_responses.size(); ++i) {
-        query_app_info_response query_resp = query_app_responses[i];
-        if (query_resp.err != dsn::ERR_OK)
+    for (size_t i = 0; i < query_app_responses.size(); ++i) {
+        const auto &query_resp = query_app_responses[i];
+        if (query_resp.err != dsn::ERR_OK) {
             continue;
+        }
 
         for (const app_info &info : query_resp.apps) {
-            CHECK_GE_MSG(info.app_id, 1, "invalid app id");
-            auto iter = _all_apps.find(info.app_id);
+            CHECK_GT_MSG(info.app_id, 0, "invalid app id");
+            const auto iter = std::as_const(_all_apps).find(info.app_id);
             if (iter == _all_apps.end()) {
                 std::shared_ptr<app_state> app = app_state::create(info);
                 LOG_INFO("create app info from ({}) for id({}): {}",
@@ -2380,23 +2415,20 @@ server_state::construct_apps(const 
std::vector<query_app_info_response> &query_a
                          boost::lexical_cast<std::string>(info));
                 _all_apps.emplace(app->app_id, app);
                 max_app_id = std::max(app->app_id, max_app_id);
-            } else {
-                app_info *old_info = iter->second.get();
-                // all info in all replica servers should be the same
-                // coz the app info is only initialized when the replica is
-                // created, and it will NEVER change even if the app is 
dropped/recalled...
-                if (info != *old_info) // app_info::operator !=
-                {
-                    // compatible for app.duplicating different between 
primary and secondaries in
-                    // 2.1.x, 2.2.x and 2.3.x release
-                    CHECK(app_info_compatible_equal(info, *old_info),
-                          "conflict app info from ({}) for id({}): 
new_info({}), old_info({})",
-                          replica_nodes[i],
-                          info.app_id,
-                          boost::lexical_cast<std::string>(info),
-                          boost::lexical_cast<std::string>(*old_info));
-                }
+                continue;
             }
+
+            app_info *old_info = iter->second.get();
+            if (info == *old_info) {
+                continue;
+            }
+
+            CHECK(app_info_compatible_equal(info, *old_info),
+                  "conflict app info from ({}) for id({}): new_info({}), 
old_info({})",
+                  replica_nodes[i],
+                  info.app_id,
+                  boost::lexical_cast<std::string>(info),
+                  boost::lexical_cast<std::string>(*old_info));
         }
     }
 
@@ -3648,7 +3680,7 @@ std::shared_ptr<app_state> 
server_state::get_app_and_check_exist(const std::stri
                                                                  Response 
&response) const
 {
     auto app = get_app(app_name);
-    if (app == nullptr) {
+    if (!app) {
         response.err = ERR_APP_NOT_EXIST;
         response.hint_message = fmt::format("app({}) does not exist", 
app_name);
     }
@@ -3756,7 +3788,7 @@ void 
server_state::set_max_replica_count(configuration_set_max_replica_count_rpc
         response.old_max_replica_count = app->max_replica_count;
 
         if (app->status != app_status::AS_AVAILABLE) {
-            response.err = ERR_INVALID_PARAMETERS;
+            response.err = ERR_INVALID_STATE;
             response.hint_message = fmt::format("app({}) is not in available 
status", app_name);
             LOG_ERROR("failed to set max_replica_count: app_name={}, 
app_id={}, error_code={}, "
                       "hint_message={}",
@@ -4458,6 +4490,174 @@ void 
server_state::recover_app_max_replica_count(std::shared_ptr<app_state> &app
         &tracker);
 }
 
+// ThreadPool: THREAD_POOL_META_STATE
+void 
server_state::get_atomic_idempotent(configuration_get_atomic_idempotent_rpc 
rpc) const
+{
+    const auto &app_name = rpc.request().app_name;
+    auto &response = rpc.response();
+
+    zauto_read_lock l(_lock);
+
+    auto app = get_app_and_check_exist(app_name, response);
+    if (!app) {
+        response.atomic_idempotent = false;
+        LOG_WARNING("failed to get atomic_idempotent: app_name={}, "
+                    "error_code={}, hint_message={}",
+                    app_name,
+                    response.err,
+                    response.hint_message);
+        return;
+    }
+
+    response.err = ERR_OK;
+
+    // No need to check `app->__isset.atomic_idempotent`: that flag is true by 
default,
+    // because `app->atomic_idempotent` has a default value (false).
+    response.atomic_idempotent = app->atomic_idempotent;
+
+    LOG_INFO("get atomic_idempotent successfully: app_name={}, app_id={}, "
+             "atomic_idempotent={}",
+             app_name,
+             app->app_id,
+             response.atomic_idempotent);
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void 
server_state::set_atomic_idempotent(configuration_set_atomic_idempotent_rpc rpc)
+{
+    const auto &app_name = rpc.request().app_name;
+    const auto new_atomic_idempotent = rpc.request().atomic_idempotent;
+    auto &response = rpc.response();
+
+    int32_t app_id = 0;
+    std::shared_ptr<app_state> app;
+
+    {
+        zauto_read_lock l(_lock);
+
+        app = get_app_and_check_exist(app_name, response);
+        if (!app) {
+            response.old_atomic_idempotent = false;
+            LOG_WARNING("failed to set atomic_idempotent: app_name={}, "
+                        "error_code={}, hint_message={}",
+                        app_name,
+                        response.err,
+                        response.hint_message);
+            return;
+        }
+
+        app_id = app->app_id;
+
+        // No need to check `app->__isset.atomic_idempotent`: that flag is true 
by default,
+        // because `app->atomic_idempotent` has a default value (false).
+        response.old_atomic_idempotent = app->atomic_idempotent;
+
+        if (app->status != app_status::AS_AVAILABLE) {
+            response.err = ERR_INVALID_STATE;
+            response.hint_message = fmt::format("app({}) is not in available 
status", app_name);
+            LOG_ERROR("failed to set atomic_idempotent: app_name={}, 
app_id={}, "
+                      "error_code={}, hint_message={}",
+                      app_name,
+                      app_id,
+                      response.err,
+                      response.hint_message);
+            return;
+        }
+    }
+
+    const auto level = _meta_svc->get_function_level();
+    if (level <= meta_function_level::fl_freezed) {
+        response.err = ERR_STATE_FREEZED;
+        response.hint_message =
+            "current meta function level is freezed, since there are too few 
alive nodes";
+        LOG_ERROR("failed to set atomic_idempotent: app_name={}, app_id={}, "
+                  "error_code={}, message={}",
+                  app_name,
+                  app_id,
+                  response.err,
+                  response.hint_message);
+        return;
+    }
+
+    if (new_atomic_idempotent == response.old_atomic_idempotent) {
+        response.err = ERR_OK;
+        response.hint_message = "no need to update atomic_idempotent since 
it's unchanged";
+        LOG_WARNING("{}: app_name={}, app_id={}", response.hint_message, 
app_name, app_id);
+        return;
+    }
+
+    LOG_INFO("request for updating atomic_idempotent: app_name={}, app_id={}, "
+             "old_atomic_idempotent={}, new_atomic_idempotent={}",
+             app_name,
+             app_id,
+             response.old_atomic_idempotent,
+             new_atomic_idempotent);
+
+    update_app_atomic_idempotent_on_remote(app, rpc);
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void server_state::update_app_atomic_idempotent_on_remote(
+    std::shared_ptr<app_state> &app, configuration_set_atomic_idempotent_rpc 
rpc)
+{
+    app_info ainfo = *app;
+    ainfo.atomic_idempotent = rpc.request().atomic_idempotent;
+    do_update_app_info(get_app_path(*app), ainfo, [this, app, rpc](error_code 
ec) mutable {
+        const auto new_atomic_idempotent = rpc.request().atomic_idempotent;
+        const auto old_atomic_idempotent = 
rpc.response().old_atomic_idempotent;
+
+        zauto_write_lock l(_lock);
+
+        CHECK_EQ_MSG(ec,
+                     ERR_OK,
+                     "An error that cannot be handled occurred while updating 
atomic_idempotent "
+                     "on remote: error_code={}, app_name={}, app_id={}, "
+                     "old_atomic_idempotent={}, new_atomic_idempotent={}",
+                     ec,
+                     app->app_name,
+                     app->app_id,
+                     old_atomic_idempotent,
+                     new_atomic_idempotent);
+
+        CHECK_EQ_MSG(rpc.request().app_name,
+                     app->app_name,
+                     "atomic_idempotent was updated to remote storage, however 
app_name "
+                     "has been changed since then: old_app_name={}, 
new_app_name={}, "
+                     "app_id={}, old_atomic_idempotent={}, 
new_atomic_idempotent={}",
+                     rpc.request().app_name,
+                     app->app_name,
+                     app->app_id,
+                     old_atomic_idempotent,
+                     new_atomic_idempotent);
+
+        // No need to check `app->__isset.atomic_idempotent`: that flag is true 
by default,
+        // because `app->atomic_idempotent` has a default value (false).
+        CHECK_EQ_MSG(old_atomic_idempotent,
+                     app->atomic_idempotent,
+                     "atomic_idempotent has been updated to remote storage, 
however "
+                     "old_atomic_idempotent from response is not consistent 
with current local "
+                     "atomic_idempotent: app_name={}, app_id={}, 
old_atomic_idempotent={}, "
+                     "local_atomic_idempotent={}, new_atomic_idempotent={}",
+                     app->app_name,
+                     app->app_id,
+                     old_atomic_idempotent,
+                     app->atomic_idempotent,
+                     new_atomic_idempotent);
+
+        app->__set_atomic_idempotent(new_atomic_idempotent);
+        LOG_INFO("both remote and local app-level atomic_idempotent have been 
updated "
+                 "successfully: app_name={}, app_id={}, 
old_atomic_idempotent={}, "
+                 "new_atomic_idempotent={}",
+                 app->app_name,
+                 app->app_id,
+                 old_atomic_idempotent,
+                 new_atomic_idempotent);
+
+        auto &response = rpc.response();
+        response.err = ERR_OK;
+    });
+}
+
 #undef SUCC_CREATE_APP_RESPONSE
 #undef FAIL_CREATE_APP_RESPONSE
 #undef INIT_CREATE_APP_RESPONSE_WITH_OK
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index 56c7d6c03..141c57aa8 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -41,7 +41,6 @@
 #include "app_env_validator.h"
 #include "common/gpid.h"
 #include "common/manual_compact.h"
-#include "dsn.layer2_types.h"
 #include "gutil/map_util.h"
 #include "meta/meta_rpc_types.h"
 #include "meta_data.h"
@@ -52,10 +51,14 @@
 #include "utils/zlocks.h"
 
 namespace dsn {
+class app_info;
 class blob;
 class command_deregister;
 class host_port;
 class message_ex;
+class partition_configuration;
+class query_cfg_request;
+class query_cfg_response;
 
 namespace replication {
 class configuration_balancer_request;
@@ -200,6 +203,12 @@ public:
     void set_max_replica_count(configuration_set_max_replica_count_rpc rpc);
     void recover_from_max_replica_count_env();
 
+    // Get `atomic_idempotent` of a table.
+    void get_atomic_idempotent(configuration_get_atomic_idempotent_rpc rpc) 
const;
+
+    // Set `atomic_idempotent` of a table.
+    void set_atomic_idempotent(configuration_set_atomic_idempotent_rpc rpc);
+
     // return true if no need to do any actions
     bool check_all_partitions();
     void get_cluster_balance_score(double &primary_stddev /*out*/, double 
&total_stddev /*out*/);
@@ -377,6 +386,14 @@ private:
                                        int32_t max_replica_count,
                                        dsn::task_tracker &tracker);
 
+    // Update `atomic_idempotent` of given table on remote storage.
+    //
+    // Parameters:
+    // - app: the given table.
+    // - rpc: RPC request/response to change `atomic_idempotent`.
+    void update_app_atomic_idempotent_on_remote(std::shared_ptr<app_state> 
&app,
+                                                
configuration_set_atomic_idempotent_rpc rpc);
+
     // Used for `on_start_manual_compaction`
     bool parse_compaction_envs(start_manual_compact_rpc rpc,
                                std::vector<std::string> &keys,
@@ -385,19 +402,6 @@ private:
                                                   const 
std::vector<std::string> &keys,
                                                   const 
std::vector<std::string> &values);
 
-    bool app_info_compatible_equal(const app_info &l, const app_info &r) const
-    {
-        if (l.status != r.status || l.app_type != r.app_type || l.app_name != 
r.app_name ||
-            l.app_id != r.app_id || l.partition_count != r.partition_count ||
-            l.is_stateful != r.is_stateful || l.max_replica_count != 
r.max_replica_count ||
-            l.expire_second != r.expire_second || l.create_second != 
r.create_second ||
-            l.drop_second != r.drop_second) {
-            return false;
-        }
-        return true;
-    }
-
-private:
     friend class bulk_load_service;
     friend class bulk_load_service_test;
     friend class meta_app_operation_test;
diff --git a/src/meta/test/meta_app_operation_test.cpp 
b/src/meta/test/meta_app_operation_test.cpp
index 9956ec31e..13f7f4291 100644
--- a/src/meta/test/meta_app_operation_test.cpp
+++ b/src/meta/test/meta_app_operation_test.cpp
@@ -34,6 +34,7 @@
 #include "common/replication.codes.h"
 #include "dsn.layer2_types.h"
 #include "gtest/gtest.h"
+#include "gutil/map_util.h"
 #include "meta/meta_data.h"
 #include "meta/meta_rpc_types.h"
 #include "meta/meta_service.h"
@@ -66,11 +67,12 @@ class meta_app_operation_test : public meta_test_base
 public:
     meta_app_operation_test() = default;
 
-    error_code create_app_test(int32_t partition_count,
+    error_code create_app_test(const std::string &app_name,
+                               int32_t partition_count,
                                int32_t replica_count,
                                bool success_if_exist,
-                               const std::string &app_name,
-                               const std::map<std::string, std::string> &envs 
= {})
+                               const std::map<std::string, std::string> &envs,
+                               bool atomic_idempotent)
     {
         configuration_create_app_request create_request;
         configuration_create_app_response create_response;
@@ -81,6 +83,7 @@ public:
         create_request.options.success_if_exist = success_if_exist;
         create_request.options.is_stateful = true;
         create_request.options.envs = envs;
+        create_request.options.__set_atomic_idempotent(atomic_idempotent);
 
         auto result = fake_create_app(_ss.get(), create_request);
         fake_wait_rpc(result, create_response);
@@ -171,16 +174,13 @@ public:
             app->envs[replica_envs::UPDATE_MAX_REPLICA_COUNT] = env;
         }
 
-        // set remote env of app
-        auto app_path = _ss->get_app_path(*app);
-        auto ainfo = *(reinterpret_cast<app_info *>(app.get()));
-        auto json_config = dsn::json::json_forwarder<app_info>::encode(ainfo);
+        // Set env on remote storage.
         dsn::task_tracker tracker;
         _ms->get_remote_storage()->set_data(
-            app_path,
-            json_config,
+            _ss->get_app_path(*app),
+            json::json_forwarder<app_info>::encode(*app),
             LPC_META_STATE_HIGH,
-            [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); },
+            [](error_code ec) { ASSERT_EQ(ERR_OK, ec); },
             &tracker);
         tracker.wait_outstanding_tasks();
     }
@@ -219,38 +219,31 @@ public:
         auto app = find_app(app_name);
         CHECK(app, "app({}) does not exist", app_name);
 
-        auto partition_size = static_cast<int>(app->pcs.size());
-        for (int i = 0; i < partition_size; ++i) {
-            // set local max_replica_count of each partition
-            auto &pc = app->pcs[i];
+        for (auto &pc : app->pcs) {
+            // Set `max_replica_count` of each partition locally.
             pc.max_replica_count = max_replica_count;
 
-            // set remote max_replica_count of each partition
-            auto partition_path = _ss->get_partition_path(pc.pid);
-            auto json_config = 
dsn::json::json_forwarder<partition_configuration>::encode(pc);
+            // Set `max_replica_count` of each partition on remote storage.
             dsn::task_tracker tracker;
             _ms->get_remote_storage()->set_data(
-                partition_path,
-                json_config,
+                _ss->get_partition_path(pc.pid),
+                json::json_forwarder<partition_configuration>::encode(pc),
                 LPC_META_STATE_HIGH,
-                [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); },
+                [](error_code ec) { ASSERT_EQ(ec, ERR_OK); },
                 &tracker);
             tracker.wait_outstanding_tasks();
         }
 
-        // set local max_replica_count of app
+        // Set `max_replica_count` of the table locally.
         app->max_replica_count = max_replica_count;
 
-        // set remote max_replica_count of app
-        auto app_path = _ss->get_app_path(*app);
-        auto ainfo = *(reinterpret_cast<app_info *>(app.get()));
-        auto json_config = dsn::json::json_forwarder<app_info>::encode(ainfo);
+        // Set `max_replica_count` of the table on remote storage.
         dsn::task_tracker tracker;
         _ms->get_remote_storage()->set_data(
-            app_path,
-            json_config,
+            _ss->get_app_path(*app),
+            json::json_forwarder<app_info>::encode(*app),
             LPC_META_STATE_HIGH,
-            [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); },
+            [](error_code ec) { ASSERT_EQ(ec, ERR_OK); },
             &tracker);
         tracker.wait_outstanding_tasks();
     }
@@ -261,27 +254,24 @@ public:
         auto app = find_app(app_name);
         CHECK(app, "app({}) does not exist", app_name);
 
-        auto partition_size = static_cast<int>(app->pcs.size());
-        for (int i = 0; i < partition_size; ++i) {
-            // verify local max_replica_count of each partition
-            auto &pc = app->pcs[i];
-            ASSERT_EQ(pc.max_replica_count, expected_max_replica_count);
+        for (auto &pc : app->pcs) {
+            // Verify `max_replica_count` of each partition locally.
+            ASSERT_EQ(expected_max_replica_count, pc.max_replica_count);
 
-            // verify remote max_replica_count of each partition
-            auto partition_path = _ss->get_partition_path(pc.pid);
+            // Verify `max_replica_count` of each partition on remote storage.
             dsn::task_tracker tracker;
             _ms->get_remote_storage()->get_data(
-                partition_path,
+                _ss->get_partition_path(pc.pid),
                 LPC_META_CALLBACK,
                 [expected_pid = pc.pid, expected_max_replica_count](error_code 
ec,
                                                                     const blob 
&value) {
-                    ASSERT_EQ(ec, ERR_OK);
+                    ASSERT_EQ(ERR_OK, ec);
 
                     partition_configuration pc;
                     
dsn::json::json_forwarder<partition_configuration>::decode(value, pc);
 
-                    ASSERT_EQ(pc.pid, expected_pid);
-                    ASSERT_EQ(pc.max_replica_count, 
expected_max_replica_count);
+                    ASSERT_EQ(expected_pid, pc.pid);
+                    ASSERT_EQ(expected_max_replica_count, 
pc.max_replica_count);
                 },
                 &tracker);
             tracker.wait_outstanding_tasks();
@@ -294,29 +284,30 @@ public:
         auto app = find_app(app_name);
         CHECK(app, "app({}) does not exist", app_name);
 
-        // verify local max_replica_count of the app
-        ASSERT_EQ(app->max_replica_count, expected_max_replica_count);
-        // env of max_replica_count should have been removed under normal 
circumstances
-        ASSERT_EQ(app->envs.find(replica_envs::UPDATE_MAX_REPLICA_COUNT), 
app->envs.end());
+        // Verify `max_replica_count` of the table locally.
+        ASSERT_EQ(expected_max_replica_count, app->max_replica_count);
+        // The env of `max_replica_count` should have been removed under 
normal circumstances.
+        ASSERT_FALSE(gutil::ContainsKey(app->envs, 
replica_envs::UPDATE_MAX_REPLICA_COUNT));
 
-        // verify remote max_replica_count of the app
-        auto app_path = _ss->get_app_path(*app);
+        // Verify `max_replica_count` of the table on remote storage.
         dsn::task_tracker tracker;
         _ms->get_remote_storage()->get_data(
-            app_path,
+            _ss->get_app_path(*app),
             LPC_META_CALLBACK,
             [app, expected_max_replica_count](error_code ec, const blob 
&value) {
                 ASSERT_EQ(ec, ERR_OK);
 
                 app_info ainfo;
-                dsn::json::json_forwarder<app_info>::decode(value, ainfo);
+                json::json_forwarder<app_info>::decode(value, ainfo);
+
+                ASSERT_EQ(app->app_name, ainfo.app_name);
+                ASSERT_EQ(app->app_id, ainfo.app_id);
+                ASSERT_EQ(expected_max_replica_count, ainfo.max_replica_count);
 
-                ASSERT_EQ(ainfo.app_name, app->app_name);
-                ASSERT_EQ(ainfo.app_id, app->app_id);
-                ASSERT_EQ(ainfo.max_replica_count, expected_max_replica_count);
-                // env of max_replica_count should have been removed under 
normal circumstances
-                
ASSERT_EQ(ainfo.envs.find(replica_envs::UPDATE_MAX_REPLICA_COUNT),
-                          ainfo.envs.end());
+                // The env of `max_replica_count` should have been removed 
under normal
+                // circumstances.
+                ASSERT_FALSE(
+                    gutil::ContainsKey(ainfo.envs, 
replica_envs::UPDATE_MAX_REPLICA_COUNT));
             },
             &tracker);
         tracker.wait_outstanding_tasks();
@@ -328,17 +319,15 @@ public:
         auto app = find_app(app_name);
         CHECK(app, "app({}) does not exist", app_name);
 
-        auto app_path = _ss->get_app_path(*app);
-
         dsn::task_tracker tracker;
         _ms->get_remote_storage()->get_data(
-            app_path,
+            _ss->get_app_path(*app),
             LPC_META_CALLBACK,
             [app_name, expected_envs, app](error_code ec, const blob &value) {
                 ASSERT_EQ(ERR_OK, ec);
 
                 app_info ainfo;
-                dsn::json::json_forwarder<app_info>::decode(value, ainfo);
+                json::json_forwarder<app_info>::decode(value, ainfo);
 
                 ASSERT_EQ(app_name, app->app_name);
                 ASSERT_EQ(app_name, ainfo.app_name);
@@ -350,10 +339,92 @@ public:
         tracker.wait_outstanding_tasks();
     }
 
+    void verify_app_atomic_idempotent(const std::string &app_name, bool 
expected_atomic_idempotent)
+    {
+        const auto app = find_app(app_name);
+        CHECK(app, "app({}) does not exist", app_name);
+
+        // `app->__isset.atomic_idempotent` must be true since by default it 
is true
+        // (because `app->atomic_idempotent` has default value false).
+        ASSERT_TRUE(app->__isset.atomic_idempotent);
+
+        // Verify `atomic_idempotent` of the table locally.
+        ASSERT_EQ(expected_atomic_idempotent, app->atomic_idempotent);
+
+        dsn::task_tracker tracker;
+        _ms->get_remote_storage()->get_data(
+            _ss->get_app_path(*app),
+            LPC_META_CALLBACK,
+            [app, expected_atomic_idempotent](error_code ec, const blob 
&value) {
+                ASSERT_EQ(ERR_OK, ec);
+
+                app_info ainfo;
+                dsn::json::json_forwarder<app_info>::decode(value, ainfo);
+
+                ASSERT_EQ(app->app_name, ainfo.app_name);
+                ASSERT_EQ(app->app_id, ainfo.app_id);
+
+                // `ainfo.__isset.atomic_idempotent` must be true since by 
default it
+                // is true (because `ainfo.atomic_idempotent` has default 
value false).
+                ASSERT_TRUE(ainfo.__isset.atomic_idempotent);
+
+                // Verify `atomic_idempotent` on remote storage.
+                ASSERT_EQ(expected_atomic_idempotent, ainfo.atomic_idempotent);
+            },
+            &tracker);
+        tracker.wait_outstanding_tasks();
+    }
+
+    configuration_get_atomic_idempotent_response get_atomic_idempotent(const 
std::string &app_name)
+    {
+        auto req = 
std::make_unique<configuration_get_atomic_idempotent_request>();
+        req->__set_app_name(app_name);
+
+        configuration_get_atomic_idempotent_rpc rpc(std::move(req), 
RPC_CM_GET_ATOMIC_IDEMPOTENT);
+        _ss->get_atomic_idempotent(rpc);
+        _ss->wait_all_task();
+
+        return rpc.response();
+    }
+
+    void test_get_atomic_idempotent(const std::string &app_name, bool 
expected_atomic_idempotent)
+    {
+        const auto resp = get_atomic_idempotent(app_name);
+        ASSERT_EQ(ERR_OK, resp.err);
+        ASSERT_EQ(expected_atomic_idempotent, resp.atomic_idempotent);
+    }
+
+    configuration_set_atomic_idempotent_response set_atomic_idempotent(const 
std::string &app_name,
+                                                                       bool 
atomic_idempotent)
+    {
+        auto req = 
std::make_unique<configuration_set_atomic_idempotent_request>();
+        req->__set_app_name(app_name);
+        req->__set_atomic_idempotent(atomic_idempotent);
+
+        configuration_set_atomic_idempotent_rpc rpc(std::move(req), 
RPC_CM_SET_ATOMIC_IDEMPOTENT);
+        _ss->set_atomic_idempotent(rpc);
+        _ss->wait_all_task();
+
+        return rpc.response();
+    }
+
+    void test_set_atomic_idempotent(const std::string &app_name,
+                                    bool expected_new_atomic_idempotent,
+                                    bool expected_old_atomic_idempotent)
+    {
+        const auto resp = set_atomic_idempotent(app_name, 
expected_new_atomic_idempotent);
+        ASSERT_EQ(ERR_OK, resp.err);
+        ASSERT_EQ(expected_old_atomic_idempotent, resp.old_atomic_idempotent);
+
+        // Ensure that new `atomic_idempotent` has been updated to 
`server_state`.
+        test_get_atomic_idempotent(app_name, expected_new_atomic_idempotent);
+    }
+
     const std::string APP_NAME = "app_operation_test";
     const std::string OLD_APP_NAME = "old_app_operation";
     const std::string DUP_MASTER_APP_NAME = "dup_master_test";
     const std::string DUP_FOLLOWER_APP_NAME = "dup_follower_test";
+    const std::string ATOMIC_IDEMPOTENT_APP_NAME = "atomic_idempotent_test";
 };
 
 TEST_F(meta_app_operation_test, create_app)
@@ -371,6 +442,7 @@ TEST_F(meta_app_operation_test, create_app)
         app_status::type before_status;
         error_code expected_err;
         std::map<std::string, std::string> envs = {};
+        bool atomic_idempotent = false;
     } tests[] = {
         // Wrong partition_count (< 0).
         {APP_NAME, -1, 3, 2, 3, 1, false, app_status::AS_INVALID, 
ERR_INVALID_PARAMETERS},
@@ -661,6 +733,18 @@ TEST_F(meta_app_operation_test, create_app)
           {duplication_constants::kEnvMasterMetasKey, "10.1.2.3:34601"},
           {duplication_constants::kEnvMasterAppNameKey, DUP_MASTER_APP_NAME},
           {duplication_constants::kEnvFollowerAppStatusKey, 
"invalid_creating_status"}}},
+        // Create a table with idempotence enabled for atomic writes.
+        {ATOMIC_IDEMPOTENT_APP_NAME,
+         4,
+         3,
+         2,
+         3,
+         3,
+         false,
+         app_status::AS_INVALID,
+         ERR_OK,
+         {},
+         true},
     };
 
     clear_nodes();
@@ -706,15 +790,17 @@ TEST_F(meta_app_operation_test, create_app)
             update_app_status(test.before_status);
         }
 
-        auto err = create_app_test(test.partition_count,
-                                   test.replica_count,
-                                   test.success_if_exist,
-                                   test.app_name,
-                                   test.envs);
+        const auto err = create_app_test(test.app_name,
+                                         test.partition_count,
+                                         test.replica_count,
+                                         test.success_if_exist,
+                                         test.envs,
+                                         test.atomic_idempotent);
         ASSERT_EQ(test.expected_err, err);
 
         if (test.expected_err == ERR_OK) {
             verify_app_envs(test.app_name, test.envs);
+            verify_app_atomic_idempotent(test.app_name, 
test.atomic_idempotent);
         }
 
         _ms->set_node_state(nodes, true);
@@ -728,10 +814,10 @@ TEST_F(meta_app_operation_test, create_app)
             all_test_envs.insert(test.envs.begin(), test.envs.end());
         }
         for (const auto &option : replica_envs::ROCKSDB_DYNAMIC_OPTIONS) {
-            ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end());
+            ASSERT_TRUE(gutil::ContainsKey(all_test_envs, option));
         }
         for (const auto &option : replica_envs::ROCKSDB_STATIC_OPTIONS) {
-            ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end());
+            ASSERT_TRUE(gutil::ContainsKey(all_test_envs, option));
         }
     }
 
@@ -897,8 +983,8 @@ TEST_F(meta_app_operation_test, get_max_replica_count)
         }
 
         const auto resp = get_max_replica_count(test.app_name);
-        ASSERT_EQ(resp.err, test.expected_err);
-        ASSERT_EQ(resp.max_replica_count, test.expected_max_replica_count);
+        ASSERT_EQ(test.expected_err, resp.err);
+        ASSERT_EQ(test.expected_max_replica_count, resp.max_replica_count);
 
         recover_partition_max_replica_count();
     }
@@ -1037,8 +1123,8 @@ TEST_F(meta_app_operation_test, set_max_replica_count)
                                                          
test.initial_max_replica_count);
 
             const auto resp = get_max_replica_count(test.app_name);
-            ASSERT_EQ(resp.err, ERR_OK);
-            ASSERT_EQ(resp.max_replica_count, test.initial_max_replica_count);
+            ASSERT_EQ(ERR_OK, resp.err);
+            ASSERT_EQ(test.initial_max_replica_count, resp.max_replica_count);
         }
 
         // recover automatically the original 
FLAGS_min_live_node_count_for_unfreeze,
@@ -1078,13 +1164,13 @@ TEST_F(meta_app_operation_test, set_max_replica_count)
 
         const auto get_resp = get_max_replica_count(test.app_name);
         if (test.expected_err == ERR_APP_NOT_EXIST || test.expected_err == 
ERR_INCONSISTENT_STATE) {
-            ASSERT_EQ(get_resp.err, test.expected_err);
+            ASSERT_EQ(test.expected_err, get_resp.err);
         } else if (test.expected_err != ERR_OK) {
-            ASSERT_EQ(get_resp.err, ERR_OK);
+            ASSERT_EQ(ERR_OK, get_resp.err);
         }
 
         if (test.expected_err != ERR_OK) {
-            ASSERT_EQ(get_resp.max_replica_count, 
test.expected_old_max_replica_count);
+            ASSERT_EQ(test.expected_old_max_replica_count, 
get_resp.max_replica_count);
         }
 
         _ms->set_node_state(nodes, true);
@@ -1106,6 +1192,20 @@ TEST_F(meta_app_operation_test, 
recover_from_max_replica_count_env)
     verify_app_max_replica_count(APP_NAME, new_max_replica_count);
 }
 
+TEST_F(meta_app_operation_test, change_atomic_idempotent)
+{
+    create_app(APP_NAME, 4);
+
+    // Initial `atomic_idempotent` should be false by default.
+    test_get_atomic_idempotent(APP_NAME, false);
+
+    // Enable `atomic_idempotent` and its previous value should be false.
+    test_set_atomic_idempotent(APP_NAME, true, false);
+
+    // Disable `atomic_idempotent` and its previous value should be true.
+    test_set_atomic_idempotent(APP_NAME, false, true);
+}
+
 TEST_F(meta_app_operation_test, rename_app)
 {
     const std::string app_name_1 = APP_NAME + "_rename_1";
diff --git a/src/meta/test/state_sync_test.cpp 
b/src/meta/test/state_sync_test.cpp
index d440c00dc..289d01a9c 100644
--- a/src/meta/test/state_sync_test.cpp
+++ b/src/meta/test/state_sync_test.cpp
@@ -25,6 +25,8 @@
  */
 
 #include <boost/lexical_cast.hpp>
+#include <fmt/core.h>
+
 #include <algorithm>
 #include <cstdint>
 #include <fstream> // IWYU pragma: keep
@@ -151,10 +153,15 @@ void meta_service_test_app::state_sync_test()
             info.is_stateful = true;
             info.app_id = i;
             info.app_type = "simple_kv";
-            info.app_name = "test_app" + boost::lexical_cast<std::string>(i);
+            info.app_name = fmt::format("test_app{}", i);
             info.max_replica_count = 3;
-            info.partition_count = random32(100, 10000);
+            info.partition_count = static_cast<int32_t>(random32(100, 10000));
             info.status = dsn::app_status::AS_CREATING;
+
+            // `atomic_idempotent` will be set true for the table with even 
index,
+            // otherwise false.
+            info.atomic_idempotent = (static_cast<uint32_t>(i) & 1U) == 0;
+
             std::shared_ptr<app_state> app = app_state::create(info);
 
             ss->_all_apps.emplace(app->app_id, app);
@@ -174,8 +181,8 @@ void meta_service_test_app::state_sync_test()
             }
         }
 
-        dsn::error_code ec = ss->sync_apps_to_remote_storage();
-        ASSERT_EQ(ec, dsn::ERR_OK);
+        error_code ec = ss->sync_apps_to_remote_storage();
+        ASSERT_EQ(ERR_OK, ec);
         ss->spin_wait_staging();
     }
 
@@ -184,11 +191,20 @@ void meta_service_test_app::state_sync_test()
     {
         std::shared_ptr<server_state> ss2 = std::make_shared<server_state>();
         ss2->initialize(svc, apps_root);
-        dsn::error_code ec = ss2->sync_apps_from_remote_storage();
-        ASSERT_EQ(ec, dsn::ERR_OK);
+        error_code ec = ss2->sync_apps_from_remote_storage();
+        ASSERT_EQ(ERR_OK, ec);
 
         for (int i = 1; i <= apps_count; ++i) {
             std::shared_ptr<app_state> app = ss2->get_app(i);
+
+            // `app->__isset.atomic_idempotent` must be true: the field is marked as set by
+            // default because `app->atomic_idempotent` declares a default value (false).
+            ASSERT_TRUE(app->__isset.atomic_idempotent);
+
+            // Recovered `app->atomic_idempotent` will be true for the table 
with even
+            // index, otherwise false.
+            ASSERT_EQ((static_cast<uint32_t>(i) & 1U) == 0, 
app->atomic_idempotent);
+
             for (int j = 0; j < app->partition_count; ++j) {
                 config_context &cc = app->helpers->contexts[j];
                 ASSERT_EQ(1, cc.dropped.size());
@@ -196,7 +212,7 @@ void meta_service_test_app::state_sync_test()
             }
         }
         ec = ss2->dump_from_remote_storage("meta_state.dump1", false);
-        ASSERT_EQ(ec, dsn::ERR_OK);
+        ASSERT_EQ(ERR_OK, ec);
     }
 
     // dump another way
diff --git a/src/ranger/ranger_resource_policy_manager.cpp 
b/src/ranger/ranger_resource_policy_manager.cpp
index d0ac9362e..61d733949 100644
--- a/src/ranger/ranger_resource_policy_manager.cpp
+++ b/src/ranger/ranger_resource_policy_manager.cpp
@@ -180,7 +180,8 @@ 
ranger_resource_policy_manager::ranger_resource_policy_manager(
                               "RPC_CM_QUERY_PARTITION_SPLIT",
                               "RPC_CM_QUERY_BULK_LOAD_STATUS",
                               "RPC_CM_QUERY_MANUAL_COMPACT_STATUS",
-                              "RPC_CM_GET_MAX_REPLICA_COUNT"},
+                              "RPC_CM_GET_MAX_REPLICA_COUNT",
+                              "RPC_CM_GET_ATOMIC_IDEMPOTENT"},
                              _ac_type_of_database_rpcs);
     // DATABASE - kControl
     register_rpc_access_type(access_type::kControl,
@@ -198,6 +199,7 @@ 
ranger_resource_policy_manager::ranger_resource_policy_manager(
                               "RPC_CM_CLEAR_BULK_LOAD",
                               "RPC_CM_START_MANUAL_COMPACT",
                               "RPC_CM_SET_MAX_REPLICA_COUNT",
+                              "RPC_CM_SET_ATOMIC_IDEMPOTENT",
                               "RPC_CM_RENAME_APP"},
                              _ac_type_of_database_rpcs);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to