This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2bb4c0eaa4 [INLONG-10851][SDK] Support multiple protocols for
DataProxy C++ SDK (#10855)
2bb4c0eaa4 is described below
commit 2bb4c0eaa42ebf57c4fbd13d2214864d2e134408
Author: doleyzi <[email protected]>
AuthorDate: Thu Aug 22 17:17:10 2024 +0800
[INLONG-10851][SDK] Support multiple protocols for DataProxy C++ SDK
(#10855)
---
.../dataproxy-sdk-cpp/src/client/tcp_client.cc | 2 +-
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 31 ++--
.../dataproxy-sdk-cpp/src/config/sdk_conf.h | 7 +-
.../dataproxy-sdk-cpp/src/group/recv_group.cc | 10 +-
.../dataproxy-sdk-cpp/src/group/recv_group.h | 3 +
.../src/manager/metric_manager.cc | 1 +
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 133 +++--------------
.../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 3 +-
.../dataproxy-sdk-cpp/src/manager/send_manager.cc | 2 +-
.../dataproxy-sdk-cpp/src/metric/environment.h | 7 +-
.../dataproxy-sdk-cpp/src/metric/metric.h | 4 +-
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 1 +
.../dataproxy-sdk-cpp/src/utils/parse_json.cc | 160 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/utils/parse_json.h | 35 +++++
14 files changed, 248 insertions(+), 151 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index c3b7692809..1e4d8bcedf 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -307,7 +307,7 @@ void TcpClient::UpdateMetric() {
stat.Update(it.second);
it.second.ResetStat();
}
- LOG_INFO(stat.ToString() << CLIENT_INFO);
+ LOG_INFO(stat.GetSendMetricInfo() << CLIENT_INFO);
}
void TcpClient::HeartBeat(bool only_heart_heat) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 5f067528af..6ff642472f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -121,8 +121,6 @@ void SdkConfig::defaultInit() {
// manager parameters
manager_url_ = constants::kManagerURL;
- enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster;
- manager_cluster_url_ = constants::kManagerClusterURL;
manager_update_interval_ = constants::kManagerUpdateInterval;
manager_url_timeout_ = constants::kManagerTimeout;
max_proxy_num_ = constants::kMaxProxyNum;
@@ -329,22 +327,7 @@ void SdkConfig::InitManagerParam(const rapidjson::Value
&doc) {
} else {
manager_url_ = constants::kManagerURL;
}
- // manager cluster url
- if (doc.HasMember("manager_cluster_url") &&
- doc["manager_cluster_url"].IsString()) {
- const rapidjson::Value &obj = doc["manager_cluster_url"];
- manager_cluster_url_ = obj.GetString();
- } else {
- manager_cluster_url_ = constants::kManagerClusterURL;
- }
- // enable manager from cluster
- if (doc.HasMember("enable_manager_url_from_cluster") &&
- doc["enable_manager_url_from_cluster"].IsBool()) {
- const rapidjson::Value &obj = doc["enable_manager_url_from_cluster"];
- enable_manager_url_from_cluster_ = obj.GetBool();
- } else {
- enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster;
- }
+
// manager update interval
if (doc.HasMember("manager_update_interval") &&
doc["manager_update_interval"].IsInt() &&
@@ -522,6 +505,13 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
} else {
max_instance_ = constants::kMaxInstance;
}
+
+ if (doc.HasMember("extend_report") && doc["extend_report"].IsBool()) {
+ const rapidjson::Value &obj = doc["extend_report"];
+ extend_report_ = obj.GetBool();
+ } else {
+ extend_report_ = constants::kExtendReport;
+ }
}
bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string
&localhost) {
@@ -589,11 +579,6 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("log_level: " << log_level_);
LOG_INFO("log_path: " << log_path_.c_str());
LOG_INFO("manager_url: " << manager_url_.c_str());
- LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
- LOG_INFO(
- "enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
- ? "true"
- : "false");
LOG_INFO("manager_update_interval: minutes" << manager_update_interval_);
LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
LOG_INFO("max_tcp_num: " << max_proxy_num_);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index f120343f07..608e076b3b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -25,6 +25,8 @@
#include <stdint.h>
#include <string>
#include <vector>
+#include <atomic>
+#include "../utils/capi_constant.h"
namespace inlong {
class SdkConfig {
@@ -40,7 +42,7 @@ private:
void InitAuthParm(const rapidjson::Value &doc);
void OthersParam(const rapidjson::Value &doc);
bool GetLocalIPV4Address(std::string& err_info, std::string& localhost);
- SdkConfig() { defaultInit(); };
+ SdkConfig():extend_report_(false) { defaultInit(); };
public:
// cache parameter
@@ -80,8 +82,6 @@ private:
// manager parameters
std::string manager_url_;
- bool enable_manager_url_from_cluster_;
- std::string manager_cluster_url_;
uint32_t manager_update_interval_; // Automatic update interval, minutes
uint32_t manager_url_timeout_; // URL parsing timeout, seconds
uint64_t max_proxy_num_;
@@ -114,6 +114,7 @@ private:
uint32_t buf_size_;
volatile bool parsed_ = false;
+ bool extend_report_;
void defaultInit();
static SdkConfig *getInstance();
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 1ffeeef8dc..ec64292a1e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -50,6 +50,8 @@ RecvGroup::RecvGroup(const std::string &group_key,
std::shared_ptr<SendManager>
last_pack_time_ = Utils::getCurrentMsTime();
max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
local_ip_ = SdkConfig::getInstance()->local_ip_;
+ group_id_key_ = SdkConfig::getInstance()->extend_report_ ? "bid=" :
"groupId=";
+ stream_id_key_ = SdkConfig::getInstance()->extend_report_ ? "&tid=" :
"&streamId=";
LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_
<< ",max_recv_size:" << max_recv_size_);
}
@@ -224,8 +226,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,uint32_t &
streamId_num_ == 0) {
groupId_num = 0;
streamId_num = 0;
- groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
- "&streamId=" + msgs[0]->inlong_stream_id_;
+ groupId_streamId_char = group_id_key_ + msgs[0]->inlong_group_id_ +
stream_id_key_ + msgs[0]->inlong_stream_id_;
char_groupId_flag = 0x4;
} else {
groupId_num = groupId_num_;
@@ -245,7 +246,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,uint32_t &
"&node1ip=" + SdkConfig::getInstance()->local_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
} else {
- attr = "groupId=" + msgs[0]->inlong_group_id_ +
+ attr = group_id_key_ + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
}
*(uint16_t *)bodyBegin = htons(attr.size());
@@ -296,8 +297,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,uint32_t &
// attr
std::string attr;
- attr = "groupId=" + msgs[0]->inlong_group_id_ +
- "&streamId=" + msgs[0]->inlong_stream_id_;
+ attr = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ +
msgs[0]->inlong_stream_id_;
attr += "&dt=" + std::to_string(data_time_);
attr += "&mid=" + std::to_string(uniq_id_);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index 839c49e14d..64681de0bb 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -63,6 +63,9 @@ class RecvGroup {
std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_;
std::queue<SendBufferPtrT> fail_queue_;
+ std::string group_id_key_;
+ std::string stream_id_key_;
+
void DoDispatchMsg();
bool IsZipAndOperate(std::string& res, uint32_t real_cur_len);
inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
index 061abc0678..c4d31ac941 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
@@ -39,6 +39,7 @@ void MetricManager::InitEnvironment() {
environment_.setVersion(constants::kVersion);
environment_.setPid(getpid());
environment_.setIp(SdkConfig::getInstance()->local_ip_);
+ environment_.SetExtendReport(SdkConfig::getInstance()->extend_report_);
}
void MetricManager::Run() {
prctl(PR_SET_NAME, "metric-manager");
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 1653d68de0..3db331aeda 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -18,13 +18,15 @@
*/
#include "proxy_manager.h"
+
+#include "api_code.h"
+#include <fstream>
+
#include "../config/ini_help.h"
#include "../utils/capi_constant.h"
#include "../utils/logger.h"
#include "../utils/utils.h"
-#include "api_code.h"
-#include <fstream>
-#include <rapidjson/document.h>
+#include "../utils/parse_json.h"
namespace inlong {
const uint64_t MINUTE = 60000;
@@ -110,104 +112,6 @@ void ProxyManager::DoUpdate() {
LOG_INFO("finish ProxyManager DoUpdate.");
}
-int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
- const std::string &meta_data,
- ProxyInfoVec &proxy_info_vec) {
- rapidjson::Document doc;
- if (doc.Parse(meta_data.c_str()).HasParseError()) {
- LOG_ERROR("failed to parse meta_data, error" << doc.GetParseError() << ":"
- << doc.GetErrorOffset());
- return SdkCode::kErrorParseJson;
- }
-
- if (!(doc.HasMember("success") && doc["success"].IsBool() &&
- doc["success"].GetBool())) {
- LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, success: not "
- "exist or false"
- << inlong_group_id.c_str());
- return SdkCode::kErrorParseJson;
- }
- // check data valid
- if (!doc.HasMember("data") || doc["data"].IsNull()) {
- LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, data: not exist
"
- "or null"
- << inlong_group_id.c_str());
- return SdkCode::kErrorParseJson;
- }
-
- // check nodelist valid
- const rapidjson::Value &clusterInfo = doc["data"];
- if (!clusterInfo.HasMember("nodeList") || clusterInfo["nodeList"].IsNull()) {
- LOG_ERROR("invalid nodeList of inlong_group_id:%s, not exist or null"
- << inlong_group_id.c_str());
- return SdkCode::kErrorParseJson;
- }
-
- // check nodeList isn't empty
- const rapidjson::Value &nodeList = clusterInfo["nodeList"];
- if (nodeList.GetArray().Size() == 0) {
- LOG_ERROR("empty nodeList of inlong_group_id:%s"
- << inlong_group_id.c_str());
- return SdkCode::kErrorParseJson;
- }
- // check clusterId
- if (!clusterInfo.HasMember("clusterId") ||
- !clusterInfo["clusterId"].IsInt() ||
- clusterInfo["clusterId"].GetInt() < 0) {
- LOG_ERROR("clusterId of inlong_group_id:%s is not found or not a integer"
- << inlong_group_id.c_str());
- return SdkCode::kErrorParseJson;
- }
- groupid_2_cluster_id_update_map_[inlong_group_id] =
- clusterInfo["clusterId"].GetInt();
-
- // check load
- int32_t load = 0;
- if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() &&
- !clusterInfo["load"].IsNull()) {
- const rapidjson::Value &obj = clusterInfo["load"];
- load = obj.GetInt();
- } else {
- load = 0;
- }
-
- // proxy list
- for (auto &proxy : nodeList.GetArray()) {
- std::string ip;
- std::string id;
- int32_t port;
- if (proxy.HasMember("ip") && !proxy["ip"].IsNull())
- ip = proxy["ip"].GetString();
- else {
- LOG_ERROR("this ip info is null");
- continue;
- }
- if (proxy.HasMember("port") && !proxy["port"].IsNull()) {
- if (proxy["port"].IsString())
- port = std::stoi(proxy["port"].GetString());
- else if (proxy["port"].IsInt())
- port = proxy["port"].GetInt();
- }
-
- else {
- LOG_ERROR("this ip info is null or negative");
- continue;
- }
- if (proxy.HasMember("id") && !proxy["id"].IsNull()) {
- if (proxy["id"].IsString())
- id = proxy["id"].GetString();
- else if (proxy["id"].IsInt())
- id = proxy["id"].GetInt();
- } else {
- LOG_WARN("there is no id info of inlong_group_id");
- continue;
- }
- proxy_info_vec.emplace_back(id, ip, port, load);
- }
-
- return SdkCode::kSuccess;
-}
-
int32_t ProxyManager::GetProxy(const std::string &key,
ProxyInfoVec &proxy_info_vec) {
if (constants::IsolationLevel::kLevelOne ==
@@ -407,16 +311,15 @@ void ProxyManager::UpdateProxy(
LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first);
continue;
}
- std::string url;
- if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
- url = SdkConfig::getInstance()->manager_cluster_url_;
- else {
- url =
- SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
+ std::string url = SdkConfig::getInstance()->manager_url_ + "/" +
groupid2cluster.first;
+ if (SdkConfig::getInstance()->extend_report_) {
+ url = SdkConfig::getInstance()->manager_url_ + "?bid=" +
groupid2cluster.first + "&net_tag=all&ip=" +
+ SdkConfig::getInstance()->local_ip_;
}
+
std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
- "&version=" + constants::kVersion +
- "&protocolType=" + constants::kProtocolType;
+ "&version=" + constants::kVersion +
+ "&protocolType=" + constants::kProtocolType;
LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
<< "proxy cfg url " << url.c_str()
<< "post_data:" << post_data.c_str());
@@ -441,7 +344,7 @@ void ProxyManager::UpdateProxy(
if (groupid_2_proxy_map_.find(groupid2cluster.first) !=
groupid_2_proxy_map_.end()) {
LOG_WARN("failed to request from manager, use previous "
- << groupid2cluster.first);
+ << groupid2cluster.first);
continue;
}
if (!SdkConfig::getInstance()->enable_local_cache_) {
@@ -456,10 +359,14 @@ void ProxyManager::UpdateProxy(
}
ProxyInfoVec proxyInfoVec;
- ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
+ if (SdkConfig::getInstance()->extend_report_) {
+ ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data,
groupid_2_cluster_id_update_map_, proxyInfoVec);
+ } else {
+ ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data,
proxyInfoVec, groupid_2_cluster_id_update_map_);
+ }
+
if (ret != SdkCode::kSuccess) {
- LOG_ERROR("failed to parse groupid:%s json proxy list "
- << groupid2cluster.first.c_str());
+ LOG_ERROR("Failed to parse json: " << meta_data);
continue;
}
if (!proxyInfoVec.empty()) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 419bc60798..2dec2f572e 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -27,6 +27,7 @@
#include <vector>
namespace inlong {
+using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>;
class ProxyManager {
private:
uint32_t timeout_;
@@ -53,7 +54,7 @@ private:
uint64_t last_update_time_;
int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
- ProxyInfoVec &proxy_info_vec);
+ ProxyInfoVec &proxy_info_vec,GroupId2ClusterIdMap
&group_id_2_cluster_id);
public:
ProxyManager(){};
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 0af18f7ccb..fe830c4cb0 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -64,7 +64,7 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string
&send_group_key) {
unique_read_lock<read_write_mutex> rdlck(send_group_map_rwmutex_);
auto send_group_map = send_group_map_.find(send_group_key);
if (send_group_map == send_group_map_.end()) {
- LOG_ERROR("fail to get send group, group_id:%s" << send_group_key);
+ LOG_ERROR("Fail to get send group, group key:" << send_group_key);
return nullptr;
}
if (send_group_map->second.empty()) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
index e3d4a9bcc8..8d64faadc7 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
@@ -27,6 +27,7 @@ class Environment {
std::string version_;
std::string ip_;
uint64_t pid_;
+ bool extend_report_;
const std::string &getType() const { return type_; }
void setType(const std::string &type) { type_ = type; }
std::string getVersion() { return version_; }
@@ -35,12 +36,14 @@ class Environment {
void setIp(const std::string &ip) { ip_ = ip; }
uint64_t getPid() const { return pid_; }
void setPid(uint64_t pid) { pid_ = pid; }
+ void SetExtendReport(bool extend_report) {extend_report_ = extend_report;}
std::string ToString() const {
std::stringstream metric;
metric << "local ip[" << ip_ << "] ";
- metric << "version [" << version_ << "] ";
- metric << "pid [" << pid_ << "] ";
+ metric << "version[" << version_ << "] ";
+ metric << "pid[" << pid_ << "] ";
+ metric << "extend report[" << extend_report_ << "]";
return metric.str();
}
};
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
index 87bd991b28..1dcbc33afa 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
@@ -130,7 +130,7 @@ class Metric {
metric << "msg[" << send_success_msg_num_ << "] ";
metric << "failed-pack[" << send_failed_pack_num_ << "] ";
metric << "msg[" << send_failed_msg_num_ << "] ";
- metric << "trans[" << getTransTime() << "] ";
+ metric << "trans[" << getTransTime() << "]";
return metric.str();
}
std::string ToString() const {
@@ -142,7 +142,7 @@ class Metric {
metric << "trans[" << getTransTime() << "] ";
metric << "buffer full[" << receive_buffer_full_count_ << "] ";
metric << "too long msg[" << too_long_msg_count_ << "] ";
- metric << "metadata fail[" << metadata_fail_count_ << "] ";
+ metric << "metadata fail[" << metadata_fail_count_ << "]";
return metric.str();
}
};
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index a1753b8255..f97cacf484 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -93,6 +93,7 @@ static const bool kEnableSetAffinity = false;
static const uint32_t kMaskCPUAffinity = 0xff;
static const uint16_t kExtendField = 0;
static const uint64_t kMaxSnowFlake = std::numeric_limits<uint64_t>::max();
+static const bool kExtendReport = false;
// http basic auth
static const char kBasicAuthHeader[] = "Authorization:";
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc
new file mode 100644
index 0000000000..b1e4ebff7e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "parse_json.h"
+
+#include <exception>
+#include <string>
+
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/document.h"
+#include "../core/api_code.h"
+#include "../utils/logger.h"
+
+namespace inlong {
+// JSON member names read by the two ParseProxyInfo overloads below.
+// Keys of the "success"/"data"/"nodeList" response format.
+static const char kJsonKeySuccess[] = "success";
+static const char kJsonKeyData[] = "data";
+static const char kJsonKeyNodeList[] = "nodeList";
+static const char kJsonKeyClusterId[] = "clusterId";
+// "load" is read by both overloads.
+static const char kJsonKeyLoad[] = "load";
+static const char kJsonKeyIp[] = "ip";
+// "port" and "id" are read by both overloads.
+static const char kJsonKeyPort[] = "port";
+static const char kJsonKeyId[] = "id";
+// Keys read only by the "size"/"cluster_id"/"address" response format.
+static const char kJsonKeySize[] = "size";
+static const char kJsonKeyClusterIdV2[] = "cluster_id";
+static const char kJsonKeyAddress[] = "address";
+static const char kJsonKeyHost[] = "host";
+
+// Parses a manager response of the form
+//   {"success": true,
+//    "data": {"clusterId": <int > 0>, "load": <int, optional>,
+//             "nodeList": [{"id": <string|int>, "ip": <string>,
+//                           "port": <string|int>}, ...]}}
+//
+// inlong_group_id        key under which the parsed clusterId is stored
+// meta_data              raw JSON text to parse
+// proxy_info_vec         receives one (id, ip, port, load) entry per
+//                        well-formed node; malformed nodes are skipped
+// group_id_2_cluster_id  receives inlong_group_id -> clusterId
+//
+// Returns SdkCode::kSuccess when the envelope is valid (even if every node
+// was skipped) and SdkCode::kErrorParseJson otherwise.
+int32_t ParseJson::ParseProxyInfo(
+    const std::string &inlong_group_id, const std::string &meta_data,
+    ProxyInfoVec &proxy_info_vec,
+    GroupId2ClusterIdMap &group_id_2_cluster_id) {
+  rapidjson::Document doc;
+  // rapidjson accessors assert on a type mismatch, so every level is
+  // type-checked before it is dereferenced.
+  if (doc.Parse(meta_data.c_str()).HasParseError() || !doc.IsObject()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!(doc.HasMember(kJsonKeySuccess) && doc[kJsonKeySuccess].IsBool() &&
+        doc[kJsonKeySuccess].GetBool())) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!doc.HasMember(kJsonKeyData) || !doc[kJsonKeyData].IsObject()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  const rapidjson::Value &clusterInfo = doc[kJsonKeyData];
+  if (!clusterInfo.HasMember(kJsonKeyNodeList) ||
+      !clusterInfo[kJsonKeyNodeList].IsArray()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  const rapidjson::Value &nodeList = clusterInfo[kJsonKeyNodeList];
+  if (nodeList.Size() == 0) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!clusterInfo.HasMember(kJsonKeyClusterId) ||
+      !clusterInfo[kJsonKeyClusterId].IsInt() ||
+      clusterInfo[kJsonKeyClusterId].GetInt() <= 0) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  group_id_2_cluster_id[inlong_group_id] =
+      clusterInfo[kJsonKeyClusterId].GetInt();
+
+  // "load" is optional; it stays 0 when absent or not an integer.
+  int32_t load = 0;
+  if (clusterInfo.HasMember(kJsonKeyLoad) &&
+      clusterInfo[kJsonKeyLoad].IsInt()) {
+    load = clusterInfo[kJsonKeyLoad].GetInt();
+  }
+
+  for (const auto &proxy : nodeList.GetArray()) {
+    if (!proxy.IsObject()) {
+      continue;
+    }
+
+    if (!proxy.HasMember(kJsonKeyIp) || !proxy[kJsonKeyIp].IsString()) {
+      continue;
+    }
+    std::string ip = proxy[kJsonKeyIp].GetString();
+
+    if (!proxy.HasMember(kJsonKeyPort)) {
+      continue;
+    }
+    const rapidjson::Value &port_value = proxy[kJsonKeyPort];
+    int32_t port = 0;
+    if (port_value.IsInt()) {
+      port = port_value.GetInt();
+    } else if (port_value.IsString()) {
+      try {
+        port = std::stoi(port_value.GetString());
+      } catch (const std::exception &) {
+        // std::stoi throws on non-numeric or out-of-range text; treat the
+        // node as malformed instead of letting the exception escape.
+        continue;
+      }
+    } else {
+      // Neither a number nor a string: there is no usable port.
+      continue;
+    }
+
+    if (!proxy.HasMember(kJsonKeyId) || proxy[kJsonKeyId].IsNull()) {
+      continue;
+    }
+    const rapidjson::Value &id_value = proxy[kJsonKeyId];
+    std::string id;
+    if (id_value.IsString()) {
+      id = id_value.GetString();
+    } else if (id_value.IsInt()) {
+      // Assigning the int directly would select std::string::operator=(char)
+      // and store a single character rather than the decimal id.
+      id = std::to_string(id_value.GetInt());
+    }
+
+    proxy_info_vec.emplace_back(id, ip, port, load);
+  }
+  return SdkCode::kSuccess;
+}
+// Parses a manager response of the form
+//   {"size": <int > 0>, "cluster_id": <int>, "load": <int, optional>,
+//    "address": [{"id": <string|int>, "host": <string>,
+//                 "port": <string|int>}, ...]}
+//
+// inlong_group_id        key under which the parsed cluster_id is stored
+// meta_data              raw JSON text to parse
+// group_id_2_cluster_id  receives inlong_group_id -> cluster_id
+// proxy_info_vec         receives one (id, host, port, load) entry per
+//                        well-formed address; malformed entries are skipped
+//
+// Returns SdkCode::kSuccess when the envelope is valid (even if every entry
+// was skipped) and SdkCode::kErrorParseJson otherwise.
+int32_t ParseJson::ParseProxyInfo(
+    const std::string &inlong_group_id, const std::string &meta_data,
+    GroupId2ClusterIdMap &group_id_2_cluster_id,
+    ProxyInfoVec &proxy_info_vec) {
+  rapidjson::Document doc;
+  // rapidjson accessors assert on a type mismatch, so every level is
+  // type-checked before it is dereferenced.
+  if (doc.Parse(meta_data.c_str()).HasParseError() || !doc.IsObject()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!doc.HasMember(kJsonKeySize) || !doc[kJsonKeySize].IsInt() ||
+      doc[kJsonKeySize].GetInt() <= 0) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!doc.HasMember(kJsonKeyClusterIdV2) ||
+      !doc[kJsonKeyClusterIdV2].IsInt()) {
+    return SdkCode::kErrorParseJson;
+  }
+  group_id_2_cluster_id[inlong_group_id] = doc[kJsonKeyClusterIdV2].GetInt();
+
+  // "load" is optional; it stays 0 when absent or not an integer.
+  int32_t load = 0;
+  if (doc.HasMember(kJsonKeyLoad) && doc[kJsonKeyLoad].IsInt()) {
+    load = doc[kJsonKeyLoad].GetInt();
+  }
+
+  if (!doc.HasMember(kJsonKeyAddress) || !doc[kJsonKeyAddress].IsArray()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  const rapidjson::Value &host_list = doc[kJsonKeyAddress];
+  for (const auto &info : host_list.GetArray()) {
+    if (!info.IsObject()) {
+      continue;
+    }
+
+    if (!info.HasMember(kJsonKeyHost) || !info[kJsonKeyHost].IsString()) {
+      continue;
+    }
+    std::string ip = info[kJsonKeyHost].GetString();
+
+    if (!info.HasMember(kJsonKeyPort)) {
+      continue;
+    }
+    const rapidjson::Value &port_value = info[kJsonKeyPort];
+    int32_t port = 0;
+    if (port_value.IsInt()) {
+      port = port_value.GetInt();
+    } else if (port_value.IsString()) {
+      try {
+        port = std::stoi(port_value.GetString());
+      } catch (const std::exception &) {
+        // std::stoi throws on non-numeric or out-of-range text; treat the
+        // entry as malformed instead of letting the exception escape.
+        continue;
+      }
+    } else {
+      // Neither a number nor a string: there is no usable port.
+      continue;
+    }
+
+    if (!info.HasMember(kJsonKeyId) || info[kJsonKeyId].IsNull()) {
+      continue;
+    }
+    const rapidjson::Value &id_value = info[kJsonKeyId];
+    std::string id;
+    if (id_value.IsString()) {
+      id = id_value.GetString();
+    } else if (id_value.IsInt()) {
+      id = std::to_string(id_value.GetInt());
+    }
+
+    proxy_info_vec.emplace_back(id, ip, port, load);
+  }
+  return SdkCode::kSuccess;
+}
+}
+
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h
new file mode 100644
index 0000000000..01abdbdc6e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef
DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H
+#define
DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H
+
+#include <string>
+#include <unordered_map>
+#include "../config/proxy_info.h"
+
+namespace inlong {
+using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>;
+// Static helpers that turn a manager JSON response into proxy entries.
+// NOTE: the two overloads take the same argument types and differ only in
+// the order of the last two parameters, so the argument order at the call
+// site decides which response format is parsed.
+class ParseJson {
+ public:
+ // Parses the {"success", "data": {"clusterId", "load", "nodeList": [...]}}
+ // format; each node supplies "id", "ip" and "port".
+ // Returns SdkCode::kSuccess or SdkCode::kErrorParseJson.
+ static int32_t ParseProxyInfo(const std::string &inlong_group_id, const
std::string &meta_data,
+ ProxyInfoVec &proxy_info_vec,
GroupId2ClusterIdMap &group_id_2_cluster_id);
+ // Parses the {"size", "cluster_id", "load", "address": [...]} format;
+ // each address entry supplies "id", "host" and "port".
+ // Returns SdkCode::kSuccess or SdkCode::kErrorParseJson.
+ static int32_t ParseProxyInfo(const std::string &inlong_group_id, const
std::string &meta_data,
+ GroupId2ClusterIdMap &group_id_2_cluster_id,
ProxyInfoVec &proxy_info_vec);
+};
+}
+#endif
//DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H