This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9ac4e5af08 [INLONG-10838][SDK] Optimize the ability to send data for
DataProxy C++ SDK (#10850)
9ac4e5af08 is described below
commit 9ac4e5af08812b6b0364a5296f9afc63433cee82
Author: doleyzi <[email protected]>
AuthorDate: Wed Aug 21 18:46:39 2024 +0800
[INLONG-10838][SDK] Optimize the ability to send data for DataProxy C++ SDK
(#10850)
---
.../dataproxy-sdk-cpp/include/inlong_api.h | 54 +++++
.../dataproxy-sdk-cpp/src/client/tcp_client.cc | 10 +-
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 6 +
.../dataproxy-sdk-cpp/src/config/sdk_conf.h | 2 +-
.../dataproxy-sdk-cpp/src/core/api_imp.cc | 34 +--
.../dataproxy-sdk-cpp/src/core/api_imp.h | 10 +-
.../dataproxy-sdk-cpp/src/core/inlong_api.cc | 29 ++-
.../dataproxy-sdk-cpp/src/core/inlong_api.h | 44 ++--
.../dataproxy-sdk-cpp/src/core/sdk_msg.h | 37 +++-
.../dataproxy-sdk-cpp/src/group/recv_group.cc | 10 +-
.../dataproxy-sdk-cpp/src/group/send_group.cc | 236 +++++++++------------
.../dataproxy-sdk-cpp/src/group/send_group.h | 57 +++--
.../dataproxy-sdk-cpp/src/protocol/msg_protocol.cc | 4 +-
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 1 +
.../dataproxy-sdk-cpp/src/utils/send_buffer.h | 153 +++++++------
15 files changed, 362 insertions(+), 325 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h
new file mode 100644
index 0000000000..8f79c3b802
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INLONG_SDK_API_H
+#define INLONG_SDK_API_H
+
+#include <clocale>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace inlong {
+
+typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
+ const int64_t, const char *);
+
+class ApiImp;
+
+class InLongApi {
+ public:
+ InLongApi();
+ ~InLongApi();
+ int32_t InitApi(const char *config_path);
+
+ int32_t AddInLongGroupId(const std::vector<std::string> &group_ids);
+
+ int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
+ UserCallBack call_back = nullptr);
+
+ int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
+ int64_t data_time, UserCallBack call_back = nullptr);
+
+ int32_t CloseApi(int32_t max_waitms);
+
+ private:
+ std::shared_ptr<ApiImp> api_impl_;
+};
+} // namespace inlong
+#endif // INLONG_SDK_API_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index eb3264ba6c..c3b7692809 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -164,7 +164,7 @@ void TcpClient::BeginWrite() {
}
last_update_time_ = Utils::getCurrentMsTime();
status_ = kWriting;
- asio::async_write(*socket_, asio::buffer(sendBuffer_->content(),
sendBuffer_->len()),
+ asio::async_write(*socket_, asio::buffer(sendBuffer_->GetData(),
sendBuffer_->GetDataLen()),
std::bind(&TcpClient::OnWroten, this,
std::placeholders::_1, std::placeholders::_2));
}
void TcpClient::OnWroten(const asio::error_code error, std::size_t
bytes_transferred) {
@@ -390,8 +390,8 @@ void TcpClient::ParseHeartBeat(size_t total_length) {
void TcpClient::ParseGenericResponse() {
if (sendBuffer_ != nullptr) {
- std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner +
sendBuffer_->getStreamId();
- stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt());
+ std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner +
sendBuffer_->GetInlongStreamId();
+ stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->GetMsgCnt());
stat_map_[stat_key].AddSendSuccessPackNum(1);
stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() -
last_update_time_);
@@ -448,8 +448,8 @@ void TcpClient::ResetSendBuffer() {
}
retry_times_++;
if (retry_times_ > SdkConfig::getInstance()->retry_times_) {
- std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner +
sendBuffer_->getStreamId();
- stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt());
+ std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner +
sendBuffer_->GetInlongStreamId();
+ stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->GetMsgCnt());
stat_map_[stat_key].AddSendFailPackNum(1);
stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() -
last_update_time_);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 12a82b07e7..5f067528af 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -443,6 +443,12 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
} else {
retry_times_ = constants::kRetryTimes;
}
+ if (doc.HasMember("proxy_repeat_times") && doc["proxy_repeat_times"].IsInt()
&& doc["proxy_repeat_times"].GetInt() >= 0) {
+ const rapidjson::Value &obj = doc["proxy_repeat_times"];
+ proxy_repeat_times_ = obj.GetInt();
+ } else {
+ proxy_repeat_times_ = constants::kProxyRepeatTimes;
+ }
}
void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
// auth settings
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index ae6cfde58b..f120343f07 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -96,7 +96,7 @@ private:
bool enable_balance_;
bool enable_local_cache_;
uint32_t retry_times_;
-
+ uint32_t proxy_repeat_times_;
// auth settings
bool need_auth_;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 7ae2c79ea2..2109af502e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -46,31 +46,31 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
return DoInit();
}
-int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char
*msg, int32_t msg_len,
+int32_t ApiImp::Send(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back) {
- int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+ int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
}
- return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back);
+ return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len},
call_back);
}
-int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char
*msg, int32_t msg_len,
+int32_t ApiImp::Send(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back) {
- int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+ int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
}
- return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back,
data_time);
+ return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len},
call_back, data_time);
}
-int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id,
const char *msg, int32_t msg_len){
+int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len){
if (msg_len > max_msg_length_) {
- MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
+ MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id,
inlong_stream_id, 1);
return SdkCode::kMsgTooLong;
}
- if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len
<= 0) {
+ if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg ==
nullptr || msg_len <= 0) {
return SdkCode::kInvalidInput;
}
@@ -80,9 +80,9 @@ int32_t ApiImp::ValidateParams(const char *group_id, const
char *stream_id, cons
return SdkCode::kSuccess;
}
-int32_t ApiImp::SendBase(const std::string& inlong_group_id, const
std::string& stream_id, const std::string& msg,
+int32_t ApiImp::SendBase(const std::string& inlong_group_id, const
std::string& inlong_stream_id, const std::string& msg,
UserCallBack call_back, int64_t report_time) {
- int32_t check_ret = CheckData(inlong_group_id, stream_id, msg);
+ int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg);
if (check_ret != SdkCode::kSuccess) {
return check_ret;
}
@@ -91,11 +91,11 @@ int32_t ApiImp::SendBase(const std::string&
inlong_group_id, const std::string&
auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id);
if (recv_group == nullptr) {
- LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id <<
",getStreamId:" << stream_id);
+ LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id <<
",getStreamId:" << inlong_stream_id);
return SdkCode::kFailGetRevGroup;
}
- return recv_group->SendData(msg, inlong_group_id, stream_id, report_time,
call_back);
+ return recv_group->SendData(msg, inlong_group_id, inlong_stream_id,
report_time, call_back);
}
int32_t ApiImp::CloseApi(int32_t max_waitms) {
@@ -125,19 +125,19 @@ int32_t ApiImp::DoInit() {
return InitManager();
}
-int32_t ApiImp::CheckData(const std::string& group_id, const std::string&
stream_id, const std::string& msg) {
+int32_t ApiImp::CheckData(const std::string& inlong_group_id, const
std::string& inlong_stream_id, const std::string& msg) {
if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
LOG_ERROR("capi has been closed, Init first and then send");
return SdkCode::kSendAfterClose;
}
- if (msg.empty() || group_id.empty() || stream_id.empty()) {
- LOG_ERROR("invalid input, group id:" << group_id << " stream id:" <<
stream_id << "msg" << msg);
+ if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) {
+ LOG_ERROR("invalid input, group id:" << inlong_group_id << " stream id:"
<< inlong_stream_id << "msg" << msg);
return SdkCode::kInvalidInput;
}
if (msg.size() > SdkConfig::getInstance()->max_msg_size_) {
- MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
+ MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id,
inlong_stream_id, 1);
LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << "
ext_pack_size"
<<
SdkConfig::getInstance()->max_msg_size_);
return SdkCode::kMsgTooLong;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
index 32ca093416..cd6a9757a4 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
@@ -34,10 +34,10 @@ class ApiImp {
~ApiImp();
int32_t InitApi(const char *config_file_path);
- int32_t Send(const char *group_id, const char *stream_id, const char *msg,
int32_t msg_len,
+ int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
UserCallBack call_back = nullptr);
- int32_t Send(const char *group_id, const char *stream_id, const char *msg,
int32_t msg_len,
+ int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
int64_t report_time, UserCallBack call_back = nullptr);
int32_t CloseApi(int32_t max_waitms);
@@ -46,12 +46,12 @@ class ApiImp {
private:
int32_t DoInit();
int32_t InitManager();
- int32_t SendBase(const std::string& inlong_group_id, const std::string&
stream_id, const std::string& msg, UserCallBack call_back,
+ int32_t SendBase(const std::string& inlong_group_id, const std::string&
inlong_stream_id, const std::string& msg, UserCallBack call_back,
int64_t report_time = 0);
- int32_t CheckData(const std::string& group_id, const std::string& stream_id,
const std::string& msg);
+ int32_t CheckData(const std::string& inlong_group_id, const std::string&
inlong_stream_id, const std::string& msg);
- int32_t ValidateParams(const char *group_id, const char *stream_id, const
char *msg, int32_t msg_len);
+ int32_t ValidateParams(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len);
AtomicInt user_exit_flag_{0};
volatile bool init_flag_ = false;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
index 14a5f91207..aa0674414d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
@@ -15,28 +15,25 @@
* limitations under the License.
*/
-#include "inlong_api.h"
+#include "../../include/inlong_api.h"
#include "../core/api_imp.h"
namespace inlong {
-InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); };
+InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); }
InLongApi::~InLongApi() { api_impl_->CloseApi(10); }
-int32_t InLongApi::InitApi(const char *config_path) {
- return api_impl_->InitApi(config_path);
-}
+int32_t InLongApi::InitApi(const char *config_path) { return
api_impl_->InitApi(config_path); }
-int32_t InLongApi::Send(const char *inlong_group_id,
- const char *inlong_stream_id, const char *msg,
- int32_t msg_len, UserCallBack call_back) {
- return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len,
- call_back);
+int32_t InLongApi::Send(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len,
+ UserCallBack call_back) {
+ return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len,
call_back);
}
-int32_t InLongApi::CloseApi(int32_t max_waitms) {
- return api_impl_->CloseApi(max_waitms);
-}
-int32_t InLongApi::AddBid(const std::vector<std::string> &groupids) {
- return api_impl_->AddInLongGroupId(groupids);
+int32_t InLongApi::Send(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len,
+ int64_t data_time, UserCallBack call_back) {
+ return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len,
data_time, call_back);
}
-} // namespace inlong
\ No newline at end of file
+
+int32_t InLongApi::CloseApi(int32_t max_waitms) { return
api_impl_->CloseApi(max_waitms); }
+int32_t InLongApi::AddInLongGroupId(const std::vector<std::string> &bids) {
return api_impl_->AddInLongGroupId(bids); }
+} // namespace inlong
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
index 66a1a7ce39..88f0a528ff 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
@@ -1,20 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#ifndef INLONG_SDK_API_H
@@ -34,21 +32,23 @@ typedef int (*UserCallBack)(const char *, const char *,
const char *, int32_t,
class ApiImp;
class InLongApi {
-public:
+ public:
InLongApi();
~InLongApi();
int32_t InitApi(const char *config_path);
- int32_t AddBid(const std::vector<std::string> &groupids);
+ int32_t AddGroupId(const std::vector<std::string> &group_ids);
- int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
- const char *msg, int32_t msg_len,
+ int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
UserCallBack call_back = nullptr);
+ int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
+ int64_t data_time, UserCallBack call_back = nullptr);
+
int32_t CloseApi(int32_t max_waitms);
-private:
+ private:
std::shared_ptr<ApiImp> api_impl_;
};
-} // namespace inlong
-#endif // INLONG_SDK_API_H
+} // namespace inlong
+#endif // INLONG_SDK_API_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
index d9e8366486..69bcdddde8 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-#ifndef SDK_USER_MSG_H_
-#define SDK_USER_MSG_H_
+#ifndef SDK_USER_MSG_H
+#define SDK_USER_MSG_H
#include <functional>
#include <memory>
@@ -40,27 +40,44 @@ struct SdkMsg {
std::string inlong_group_id_;
std::string inlong_stream_id_;
- SdkMsg(const std::string &mmsg, const std::string &mclient_ip,
- int64_t mreport_time, UserCallBack mcb, const std::string &attr,
- const std::string &u_ip, int64_t u_time,const std::string&
inlong_group_id,const std::string& inlong_stream_id)
+ SdkMsg(const std::string &mmsg,
+ const std::string &mclient_ip,
+ int64_t mreport_time,
+ UserCallBack mcb,
+ const std::string &attr,
+ const std::string &u_ip,
+ int64_t u_time,
+ const std::string &inlong_group_id,
+ const std::string &inlong_stream_id)
: msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time),
cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip),
data_pack_format_attr_(attr),
inlong_group_id_(inlong_group_id),
- inlong_stream_id_(inlong_stream_id){}
- SdkMsg(){};
+ inlong_stream_id_(inlong_stream_id) {}
+ SdkMsg() {};
void setMsg(const std::string &msg) { msg_ = msg; }
void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; }
void setReportTime(uint64_t reportTime) { report_time_ = reportTime; }
- void setCb(UserCallBack cb) { cb_ = cb;}
+ void setCb(UserCallBack cb) { cb_ = cb; }
void setUserReportTime(uint64_t userReportTime) { user_report_time_ =
userReportTime; }
void setUserClientIp(const std::string &userClientIp) { user_client_ip_ =
userClientIp; }
void setDataPackFormatAttr(const std::string &dataPackFormatAttr) {
data_pack_format_attr_ = dataPackFormatAttr; }
void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ =
inlong_group_id; }
void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ =
inlong_stream_id; }
+
+ void clear() {
+ msg_ = "";
+ client_ip_ = "";
+ report_time_ = 0;
+ cb_ = nullptr;
+ user_report_time_ = 0;
+ user_client_ip_ = "";
+ data_pack_format_attr_ = "";
+ inlong_group_id_ = "";
+ inlong_stream_id_ = "";
+ }
};
using SdkMsgPtr = std::shared_ptr<SdkMsg>;
} // namespace inlong
-
-#endif // SDK_USER_MSG_H_
\ No newline at end of file
+#endif // SDK_USER_MSG_H
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 377432a2ef..1ffeeef8dc 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -352,15 +352,15 @@ std::shared_ptr<SendBuffer>
RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs
uniq_id_ = 0;
}
- if (!PackMsg(msgs, send_buffer->content(), len) || len == 0) {
+ if (!PackMsg(msgs, send_buffer->GetData(), len) || len == 0) {
LOG_ERROR("failed to write data to send buf from pack queue, sendQueue");
return nullptr;
}
- send_buffer->setLen(len);
- send_buffer->setMsgCnt(msg_cnt);
- send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
- send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
+ send_buffer->SetDataLen(len);
+ send_buffer->SetMsgCnt(msg_cnt);
+ send_buffer->SetInlongGroupId(msgs[0]->inlong_group_id_);
+ send_buffer->SetInlongStreamId(msgs[0]->inlong_stream_id_);
for (const auto &it : msgs) {
if(it->cb_){
send_buffer->addUserMsg(it);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
index f317163799..0bbedcd0fb 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -1,45 +1,45 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#include "send_group.h"
-#include "api_code.h"
-#include "proxy_manager.h"
+#include <sys/prctl.h>
#include <algorithm>
#include <random>
+#include "../core/api_code.h"
+#include "../manager/proxy_manager.h"
namespace inlong {
-const int kDefaultQueueSize = 20;
+const uint32_t kDefaultQueueSize = 200;
SendGroup::SendGroup(std::string send_group_key)
: work_(asio::make_work_guard(io_context_)),
- send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0),
- load_threshold_(0), max_proxy_num_(0) {
- max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ /
- SdkConfig::getInstance()->pack_size_;
- if (max_send_queue_num_ <= 0) {
+ send_group_key_(send_group_key),
+ send_idx_(0),
+ dispatch_stat_(0),
+ load_threshold_(0),
+ max_proxy_num_(0) {
+ max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ /
SdkConfig::getInstance()->pack_size_;
+ if (max_send_queue_num_ <= kDefaultQueueSize) {
max_send_queue_num_ = kDefaultQueueSize;
}
- LOG_INFO("SendGroup:" << send_group_key_
- << ",max send queue num:" << max_send_queue_num_);
+ max_send_queue_num_ = std::max(max_send_queue_num_, kDefaultQueueSize);
+ LOG_INFO("SendGroup:" << send_group_key_ << ",max send queue num:" <<
max_send_queue_num_);
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
- heart_beat_interval_ =
- SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_;
+ heart_beat_interval_ = SdkConfig::getInstance()->heart_beat_interval_ /
dispatch_interval_;
need_balance_ = SdkConfig::getInstance()->enable_balance_;
work_clients_old_ = nullptr;
@@ -48,23 +48,19 @@ SendGroup::SendGroup(std::string send_group_key)
send_timer_ = std::make_shared<asio::steady_timer>(io_context_);
send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
- send_timer_->async_wait(
- std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1));
+ send_timer_->async_wait(std::bind(&SendGroup::PreDispatchData, this,
std::placeholders::_1));
update_conf_timer_ = std::make_shared<asio::steady_timer>(io_context_);
update_conf_timer_->expires_after(std::chrono::milliseconds(1));
- update_conf_timer_->async_wait(
- std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+ update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this,
std::placeholders::_1));
if (SdkConfig::getInstance()->enable_balance_) {
load_balance_timer_ = std::make_shared<asio::steady_timer>(io_context_);
- load_balance_timer_->expires_after(
- std::chrono::milliseconds(load_balance_interval_));
- load_balance_timer_->async_wait(
- std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+
load_balance_timer_->expires_after(std::chrono::milliseconds(load_balance_interval_));
+ load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this,
std::placeholders::_1));
}
- current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
+ current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
thread_ = std::thread(&SendGroup::Run, this);
}
SendGroup::~SendGroup() {
@@ -84,14 +80,16 @@ SendGroup::~SendGroup() {
thread_.join();
}
}
-void SendGroup::Run() { io_context_.run(); }
+void SendGroup::Run() {
+ prctl(PR_SET_NAME, "send-group");
+ io_context_.run();
+}
void SendGroup::PreDispatchData(std::error_code error) {
if (error) {
return;
}
send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
- send_timer_->async_wait(
- std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+ send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this,
std::placeholders::_1));
}
void SendGroup::DispatchData(std::error_code error) {
@@ -125,8 +123,7 @@ void SendGroup::DispatchData(std::error_code error) {
}
send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
- send_timer_->async_wait(
- std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+ send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this,
std::placeholders::_1));
}
void SendGroup::HeartBeat() {
unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
@@ -148,12 +145,12 @@ void SendGroup::HeartBeat() {
bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; }
-uint32_t SendGroup::PushData(SendBufferPtrT send_buffer_ptr) {
+uint32_t SendGroup::PushData(const SendBufferPtrT &send_buffer_ptr) {
if (IsFull()) {
return SdkCode::kSendBufferFull;
}
std::lock_guard<std::mutex> lock(mutex_);
- send_buf_list_.push(send_buffer_ptr);
+ send_proxy_list_.push(send_buffer_ptr);
return SdkCode::kSuccess;
}
@@ -165,60 +162,53 @@ void SendGroup::UpdateConf(std::error_code error) {
ClearOldTcpClients();
- ProxyInfoVec new_bus_info;
- if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) !=
- kSuccess ||
- new_bus_info.empty()) {
+ ProxyInfoVec new_proxy_info;
+ if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_proxy_info)
!= kSuccess || new_proxy_info.empty()) {
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
- update_conf_timer_->async_wait(
- std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+ update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this,
std::placeholders::_1));
return;
}
- if (new_bus_info.empty()) {
- LOG_INFO("UpdateConf new_bus_info is empty!");
+ if (new_proxy_info.empty()) {
+ LOG_INFO("New proxy is empty when update config!");
return;
}
- load_threshold_ = new_bus_info[0].GetLoad() >
constants::kDefaultLoadThreshold
- ? constants::kDefaultLoadThreshold
- : std::max((new_bus_info[0].GetLoad()), 0);
+ load_threshold_ = new_proxy_info[0].GetLoad() >
constants::kDefaultLoadThreshold
+ ? constants::kDefaultLoadThreshold
+ : std::max((new_proxy_info[0].GetLoad()), 0);
- if (!IsConfChanged(current_bus_vec_, new_bus_info)) {
- LOG_INFO("Don`t need UpdateConf. current bus size("
- << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size()
- << ")");
+ if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) {
+ LOG_INFO("Don`t need UpdateConf. current proxy size(" <<
current_proxy_vec_.size() << ")=proxy size("
+ <<
new_proxy_info.size() << ")");
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
- update_conf_timer_->async_wait(
- std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+ update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this,
std::placeholders::_1));
return;
}
- max_proxy_num_ =
- std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size());
+ max_proxy_num_ = std::min(SdkConfig::getInstance()->max_proxy_num_,
new_proxy_info.size());
- std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp =
- std::make_shared<std::vector<TcpClientTPtrT>>();
+ std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp =
std::make_shared<std::vector<TcpClientTPtrT>>();
if (tcp_clients_tmp == nullptr) {
- LOG_INFO("tcp_clients_tmp is nullptr");
+ LOG_INFO("Tcp clients tmp is nullptr");
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
- update_conf_timer_->async_wait(
- std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+ update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this,
std::placeholders::_1));
return;
}
- std::random_shuffle(new_bus_info.begin(), new_bus_info.end());
+ std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end());
tcp_clients_tmp->reserve(max_proxy_num_);
for (int i = 0; i < max_proxy_num_; i++) {
- ProxyInfo bus_tmp = new_bus_info[i];
- TcpClientTPtrT tcpClientTPtrT =
- std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
- tcp_clients_tmp->push_back(tcpClientTPtrT);
- LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]");
+ ProxyInfo tmp_proxy = new_proxy_info[i];
+ for (int repeat_time = 0; repeat_time <
SdkConfig::getInstance()->proxy_repeat_times_; repeat_time++) {
+ TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(io_context_,
tmp_proxy.ip(), tmp_proxy.port());
+ tcp_clients_tmp->push_back(tcpClientTPtrT);
+ LOG_INFO("New proxy info.[" << tmp_proxy.ip() << ":" << tmp_proxy.port()
<< "]");
+ }
}
{
- LOG_INFO("do change tcp clients.");
+ LOG_INFO("Do change tcp clients.");
unique_write_lock<read_write_mutex> wtlck(work_clients_mutex_);
work_clients_old_ = work_clients_;
work_clients_ = tcp_clients_tmp;
@@ -230,51 +220,43 @@ void SendGroup::UpdateConf(std::error_code error) {
}
}
- current_bus_vec_ = new_bus_info;
+ current_proxy_vec_ = new_proxy_info;
InitReserveClient();
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
- update_conf_timer_->async_wait(
- std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+ update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this,
std::placeholders::_1));
- LOG_INFO("Finished UpdateConf.");
+ LOG_INFO("Finished update send group config.");
}
SendBufferPtrT SendGroup::PopData() {
std::lock_guard<std::mutex> lock(mutex_);
- if (send_buf_list_.empty()) {
+ if (send_proxy_list_.empty()) {
return nullptr;
}
- SendBufferPtrT send_buf = send_buf_list_.front();
- send_buf_list_.pop();
+ SendBufferPtrT send_buf = send_proxy_list_.front();
+ send_proxy_list_.pop();
return send_buf;
}
uint32_t SendGroup::GetQueueSize() {
std::lock_guard<std::mutex> lock(mutex_);
- return send_buf_list_.size();
+ return send_proxy_list_.size();
}
-bool SendGroup::IsConfChanged(ProxyInfoVec &current_bus_vec,
- ProxyInfoVec &new_bus_vec) {
- if (new_bus_vec.empty())
- return false;
- if (current_bus_vec.size() != new_bus_vec.size()) {
+bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec, ProxyInfoVec
&new_proxy_vec) {
+ if (new_proxy_vec.empty()) return false;
+ if (current_proxy_vec.size() != new_proxy_vec.size()) {
return true;
}
- for (auto &current_bu : current_bus_vec) {
- for (int i = 0; i < new_bus_vec.size(); i++) {
- if ((current_bu.ip() == new_bus_vec[i].ip()) &&
- (current_bu.port() == new_bus_vec[i].port()))
- break;
- if (i == (new_bus_vec.size() - 1)) {
- if ((current_bu.ip() != new_bus_vec[i].ip() ||
- current_bu.port() == new_bus_vec[i].port())) {
- LOG_INFO("current bus ip." << current_bu.ip() << ":"
- << current_bu.port()
- << " can`t find in bus.");
+ for (auto &current_bu : current_proxy_vec) {
+ for (int i = 0; i < new_proxy_vec.size(); i++) {
+ if ((current_bu.ip() == new_proxy_vec[i].ip()) && (current_bu.port() ==
new_proxy_vec[i].port())) break;
+ if (i == (new_proxy_vec.size() - 1)) {
+ if ((current_bu.ip() != new_proxy_vec[i].ip() || current_bu.port() ==
new_proxy_vec[i].port())) {
+ LOG_INFO("current proxy ip." << current_bu.ip() << ":" <<
current_bu.port() << " can`t find in proxy.");
return true;
}
}
@@ -312,8 +294,7 @@ void SendGroup::LoadBalance(std::error_code error) {
uint64_t interval = load_balance_interval_ + rand() % 120 * 1000;
LOG_INFO("LoadBalance interval:" << interval);
load_balance_timer_->expires_after(std::chrono::milliseconds(interval));
- load_balance_timer_->async_wait(
- std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+ load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this,
std::placeholders::_1));
}
void SendGroup::DoLoadBalance() {
@@ -328,17 +309,13 @@ void SendGroup::DoLoadBalance() {
return;
}
- if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) >
- load_threshold_) {
- LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace"
- << work_client->getClientInfo() << ",load[work "
- << work_client->GetAvgLoad() << "][reserve "
- << reserve_client->GetAvgLoad() << "][threshold "
- << load_threshold_ << "]");
+ if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) >
load_threshold_) {
+ LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace"
<< work_client->getClientInfo()
+ << ",load[work " << work_client->GetAvgLoad() <<
"][reserve "
+ << reserve_client->GetAvgLoad() << "][threshold " << load_threshold_ << "]");
std::string ip = work_client->getIp();
uint32_t port = work_client->getPort();
- work_client->UpdateClient(reserve_client->getIp(),
- reserve_client->getPort());
+ work_client->UpdateClient(reserve_client->getIp(),
reserve_client->getPort());
ProxyInfo proxy = GetRandomProxy(ip, port);
if (!proxy.ip().empty()) {
@@ -350,12 +327,10 @@ void SendGroup::DoLoadBalance() {
}
bool SendGroup::NeedDoLoadBalance() {
unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
- if (load_threshold_ <= 0 ||
- work_clients_->size() == current_bus_vec_.size()) {
- LOG_INFO("Don`t need DoLoadBalance [load_threshold]:"
- << load_threshold_
- << ",[tcp_client size]:" << work_clients_->size()
- << ",[current_bus_vec size]:" << current_bus_vec_.size());
+ if (load_threshold_ <= 0 || work_clients_->size() ==
current_proxy_vec_.size()) {
+ LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" << load_threshold_
+ << ",[tcp_client size]:" << work_clients_->size()
+ <<
",[current_proxy_vec size]:" << current_proxy_vec_.size());
need_balance_ = false;
return false;
}
@@ -363,12 +338,11 @@ bool SendGroup::NeedDoLoadBalance() {
return true;
}
void SendGroup::InitReserveClient() {
- if (max_proxy_num_ >= current_bus_vec_.size()) {
+ if (max_proxy_num_ >= current_proxy_vec_.size()) {
return;
}
- uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_;
- uint64_t reserve_num =
- std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num);
+ uint64_t max_reserve_num = current_proxy_vec_.size() - max_proxy_num_;
+ uint64_t reserve_num =
std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num);
if (reserve_num <= 0) {
return;
}
@@ -376,15 +350,12 @@ void SendGroup::InitReserveClient() {
unique_write_lock<read_write_mutex> wtlck(reserve_clients_mutex_);
reserve_clients_.clear();
- for (uint64_t i = current_bus_vec_.size() - reserve_num;
- i < current_bus_vec_.size(); i++) {
- ProxyInfo bus_tmp = current_bus_vec_[i];
- TcpClientTPtrT tcpClientTPtrT =
- std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
+ for (uint64_t i = current_proxy_vec_.size() - reserve_num; i <
current_proxy_vec_.size(); i++) {
+ ProxyInfo tmp_proxy = current_proxy_vec_[i];
+ TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(io_context_,
tmp_proxy.ip(), tmp_proxy.port());
reserve_clients_.push_back(tcpClientTPtrT);
}
- LOG_INFO(
- "InitReserveClient reserve_clients size:" << reserve_clients_.size());
+ LOG_INFO("InitReserveClient reserve_clients size:" <<
reserve_clients_.size());
}
bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end)
{
if (begin && end) {
@@ -409,14 +380,13 @@ TcpClientTPtrT SendGroup::GetMaxLoadClient() {
ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) {
ProxyInfo proxy_info;
- for (auto &it : current_bus_vec_) {
+ for (auto &it : current_proxy_vec_) {
if (it.ip() == ip && it.port() == port) {
continue;
}
bool exist = false;
for (int index = 0; index < reserve_clients_.size(); index++) {
- if (it.ip() == reserve_clients_[index]->getIp() &&
- it.port() == reserve_clients_[index]->getPort()) {
+ if (it.ip() == reserve_clients_[index]->getIp() && it.port() ==
reserve_clients_[index]->getPort()) {
exist = true;
break;
}
@@ -444,8 +414,7 @@ TcpClientTPtrT SendGroup::GetReserveClient() {
}
bool exist = false;
for (int index = 0; index < work_clients_->size(); index++) {
- if (it->getIp() == (*work_clients_)[index]->getIp() &&
- it->getPort() == (*work_clients_)[index]->getPort()) {
+ if (it->getIp() == (*work_clients_)[index]->getIp() && it->getPort() ==
(*work_clients_)[index]->getPort()) {
exist = true;
break;
}
@@ -461,11 +430,10 @@ TcpClientTPtrT SendGroup::GetReserveClient() {
bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) {
unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
for (int index = 0; index < work_clients_->size(); index++) {
- if (ip == (*work_clients_)[index]->getIp() &&
- port == (*work_clients_)[index]->getPort()) {
+ if (ip == (*work_clients_)[index]->getIp() && port ==
(*work_clients_)[index]->getPort()) {
return true;
}
}
return false;
}
-} // namespace inlong
+} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
index a52d63dcd3..c54f858b90 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
@@ -1,53 +1,53 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#ifndef INLONG_SDK_SEND_GROUP_H
#define INLONG_SDK_SEND_GROUP_H
-#include "../client/tcp_client.h"
+#include <queue>
+#include <unordered_map>
+
#include "../config/proxy_info.h"
#include "../utils/send_buffer.h"
-#include <queue>
+#include "../client/tcp_client.h"
+
namespace inlong {
const int kTimerMiSeconds = 10;
const int kTimerMinute = 60000;
using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
using IOContext = asio::io_context;
-using io_context_work =
- asio::executor_work_guard<asio::io_context::executor_type>;
+using io_context_work =
asio::executor_work_guard<asio::io_context::executor_type>;
class SendGroup : noncopyable {
-private:
+ private:
IOContext io_context_;
- io_context_work work_;
+ io_context_work work_; // 保持io_context.run()在无任何任务时不退出
std::thread thread_;
void Run();
uint64_t max_proxy_num_;
-public:
+ public:
std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_;
std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_old_;
std::vector<TcpClientTPtrT> reserve_clients_;
- ProxyInfoVec current_bus_vec_;
- std::queue<SendBufferPtrT> send_buf_list_;
+ ProxyInfoVec current_proxy_vec_;
+ std::queue<SendBufferPtrT> send_proxy_list_;
SteadyTimerPtr send_timer_;
SteadyTimerPtr update_conf_timer_;
@@ -75,11 +75,11 @@ public:
void PreDispatchData(std::error_code error);
void DispatchData(std::error_code error);
bool IsFull();
- uint32_t PushData(SendBufferPtrT send_buffer_ptr);
+ uint32_t PushData(const SendBufferPtrT &send_buffer_ptr);
SendBufferPtrT PopData();
uint32_t GetQueueSize();
void UpdateConf(std::error_code error);
- bool IsConfChanged(ProxyInfoVec &current_bus_vec, ProxyInfoVec &new_bus_vec);
+ bool IsConfChanged(ProxyInfoVec &current_proxy_vec, ProxyInfoVec
&new_proxy_vec);
bool IsAvailable();
void ClearOldTcpClients();
@@ -96,7 +96,6 @@ public:
bool ExistInWorkClient(const std::string &ip, uint32_t port);
};
using SendGroupPtr = std::shared_ptr<SendGroup>;
+} // namespace inlong
-} // namespace inlong
-
-#endif // INLONG_SDK_SEND_GROUP_H
+#endif // INLONG_SDK_SEND_GROUP_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
index 28bfed473c..8754d60ab2 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
@@ -29,9 +29,7 @@ char APIEncode::recvBuf[APIEncode::kRecvLen] = {0};
void APIEncode::decodeProtocoMsg(SendBuffer *buf) {
memset(recvBuf, 0x0, kRecvLen);
- // //LOG_DEBUG("print buf content, %s", buf->content());
- memcpy(recvBuf, buf->content(), buf->len());
- // memcpy(recvBuf, buf->content(), buf->len());
+ memcpy(recvBuf, buf->GetData(), buf->GetDataLen());
char *p = recvBuf;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 47282e0e32..a1753b8255 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -83,6 +83,7 @@ static const uint32_t kTcpDetectionInterval = 60000;
static const uint32_t kMaxRetryIntervalMs= 3000;
static const uint32_t kRetryIntervalMs= 200;
static const int32_t kRetryTimes = 1;
+static const uint32_t kProxyRepeatTimes = 1;
static const char kSerIP[] = "127.0.0.1";
static const uint32_t kSerPort = 46801;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
index 5b7e5a596a..4b2ca778ba 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -1,123 +1,120 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-#ifndef INLONG_SDK_SEND_BUFFER_H
-#define INLONG_SDK_SEND_BUFFER_H
+#ifndef INLONG_SEND_BUFFER_H
+#define INLONG_SEND_BUFFER_H
#include <mutex>
#include <string>
+#include <deque>
+#include <queue>
+#include "asio.hpp"
#include "atomic.h"
#include "logger.h"
#include "noncopyable.h"
-#include "sdk_msg.h"
-#include <asio.hpp>
-#include <deque>
-#include <queue>
+
+#include "../core/sdk_msg.h"
+#include "../manager/msg_manager.h"
namespace inlong {
-class Connection;
-using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
-using ConnectionPtr = std::shared_ptr<Connection>;
class SendBuffer : noncopyable {
-private:
- uint32_t uniq_id_;
- std::atomic<bool> is_used_;
- std::atomic<bool> is_packed_;
- char *content_;
- uint32_t size_;
- int32_t msg_cnt_;
- uint32_t len_;
+ private:
+ char *data_;
+ uint32_t data_len_;
+ uint32_t msg_cnt_;
std::string inlong_group_id_;
std::string inlong_stream_id_;
- AtomicInt already_send_;
- uint64_t first_send_time_;
- uint64_t latest_send_time_;
std::vector<SdkMsgPtr> user_msg_vector_;
-public:
- SendBuffer(uint32_t size)
- : uniq_id_(0), is_used_(false), is_packed_(false), size_(size),
- msg_cnt_(0), len_(0), inlong_group_id_(), inlong_stream_id_(),
- first_send_time_(0), latest_send_time_(0) {
- content_ = new char[size];
- if (content_) {
- memset(content_, 0x0, size);
+ public:
+ SendBuffer(uint32_t size) : msg_cnt_(0), data_len_(0), inlong_group_id_(),
inlong_stream_id_() {
+ data_ = new char[size];
+ if (data_) {
+ memset(data_, 0x0, size);
}
}
~SendBuffer() {
- if (content_) {
- delete[] content_;
+ if (data_) {
+ delete[] data_;
}
}
-
- char *content() { return content_; }
- int32_t msgCnt() const { return msg_cnt_; }
- void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; }
- uint32_t len() { return len_; }
- void setLen(const uint32_t len) { len_ = len; }
- std::string getGroupId() { return inlong_group_id_; }
- std::string getStreamId() { return inlong_stream_id_; }
- void setInlongGroupId(const std::string &inlong_group_id) {
+ char *GetData() const {
+ return data_;
+ }
+ void SetData(char *data) {
+ data_ = data;
+ }
+ uint32_t GetDataLen() const {
+ return data_len_;
+ }
+ void SetDataLen(uint32_t data_len) {
+ data_len_ = data_len;
+ }
+ uint32_t GetMsgCnt() const {
+ return msg_cnt_;
+ }
+ void SetMsgCnt(uint32_t msg_cnt) {
+ msg_cnt_ = msg_cnt;
+ }
+ const std::string &GetInlongGroupId() const {
+ return inlong_group_id_;
+ }
+ void SetInlongGroupId(const std::string &inlong_group_id) {
inlong_group_id_ = inlong_group_id;
}
- void setStreamId(const std::string &inlong_stream_id) {
+ const std::string &GetInlongStreamId() const {
+ return inlong_stream_id_;
+ }
+ void SetInlongStreamId(const std::string &inlong_stream_id) {
inlong_stream_id_ = inlong_stream_id;
}
- void setUniqId(const uint32_t &uniq_id) { uniq_id_ = uniq_id; }
+ void addUserMsg(const SdkMsgPtr &msg) { user_msg_vector_.push_back(msg); }
- void addUserMsg(SdkMsgPtr msg) { user_msg_vector_.push_back(msg); }
void doUserCallBack() {
for (auto it : user_msg_vector_) {
if (it->cb_) {
- it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
- it->msg_.data(), it->msg_.size(), it->user_report_time_,
- it->user_client_ip_.data());
+ it->cb_(it->inlong_group_id_.data(),
+ it->inlong_stream_id_.data(),
+ it->msg_.data(),
+ it->msg_.size(),
+ it->report_time_,
+ it->client_ip_.data());
}
}
}
void releaseBuf() {
- if (!is_used_) {
- return;
- }
- uniq_id_ = 0;
- is_used_ = false;
- is_packed_ = false;
- memset(content_, 0x0, size_);
msg_cnt_ = 0;
- len_ = 0;
+ data_len_ = 0;
inlong_group_id_ = "";
inlong_stream_id_ = "";
- already_send_.getAndSet(0);
- first_send_time_ = 0;
- latest_send_time_ = 0;
+ for (const auto &it : user_msg_vector_) {
+ if (it->cb_) {
+ it->clear();
+ }
+ }
+ MsgManager::GetInstance()->AddMsg(user_msg_vector_);
user_msg_vector_.clear();
- AtomicInt fail_create_conn_;
- fail_create_conn_.getAndSet(0);
+ user_msg_vector_.shrink_to_fit();
}
-
- void setIsPacked(bool is_packed) { is_packed_ = is_packed; }
};
typedef std::shared_ptr<SendBuffer> SendBufferPtrT;
-typedef std::queue<SendBufferPtrT> SendBufferPtrDeque;
-} // namespace inlong
+} // namespace inlong
-#endif // INLONG_SDK_SEND_BUFFER_H
\ No newline at end of file
+#endif // INLONG_SEND_BUFFER_H
\ No newline at end of file