This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fb395fe0e0 [INLONG-10821][SDK] Optimize the ability to receive data
for DataProxy C++ SDK (#10835)
fb395fe0e0 is described below
commit fb395fe0e0a44c1ade30f084c138aeaf7232e2bf
Author: doleyzi <[email protected]>
AuthorDate: Wed Aug 21 14:01:34 2024 +0800
[INLONG-10821][SDK] Optimize the ability to receive data for DataProxy C++
SDK (#10835)
---
.../dataproxy-sdk-cpp/src/client/tcp_client.cc | 246 +++++++-------
.../dataproxy-sdk-cpp/src/client/tcp_client.h | 71 ++--
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 7 +
.../dataproxy-sdk-cpp/src/config/sdk_conf.h | 1 +
.../dataproxy-sdk-cpp/src/core/api_code.h | 3 +-
.../dataproxy-sdk-cpp/src/core/api_imp.cc | 104 +++---
.../dataproxy-sdk-cpp/src/core/api_imp.h | 38 +--
.../dataproxy-sdk-cpp/src/core/sdk_msg.h | 11 +-
.../dataproxy-sdk-cpp/src/group/recv_group.cc | 365 +++++++++++----------
.../dataproxy-sdk-cpp/src/group/recv_group.h | 77 +++--
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 6 +-
.../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 2 +-
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 5 +
.../dataproxy-sdk-cpp/src/utils/send_buffer.h | 4 +-
14 files changed, 511 insertions(+), 429 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index e27910c4e9..eb3264ba6c 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -1,47 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#include "tcp_client.h"
-#include "../utils/utils.h"
-#include "api_code.h"
+
#include <utility>
+#include "../manager/buffer_manager.h"
+#include "../manager/metric_manager.h"
+#include "../utils/utils.h"
+
namespace inlong {
#define CLIENT_INFO client_info_ << "[" << status_ << "]"
TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port)
: socket_(std::make_shared<asio::ip::tcp::socket>(io_context)),
wait_timer_(std::make_shared<asio::steady_timer>(io_context)),
keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)),
- ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip),
port),
- status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false),
- proxy_loads_(30), wait_heart_beat_(false), reset_client_(false),
- heart_beat_index_(0), only_heart_heat_(false) {
+ ip_(ip),
+ port_(port),
+ endpoint_(asio::ip::address::from_string(ip), port),
+ status_(kUndefined),
+ recv_buf_(new BlockMemory()),
+ exit_(false),
+ proxy_loads_(30),
+ wait_heart_beat_(false),
+ reset_client_(false),
+ heart_beat_index_(0),
+ only_heart_heat_(false),
+ need_retry_(false),
+ retry_times_(0) {
client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";
tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_;
tcp_idle_time_ = SdkConfig::getInstance()->tcp_idle_time_;
last_update_time_ = Utils::getCurrentMsTime();
- keep_alive_timer_->expires_after(
- std::chrono::milliseconds(tcp_detection_interval_));
- keep_alive_timer_->async_wait(
- std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
+
keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_));
+ keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this,
std::placeholders::_1));
LOG_INFO("TcpClient At remote info .status:" << status_ << client_info_);
AsyncConnect();
@@ -70,7 +78,7 @@ TcpClient::~TcpClient() {
void TcpClient::DoClose() {
status_ = kStopped;
exit_ = true;
- LOG_INFO("closed client." << CLIENT_INFO);
+ LOG_INFO("Closed client." << CLIENT_INFO);
}
void TcpClient::AsyncConnect() {
@@ -87,13 +95,12 @@ void TcpClient::AsyncConnect() {
}
}
status_ = kConnecting;
- LOG_INFO("began to connect." << CLIENT_INFO);
+ LOG_INFO("Began to connect." << CLIENT_INFO);
} catch (std::exception &e) {
LOG_ERROR("AsyncConnect exception." << e.what() << CLIENT_INFO);
}
- socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this,
- std::placeholders::_1));
+ socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this,
std::placeholders::_1));
}
void TcpClient::DoAsyncConnect(asio::error_code error) {
@@ -116,7 +123,12 @@ void TcpClient::OnConnected(asio::error_code error) {
socket_->set_option(asio::ip::tcp::no_delay(true));
asio::socket_base::keep_alive option(true);
socket_->set_option(option);
- LOG_INFO("client has connected." << CLIENT_INFO);
+ LOG_INFO("Client has connected." << CLIENT_INFO);
+ if (need_retry_) {
+ LOG_WARN("Client has connected retry! times:" << retry_times_ <<
CLIENT_INFO);
+ write(sendBuffer_, true);
+ return;
+ }
status_ = kFree;
return;
}
@@ -124,22 +136,24 @@ void TcpClient::OnConnected(asio::error_code error) {
return;
}
status_ = kConnectFailed;
- LOG_ERROR("connect has error:" << error.message() << CLIENT_INFO);
+ LOG_ERROR("Connect has error:" << error.message() << CLIENT_INFO);
+ if (need_retry_) {
+ ResetSendBuffer();
+ }
wait_timer_->expires_after(std::chrono::milliseconds(kConnectTimeout));
- wait_timer_->async_wait(
- std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1));
+ wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this,
std::placeholders::_1));
}
-void TcpClient::write(SendBufferPtrT sendBuffer) {
+void TcpClient::write(SendBufferPtrT sendBuffer, bool retry) {
if (kStopped == status_ || exit_) {
LOG_ERROR("Stop.At." << CLIENT_INFO);
return;
}
- if (status_ != kFree) {
+ if (status_ != kFree && !retry) {
LOG_WARN("Not free ." << CLIENT_INFO);
return;
}
- sendBuffer_ = sendBuffer;
+ sendBuffer_ = std::move(sendBuffer);
BeginWrite();
}
@@ -150,13 +164,10 @@ void TcpClient::BeginWrite() {
}
last_update_time_ = Utils::getCurrentMsTime();
status_ = kWriting;
- asio::async_write(*socket_,
- asio::buffer(sendBuffer_->content(), sendBuffer_->len()),
- std::bind(&TcpClient::OnWroten, this,
std::placeholders::_1,
- std::placeholders::_2));
+ asio::async_write(*socket_, asio::buffer(sendBuffer_->content(),
sendBuffer_->len()),
+ std::bind(&TcpClient::OnWroten, this,
std::placeholders::_1, std::placeholders::_2));
}
-void TcpClient::OnWroten(const asio::error_code error,
- std::size_t bytes_transferred) {
+void TcpClient::OnWroten(const asio::error_code error, std::size_t
bytes_transferred) {
if (kStopped == status_ || exit_) {
return;
}
@@ -164,14 +175,14 @@ void TcpClient::OnWroten(const asio::error_code error,
if (asio::error::operation_aborted == error) {
return;
}
- LOG_ERROR("write error:" << error.message() << CLIENT_INFO);
+ LOG_ERROR("Write error:" << error.message() << CLIENT_INFO);
status_ = kWriting;
HandleFail();
return;
}
if (0 == bytes_transferred) {
- LOG_ERROR("transferred 0 bytes." << CLIENT_INFO);
+ LOG_ERROR("Transferred 0 bytes." << CLIENT_INFO);
status_ = kWaiting;
HandleFail();
return;
@@ -179,8 +190,7 @@ void TcpClient::OnWroten(const asio::error_code error,
status_ = kClientResponse;
asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)),
- std::bind(&TcpClient::OnReturn, this, std::placeholders::_1,
- std::placeholders::_2));
+ std::bind(&TcpClient::OnReturn, this,
std::placeholders::_1, std::placeholders::_2));
}
void TcpClient::OnReturn(asio::error_code error, std::size_t len) {
if (kStopped == status_ || exit_) {
@@ -201,18 +211,15 @@ void TcpClient::OnReturn(asio::error_code error,
std::size_t len) {
HandleFail();
return;
}
- size_t resp_len =
- ntohl(*reinterpret_cast<const uint32_t *>(recv_buf_->m_data));
+ size_t resp_len = ntohl(*reinterpret_cast<const uint32_t
*>(recv_buf_->m_data));
if (resp_len > recv_buf_->m_max_size) {
status_ = kWaiting;
HandleFail();
return;
}
- asio::async_read(*socket_,
- asio::buffer(recv_buf_->m_data + sizeof(uint32_t),
resp_len),
- std::bind(&TcpClient::OnBody, this, std::placeholders::_1,
- std::placeholders::_2));
+ asio::async_read(*socket_, asio::buffer(recv_buf_->m_data +
sizeof(uint32_t), resp_len),
+ std::bind(&TcpClient::OnBody, this, std::placeholders::_1,
std::placeholders::_2));
}
void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) {
@@ -230,17 +237,19 @@ void TcpClient::OnBody(asio::error_code error, size_t
bytesTransferred) {
return;
}
uint32_t parse_index = sizeof(uint32_t);
- uint8_t msg_type =
- *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
+ uint8_t msg_type = *reinterpret_cast<const uint8_t *>(recv_buf_->m_data +
parse_index);
switch (msg_type) {
- case 8:
- ParseHeartBeat(bytesTransferred);
- break;
- default:
- ParseGenericResponse();
- break;
+ case 8:
+ ParseHeartBeat(bytesTransferred);
+ break;
+ default:
+ ParseGenericResponse();
+ break;
}
+
+ ResetRetry();
+
if (wait_heart_beat_) {
HeartBeat();
wait_heart_beat_ = false;
@@ -261,17 +270,12 @@ void TcpClient::HandleFail() {
}
status_ = kConnecting;
- if (sendBuffer_ != nullptr) {
- stat_.AddSendFailMsgNum(sendBuffer_->msgCnt());
- stat_.AddSendFailPackNum(1);
+ ResetSendBuffer();
- stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+ int retry_interval = std::min(retry_times_ * constants::kRetryIntervalMs,
constants::kMaxRetryIntervalMs);
- sendBuffer_->doUserCallBack();
- sendBuffer_->releaseBuf();
- }
-
- AsyncConnect();
+ wait_timer_->expires_after(std::chrono::milliseconds(retry_interval));
+ wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this,
std::placeholders::_1));
}
void TcpClient::DetectStatus(const asio::error_code error) {
@@ -282,23 +286,28 @@ void TcpClient::DetectStatus(const asio::error_code
error) {
return;
}
if (!only_heart_heat_) {
- LOG_INFO(stat_.ToString() << CLIENT_INFO);
- stat_.ResetStat();
+ UpdateMetric();
}
- if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ &&
- status_ != kConnecting) {
+ if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ &&
status_ != kConnecting) {
std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
- LOG_INFO("reconnect because it has idle "
- << tcp_idle_time_ << " ms."
- << "last send time:" << last_update_time_ << CLIENT_INFO);
+ LOG_INFO("Reconnect because it has idle " << tcp_idle_time_ << " ms."
+ << "last send time:" <<
last_update_time_ << CLIENT_INFO);
AsyncConnect();
}
- keep_alive_timer_->expires_after(
- std::chrono::milliseconds(tcp_detection_interval_));
- keep_alive_timer_->async_wait(
- std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
+
keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_));
+ keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this,
std::placeholders::_1));
+}
+
+void TcpClient::UpdateMetric() {
+ Metric stat;
+ for (auto &it : stat_map_) {
+ MetricManager::GetInstance()->UpdateMetric(it.first, it.second);
+ stat.Update(it.second);
+ it.second.ResetStat();
+ }
+ LOG_INFO(stat.ToString() << CLIENT_INFO);
}
void TcpClient::HeartBeat(bool only_heart_heat) {
@@ -308,12 +317,10 @@ void TcpClient::HeartBeat(bool only_heart_heat) {
only_heart_heat_ = only_heart_heat;
status_ = kHeartBeat;
last_update_time_ = Utils::getCurrentMsTime();
- // status_ = kWriting;
bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4);
bin_hb_.msg_type = 8;
- bin_hb_.data_time =
- htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
+ bin_hb_.data_time = htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() /
1000));
bin_hb_.body_ver = 1;
bin_hb_.body_len = 0;
bin_hb_.attr_len = 0;
@@ -322,58 +329,52 @@ void TcpClient::HeartBeat(bool only_heart_heat) {
uint32_t hb_len = sizeof(bin_hb_);
asio::async_write(*socket_, asio::buffer(hb, hb_len),
- std::bind(&TcpClient::OnWroten, this,
std::placeholders::_1,
- std::placeholders::_2));
+ std::bind(&TcpClient::OnWroten, this,
std::placeholders::_1, std::placeholders::_2));
}
void TcpClient::ParseHeartBeat(size_t total_length) {
- // | total length(4) | msg type(1) | data time(4) | body version(1) | body
- // length (4) | body | attr length(2) | attr | magic (2) | skip total length
+ // | total length(4) | msg type(1) | data time(4) | body version(1) | body
length (4) | body | attr length(2) | attr
+ // | magic (2) |
+ // skip total length
uint32_t parse_index = sizeof(uint32_t);
// skip msg type
parse_index += sizeof(uint8_t);
// skip data time
- // uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t
- // *>(recv_buf_->m_data + parse_index));
+ // uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t
*>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint32_t);
// 3、parse body version
- uint32_t body_version =
- *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
+ uint32_t body_version = *reinterpret_cast<const uint8_t *>(recv_buf_->m_data
+ parse_index);
parse_index += sizeof(uint8_t);
// 4、parse body length
- uint32_t body_length = ntohl(
- *reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index));
+ uint32_t body_length = ntohl(*reinterpret_cast<const uint32_t
*>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint32_t);
// 5 parse load
- int16_t load = ntohs(
- *reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index));
+ int16_t load = ntohs(*reinterpret_cast<const int16_t *>(recv_buf_->m_data +
parse_index));
parse_index += sizeof(int16_t);
// 7 parse attr length
- uint16_t attr_length = ntohs(
- *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+ uint16_t attr_length = ntohs(*reinterpret_cast<const uint16_t
*>(recv_buf_->m_data + parse_index));
parse_index += sizeof(uint16_t);
// 8 skip attr
parse_index += attr_length;
// 9 parse magic
- uint16_t magic = ntohs(
- *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+ uint16_t magic = ntohs(*reinterpret_cast<const uint16_t *>(recv_buf_->m_data
+ parse_index));
parse_index += sizeof(uint16_t);
if (magic != constants::kBinaryMagic) {
- LOG_ERROR("failed to parse heartbeat ack! error magic "
- << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO);
+ LOG_ERROR("Failed to ParseMsg heartbeat ack! error magic " << magic << "
!=" << constants::kBinaryMagic
+ << CLIENT_INFO);
return;
}
if (total_length + 4 != parse_index) {
- LOG_ERROR("failed to parse heartbeat ack! total_length "
- << total_length << " +4 !=" << parse_index << CLIENT_INFO);
+ LOG_ERROR("Failed to ParseMsg heartbeat ack! total_length " <<
total_length << " +4 !=" << parse_index
+ <<
CLIENT_INFO);
return;
}
if (heart_beat_index_ > constants::MAX_STAT) {
@@ -389,12 +390,12 @@ void TcpClient::ParseHeartBeat(size_t total_length) {
void TcpClient::ParseGenericResponse() {
if (sendBuffer_ != nullptr) {
- stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
- stat_.AddSendSuccessPackNum(1);
-
- stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+ std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner +
sendBuffer_->getStreamId();
+ stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt());
+ stat_map_[stat_key].AddSendSuccessPackNum(1);
+ stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() -
last_update_time_);
- sendBuffer_->releaseBuf();
+ BufferManager::GetInstance()->AddSendBuffer(sendBuffer_);
}
}
@@ -419,8 +420,7 @@ int32_t TcpClient::GetAvgLoad() {
void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; }
void TcpClient::UpdateClient(const std::string ip, const uint32_t port) {
- LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port
- << "] replace" << CLIENT_INFO);
+ LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port <<
"] replace" << CLIENT_INFO);
ip_ = ip;
port_ = port;
reset_client_ = true;
@@ -438,5 +438,29 @@ void TcpClient::RestClient() {
const std::string &TcpClient::getIp() const { return ip_; }
const std::string &TcpClient::getClientInfo() const { return client_info_; }
uint32_t TcpClient::getPort() const { return port_; }
+void TcpClient::ResetRetry() {
+ need_retry_ = false;
+ retry_times_ = 0;
+}
+void TcpClient::ResetSendBuffer() {
+ if (sendBuffer_ == nullptr) {
+ return;
+ }
+ retry_times_++;
+ if (retry_times_ > SdkConfig::getInstance()->retry_times_) {
+ std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner +
sendBuffer_->getStreamId();
+ stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt());
+ stat_map_[stat_key].AddSendFailPackNum(1);
+ stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() -
last_update_time_);
+
+ sendBuffer_->doUserCallBack();
+
+ BufferManager::GetInstance()->AddSendBuffer(sendBuffer_);
-} // namespace inlong
+ LOG_INFO("resend to proxy fail! retry times:" << retry_times_ <<
CLIENT_INFO);
+ ResetRetry();
+ } else {
+ need_retry_ = true;
+ }
+}
+} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
index 6835a7fd26..f2a92fc459 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-#ifndef INLONG_SDK_TCP_CLIENT_H
-#define INLONG_SDK_TCP_CLIENT_H
+#ifndef INLONG_TCP_CLIENT_H
+#define INLONG_TCP_CLIENT_H
+#include <queue>
+#include <unordered_map>
+
+#include "../metric/metric.h"
+#include "../protocol/msg_protocol.h"
#include "../utils/block_memory.h"
#include "../utils/capi_constant.h"
#include "../utils/read_write_mutex.h"
#include "../utils/send_buffer.h"
-#include "msg_protocol.h"
-#include "stat.h"
-#include <queue>
namespace inlong {
enum ClientStatus {
@@ -46,37 +46,38 @@ enum {
};
using IOContext = asio::io_context;
using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
class TcpClient {
-private:
+ private:
TcpSocketPtr socket_;
SteadyTimerPtr wait_timer_;
SteadyTimerPtr keep_alive_timer_;
ClientStatus status_;
std::string ip_;
-public:
+ public:
const std::string &getIp() const;
-private:
+ private:
uint32_t port_;
-public:
+ public:
uint32_t getPort() const;
-private:
+ private:
std::string client_info_;
-public:
+ public:
const std::string &getClientInfo() const;
-private:
+ private:
std::shared_ptr<SendBuffer> sendBuffer_;
asio::ip::tcp::endpoint endpoint_;
BlockMemoryPtrT recv_buf_;
uint64_t tcp_idle_time_;
uint32_t tcp_detection_interval_;
uint64_t last_update_time_;
- Stat stat_;
+ Metric stat_;
bool exit_;
BinaryHB bin_hb_ = {0};
std::vector<int32_t> proxy_loads_;
@@ -84,8 +85,11 @@ private:
bool wait_heart_beat_;
bool reset_client_;
volatile bool only_heart_heat_;
+ bool need_retry_;
+ uint32_t retry_times_;
+ std::unordered_map <std::string ,Metric> stat_map_;
-public:
+ public:
TcpClient(IOContext &io_context, std::string ip, uint32_t port);
~TcpClient();
void AsyncConnect();
@@ -98,7 +102,7 @@ public:
void DoClose();
void HandleFail();
bool isFree() { return (status_ == kFree); }
- void write(SendBufferPtrT sendBuffer);
+ void write(SendBufferPtrT sendBuffer, bool retry = false);
void DetectStatus(const asio::error_code error);
void HeartBeat(bool only_heart_heat = false);
void SetHeartBeatStatus();
@@ -107,10 +111,13 @@ public:
void UpdateClient(const std::string ip, const uint32_t port);
void RestClient();
int32_t GetAvgLoad();
+ void ResetRetry();
+ void ResetSendBuffer();
+ void UpdateMetric();
};
typedef std::shared_ptr<TcpClient> TcpClientTPtrT;
typedef std::vector<TcpClientTPtrT> TcpClientTPtrVecT;
typedef TcpClientTPtrVecT::iterator TcpClientTPtrVecItT;
-} // namespace inlong
+} // namespace inlong
-#endif // INLONG_SDK_TCP_CLIENT_H
+#endif // INLONG_TCP_CLIENT_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 4240a836dc..12a82b07e7 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -436,6 +436,13 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
} else {
enable_balance_ = constants::kEnableBalance;
}
+
+ if (doc.HasMember("retry_times") && doc["retry_times"].IsInt() &&
doc["retry_times"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["retry_times"];
+ retry_times_ = obj.GetInt();
+ } else {
+ retry_times_ = constants::kRetryTimes;
+ }
}
void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
// auth settings
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index 3a649cfb82..ae6cfde58b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -95,6 +95,7 @@ private:
uint32_t tcp_detection_interval_; // tcp-client detection interval
bool enable_balance_;
bool enable_local_cache_;
+ uint32_t retry_times_;
// auth settings
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
index 0228f9c96d..f6413717dd 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
@@ -41,7 +41,8 @@ enum SdkCode {
kSendBeforeInit = 22,
kFailMallocBuf = 23,
kMsgSizeLargerThanPackSize = 24,
- kSendBufferFull = 25
+ kSendBufferFull = 25,
+ kBufferManagerFull = 26
};
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 13ce7223b5..7ae2c79ea2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -16,19 +16,20 @@
*/
#include "api_imp.h"
+#include <signal.h>
+#include <iostream>
#include "../manager/proxy_manager.h"
+#include "../manager/metric_manager.h"
#include "../utils/capi_constant.h"
#include "../utils/logger.h"
#include "../utils/utils.h"
-
-#include "api_code.h"
-#include <iostream>
-#include <signal.h>
-
-#include "metric_manager.h"
+#include "../core/api_code.h"
namespace inlong {
int32_t ApiImp::InitApi(const char *config_file_path) {
+ if (config_file_path == nullptr) {
+ return SdkCode::kErrorInit;
+ }
if (!__sync_bool_compare_and_swap(&inited_, false, true)) {
return SdkCode::kMultiInit;
}
@@ -36,56 +37,65 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
user_exit_flag_.getAndSet(0);
if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) {
- LOG_ERROR("ParseConfig error ");
return SdkCode::kErrorInit;
}
- max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_,
- SdkConfig::getInstance()->pack_size_) -
constants::ATTR_LENGTH;
+
+ max_msg_length_ = SdkConfig::getInstance()->max_msg_size_ -
constants::ATTR_LENGTH;
local_ip_ = SdkConfig::getInstance()->local_ip_;
return DoInit();
}
-int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id,
- const char *msg, int32_t msg_len, UserCallBack call_back)
{
+int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char
*msg, int32_t msg_len,
+ UserCallBack call_back) {
+ int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+ if(code !=SdkCode::kSuccess){
+ return code;
+ }
+
+ return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back);
+}
+int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char
*msg, int32_t msg_len,
+ int64_t data_time, UserCallBack call_back) {
+ int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+ if(code !=SdkCode::kSuccess){
+ return code;
+ }
+
+ return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back,
data_time);
+}
+
+int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id,
const char *msg, int32_t msg_len){
if (msg_len > max_msg_length_) {
+ MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
return SdkCode::kMsgTooLong;
}
- if (inlong_group_id == nullptr || inlong_stream_id == nullptr ||
- msg == nullptr || msg_len <= 0) {
+ if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len
<= 0) {
return SdkCode::kInvalidInput;
}
if (inited_ == false) {
return SdkCode::kSendBeforeInit;
}
-
- int64_t msg_time = Utils::getCurrentMsTime();
- return this->SendBase(inlong_group_id, inlong_stream_id, local_ip_, msg_time,
- {msg, msg_len}, call_back);
+ return SdkCode::kSuccess;
}
-int32_t ApiImp::SendBase(const std::string inlong_group_id,
- const std::string inlong_stream_id,
- const std::string client_ip, int64_t report_time,
- const std::string msg, UserCallBack call_back) {
- int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg);
+int32_t ApiImp::SendBase(const std::string& inlong_group_id, const
std::string& stream_id, const std::string& msg,
+ UserCallBack call_back, int64_t report_time) {
+ int32_t check_ret = CheckData(inlong_group_id, stream_id, msg);
if (check_ret != SdkCode::kSuccess) {
return check_ret;
}
- ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true);
+ ProxyManager::GetInstance()->CheckGroupIdConf(inlong_group_id, true);
- auto recv_group =
- recv_manager_->GetRecvGroup(inlong_group_id);
+ auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id);
if (recv_group == nullptr) {
- LOG_ERROR("fail to get recv group, inlong_group_id:"
- << inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
+ LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id <<
",getStreamId:" << stream_id);
return SdkCode::kFailGetRevGroup;
}
- return recv_group->SendData(msg, inlong_group_id, inlong_stream_id,
client_ip,
- report_time, call_back);
+ return recv_group->SendData(msg, inlong_group_id, stream_id, report_time,
call_back);
}
int32_t ApiImp::CloseApi(int32_t max_waitms) {
@@ -98,44 +108,38 @@ int32_t ApiImp::CloseApi(int32_t max_waitms) {
}
int32_t ApiImp::DoInit() {
- LOG_INFO(
- "inlong dataproxy sdk cpp start Init, version:" << constants::kVersion);
+ LOG_INFO("tdbus sdk cpp start Init, version:" << constants::kVersion);
signal(SIGPIPE, SIG_IGN);
- LOG_INFO("inlong dataproxy cpp sdk Init complete!");
+ LOG_INFO("tdbus_sdk_cpp Init complete!");
ProxyManager::GetInstance()->Init();
MetricManager::GetInstance()->Init();
for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++)
{
- LOG_INFO("DoInit CheckConf inlong_group_id:"
- << SdkConfig::getInstance()->inlong_group_ids_[i]);
- ProxyManager::GetInstance()->CheckBidConf(
- SdkConfig::getInstance()->inlong_group_ids_[i], false);
+ LOG_INFO("Do init:" << SdkConfig::getInstance()->inlong_group_ids_[i]);
+
ProxyManager::GetInstance()->CheckGroupIdConf(SdkConfig::getInstance()->inlong_group_ids_[i],
false);
}
return InitManager();
}
-int32_t ApiImp::CheckData(const std::string inlong_group_id,
- const std::string inlong_stream_id,
- const std::string msg) {
+int32_t ApiImp::CheckData(const std::string& group_id, const std::string&
stream_id, const std::string& msg) {
if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
LOG_ERROR("capi has been closed, Init first and then send");
return SdkCode::kSendAfterClose;
}
- if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) {
- LOG_ERROR("invalid input, inlong_group_id"
- << inlong_group_id << " inlong_stream_id" << inlong_stream_id);
+ if (msg.empty() || group_id.empty() || stream_id.empty()) {
+ LOG_ERROR("invalid input, group id:" << group_id << " stream id:" <<
stream_id << "msg" << msg);
return SdkCode::kInvalidInput;
}
if (msg.size() > SdkConfig::getInstance()->max_msg_size_) {
- LOG_ERROR("msg len is too long, cur msg_len"
- << msg.size() << " ext_pack_size"
- << SdkConfig::getInstance()->max_msg_size_);
+ MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
+ LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << "
ext_pack_size"
+ <<
SdkConfig::getInstance()->max_msg_size_);
return SdkCode::kMsgTooLong;
}
@@ -156,15 +160,15 @@ int32_t ApiImp::InitManager() {
init_succeed_ = true;
return SdkCode::kSuccess;
}
-int32_t
-ApiImp::AddInLongGroupId(const std::vector<std::string> &inlong_group_ids) {
+int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) {
if (inited_ == false) {
return SdkCode::kSendBeforeInit;
}
- for (auto inlong_group_id : inlong_group_ids) {
- ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, false);
+ for (auto group_id : group_ids) {
+ ProxyManager::GetInstance()->CheckGroupIdConf(group_id, false);
}
+ return SdkCode::kSuccess;
}
ApiImp::ApiImp() = default;
ApiImp::~ApiImp() = default;
-} // namespace inlong
\ No newline at end of file
+} // namespace inlong
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
index c6e6ecd0e4..32ca093416 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
@@ -15,45 +15,43 @@
* limitations under the License.
*/
-#ifndef INLONG_SDK_API_IMP_H
-#define INLONG_SDK_API_IMP_H
+#ifndef INLONG_API_IMP_H
+#define INLONG_API_IMP_H
+#include <cstdint>
+#include <functional>
#include "../config/sdk_conf.h"
#include "../manager/recv_manager.h"
#include "../manager/send_manager.h"
#include "../utils/atomic.h"
-#include <cstdint>
-#include <functional>
namespace inlong {
-
-typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
- const int64_t, const char *);
+typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
const int64_t, const char *);
class ApiImp {
-public:
+ public:
ApiImp();
~ApiImp();
int32_t InitApi(const char *config_file_path);
- int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
- const char *msg, int32_t msg_len,
+ int32_t Send(const char *group_id, const char *stream_id, const char *msg,
int32_t msg_len,
UserCallBack call_back = nullptr);
+ int32_t Send(const char *group_id, const char *stream_id, const char *msg,
int32_t msg_len,
+ int64_t report_time, UserCallBack call_back = nullptr);
int32_t CloseApi(int32_t max_waitms);
- int32_t AddInLongGroupId(const std::vector<std::string> &inlong_group_ids);
+ int32_t AddInLongGroupId(const std::vector<std::string> &group_ids);
-private:
+ private:
int32_t DoInit();
int32_t InitManager();
- int32_t SendBase(const std::string inlong_group_id,
- const std::string inlong_stream_id,
- const std::string client_ip, int64_t report_time,
- const std::string msg, UserCallBack call_back);
+ int32_t SendBase(const std::string& inlong_group_id, const std::string&
stream_id, const std::string& msg, UserCallBack call_back,
+ int64_t report_time = 0);
+
+ int32_t CheckData(const std::string& group_id, const std::string& stream_id,
const std::string& msg);
- int32_t CheckData(const std::string inlong_group_id,
- const std::string inlong_stream_id, const std::string msg);
+ int32_t ValidateParams(const char *group_id, const char *stream_id, const
char *msg, int32_t msg_len);
AtomicInt user_exit_flag_{0};
volatile bool init_flag_ = false;
@@ -67,5 +65,5 @@ private:
std::shared_ptr<SendManager> send_manager_;
};
-} // namespace inlong
-#endif // INLONG_SDK_API_IMP_H
+} // namespace inlong
+#endif // INLONG_API_IMP_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
index 8ae749a352..d9e8366486 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
@@ -48,7 +48,16 @@ struct SdkMsg {
data_pack_format_attr_(attr),
inlong_group_id_(inlong_group_id),
inlong_stream_id_(inlong_stream_id){}
- SdkMsg() {};
+ SdkMsg(){};
+ void setMsg(const std::string &msg) { msg_ = msg; }
+ void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; }
+ void setReportTime(uint64_t reportTime) { report_time_ = reportTime; }
+ void setCb(UserCallBack cb) { cb_ = cb;}
+ void setUserReportTime(uint64_t userReportTime) { user_report_time_ =
userReportTime; }
+ void setUserClientIp(const std::string &userClientIp) { user_client_ip_ =
userClientIp; }
+ void setDataPackFormatAttr(const std::string &dataPackFormatAttr) {
data_pack_format_attr_ = dataPackFormatAttr; }
+ void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ =
inlong_group_id; }
+ void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ =
inlong_stream_id; }
};
using SdkMsgPtr = std::shared_ptr<SdkMsg>;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index b852095f68..377432a2ef 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -1,49 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
#include "recv_group.h"
-#include "../protocol/msg_protocol.h"
-#include "../utils/capi_constant.h"
-#include "../utils/utils.h"
-#include "api_code.h"
-#include <cstdlib>
#include <functional>
+#include "../core/api_code.h"
+#include "../manager/buffer_manager.h"
+#include "../utils/utils.h"
+#include "../manager/msg_manager.h"
+#include "../manager/metric_manager.h"
namespace inlong {
const uint32_t DEFAULT_PACK_ATTR = 400;
-const uint64_t LOG_SAMPLE=100;
-RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager>
send_manager)
- : cur_len_(0), groupId_num_(0), streamId_num_(0),
+const uint64_t LOG_SAMPLE = 100;
+RecvGroup::RecvGroup(const std::string &group_key,
std::shared_ptr<SendManager> send_manager)
+ : cur_len_(0),
+ group_key_(group_key),
+ groupId_num_(0),
+ streamId_num_(0),
msg_type_(SdkConfig::getInstance()->msg_type_),
- data_capacity_(SdkConfig::getInstance()->buf_size_),
- send_manager_(send_manager),group_key_(group_key),
- log_stat_(0){
- data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
- SdkConfig::getInstance()->pack_size_);
+ data_capacity_(SdkConfig::getInstance()->recv_buf_size_),
+ send_manager_(send_manager),
+ log_stat_(0),
+ send_group_(nullptr),
+ max_msg_size_(0),
+ uniq_id_(0) {
+ data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;
pack_buf_ = new char[data_capacity_];
memset(pack_buf_, 0x0, data_capacity_);
- data_time_ = 0;
+ data_time_ = Utils::getCurrentMsTime();
last_pack_time_ = Utils::getCurrentMsTime();
max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
-
-
LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_);
+ local_ip_ = SdkConfig::getInstance()->local_ip_;
+ LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_
<< ",max_recv_size:" << max_recv_size_);
}
RecvGroup::~RecvGroup() {
@@ -53,136 +60,108 @@ RecvGroup::~RecvGroup() {
}
}
-int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId,
- const std::string &streamId,
- const std::string &client_ip, uint64_t report_time,
- UserCallBack call_back) {
+int32_t RecvGroup::SendData(const std::string &msg, const std::string
&inlong_group_id_, const std::string &inlong_stream_id_,
+ uint64_t report_time, UserCallBack call_back) {
std::lock_guard<std::mutex> lck(mutex_);
-
if (msg.size() + cur_len_ > max_recv_size_) {
+
MetricManager::GetInstance()->AddReceiveBufferFullCount(inlong_group_id_,inlong_stream_id_,1);
return SdkCode::kRecvBufferFull;
}
- AddMsg(msg, client_ip, report_time, call_back,groupId,streamId);
-
- return SdkCode::kSuccess;
-}
+ uint64_t data_time = (report_time == 0) ? data_time_ : report_time;
-int32_t RecvGroup::DoDispatchMsg() {
- last_pack_time_ = Utils::getCurrentMsTime();
- std::lock_guard<std::mutex> lck(mutex_);
- if (group_key_.empty()) {
- if (log_stat_++ > LOG_SAMPLE) {
- LOG_ERROR("groupId is empty, check!!");
- log_stat_ = 0;
- }
- return SdkCode::kInvalidInput;
- }
- if (msgs_.empty()) {
- if (log_stat_++ > LOG_SAMPLE) {
- LOG_ERROR("no msg in msg_set, check!");
- log_stat_ = 0;
- }
- return SdkCode::kMsgEmpty;
- }
- auto send_group = send_manager_->GetSendGroup(group_key_);
- if (send_group == nullptr) {
- if (log_stat_++ > LOG_SAMPLE) {
- LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
- log_stat_ = 0;
- }
- return SdkCode::kFailGetSendBuf;
- }
- if (!send_group->IsAvailable()) {
- if (log_stat_++ > LOG_SAMPLE) {
- LOG_ERROR("failed to get send group! group_key:"
- << group_key_ << " send group is not available!");
- log_stat_ = 0;
- }
- return SdkCode::kFailGetConn;
+ std::string data_pack_format_attr =
+ "__addcol1__reptime=" + Utils::getFormatTime(data_time) +
"&__addcol2__ip=" + local_ip_;
+ max_msg_size_ = std::max(max_msg_size_, msg.size());
+ auto it = recv_queue_.find(inlong_group_id_ + inlong_stream_id_);
+ if (it == recv_queue_.end()) {
+ std::queue<SdkMsgPtr> tmp;
+ it = recv_queue_.insert(recv_queue_.begin(),
std::make_pair(inlong_group_id_ + inlong_stream_id_, tmp));
}
- if (send_group->IsFull()) {
- if (log_stat_++ > LOG_SAMPLE) {
- LOG_ERROR("failed to get send group! group_key:"
- << group_key_ << " send group is full!");
- log_stat_ = 0;
- }
- return SdkCode::kSendBufferFull;
+ SdkMsgPtr msg_ptr = MsgManager::GetInstance()->GetMsg();
+ if(nullptr == msg_ptr){
+ it->second.emplace(std::make_shared<SdkMsg>(msg, local_ip_, data_time,
call_back, data_pack_format_attr, local_ip_, data_time, inlong_group_id_,
inlong_stream_id_));
+ }else{
+ msg_ptr->setMsg(msg);
+ msg_ptr->setClientIp(local_ip_);
+ msg_ptr->setReportTime(data_time);
+ msg_ptr->setCb(call_back);
+ msg_ptr->setDataPackFormatAttr(data_pack_format_attr);
+ msg_ptr->setUserClientIp(local_ip_);
+ msg_ptr->setUserReportTime(data_time);
+ msg_ptr->setGroupId(inlong_group_id_);
+ msg_ptr->setStreamId(inlong_stream_id_);
+ it->second.emplace(msg_ptr);
}
- uint32_t total_length = 0;
- uint64_t max_tid_size = 0;
- std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch;
- std::unordered_map<std::string, uint64_t> tid_stat;
- while (!msgs_.empty()) {
- SdkMsgPtr msg = msgs_.front();
- if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH >
SdkConfig::getInstance()->pack_size_) {
- if (!msgs_to_dispatch.empty()) {
- break;
- }
- }
- std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_;
- msgs_to_dispatch[msg_key].push_back(msg);
- msgs_.pop();
-
- total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
+ cur_len_ = cur_len_ + msg.size() + constants::ATTR_LENGTH;
- if (tid_stat.find(msg_key) == tid_stat.end()) {
- tid_stat[msg_key] = 0;
- }
- tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() +
constants::ATTR_LENGTH;
+ return SdkCode::kSuccess;
+}
- max_tid_size = std::max(tid_stat[msg_key], max_tid_size);
+void RecvGroup::DoDispatchMsg() {
+ if (!CanDispatch()) {
+ return;
}
-
- cur_len_ = cur_len_ - total_length;
-
- for (auto it : msgs_to_dispatch) {
- std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second);
-
- ResetPackBuf();
-
- if (send_buffer == nullptr) {
- CallbalkToUsr(it.second);
- continue;
- }
-
- int ret = send_group->PushData(send_buffer);
- if (ret != SdkCode::kSuccess) {
- CallbalkToUsr(it.second);
+ last_pack_time_ = Utils::getCurrentMsTime();
+ data_time_ = last_pack_time_;
+ while (!fail_queue_.empty()) {
+ SendBufferPtrT tmp_ptr = fail_queue_.front();
+ if (SdkCode::kSuccess != send_group_->PushData(tmp_ptr)) {
+ return;
}
+ fail_queue_.pop();
}
- return SdkCode::kSuccess;
-}
-
-void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
- int64_t report_time, UserCallBack call_back,const
std::string &groupId,
- const std::string &streamId) {
- if (Utils::isLegalTime(report_time))
- data_time_ = report_time;
- else {
- data_time_ = Utils::getCurrentMsTime();
+ {
+ std::lock_guard<std::mutex> lck(mutex_);
+ recv_queue_.swap(dispatch_queue_);
}
- std::string user_client_ip = client_ip;
- int64_t user_report_time = report_time;
+ for (auto &it : dispatch_queue_) {
+ std::vector<SdkMsgPtr> msg_vec;
+ uint64_t msg_size = 0;
+ while (!it.second.empty()) {
+ SdkMsgPtr msg = it.second.front();
+ msg_vec.push_back(msg);
+ msg_size = msg_size + msg->msg_.size() + constants::ATTR_LENGTH;
+ it.second.pop();
+
+ if ((msg_size + max_msg_size_) >= SdkConfig::getInstance()->pack_size_) {
+ uint32_t ret = ParseMsg(msg_vec);
+ if (SdkCode::kBufferManagerFull == ret) {
+ for (const auto &it_msg : msg_vec) {
+ it.second.emplace(it_msg);
+ }
+ return;
+ }
+ UpdateCurrentMsgLen(msg_size);
+ msg_size = 0;
+
+ if (SdkCode::kSuccess != ret) {
+ return;
+ }
+ std::vector<SdkMsgPtr>().swap(msg_vec);
+ }
+ }
+ if (!msg_vec.empty()) {
+ uint32_t ret = ParseMsg(msg_vec);
+ if (SdkCode::kBufferManagerFull == ret) {
+ for (const auto &it_msg : msg_vec) {
+ it.second.emplace(it_msg);
+ }
+ return;
+ }
+ UpdateCurrentMsgLen(msg_size);
- if (client_ip.empty()) {
- client_ip = "127.0.0.1";
+ if (SdkCode::kSuccess != ret) {
+ return;
+ }
+ }
}
- std::string data_pack_format_attr =
- "__addcol1__reptime=" + Utils::getFormatTime(data_time_) +
- "&__addcol2__ip=" + client_ip;
- msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back,
- data_pack_format_attr, user_client_ip,
- user_report_time,groupId,streamId));
-
- cur_len_ += msg.size() + constants::ATTR_LENGTH;
}
-bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
- uint32_t &out_len, uint32_t uniq_id) {
+bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t
&out_len) {
if (pack_data == nullptr) {
LOG_ERROR("nullptr, failed to allocate memory for buf");
return false;
@@ -196,7 +175,6 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,
memcpy(&pack_buf_[idx], it->msg_.data(), it->msg_.size());
idx += static_cast<uint32_t>(it->msg_.size());
- // add attrlen|attr
if (SdkConfig::getInstance()->isAttrDataPackFormat()) {
*(uint32_t *)(&pack_buf_[idx]) =
htonl(it->data_pack_format_attr_.size());
idx += sizeof(uint32_t);
@@ -247,7 +225,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,
groupId_num = 0;
streamId_num = 0;
groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
- "&streamId=" + msgs[0]->inlong_stream_id_;
+ "&streamId=" + msgs[0]->inlong_stream_id_;
char_groupId_flag = 0x4;
} else {
groupId_num = groupId_num_;
@@ -261,14 +239,14 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs,
char *pack_data,
if (SdkConfig::getInstance()->enableTraceIP()) {
if (groupId_streamId_char.empty())
attr = "node1ip=" + SdkConfig::getInstance()->local_ip_ +
- "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+ "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
else
attr = groupId_streamId_char +
- "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
- "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+ "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
+ "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
} else {
attr = "groupId=" + msgs[0]->inlong_group_id_ +
- "&streamId=" + msgs[0]->inlong_stream_id_;
+ "&streamId=" + msgs[0]->inlong_stream_id_;
}
*(uint16_t *)bodyBegin = htons(attr.size());
bodyBegin += sizeof(uint16_t);
@@ -294,7 +272,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,
p += 4;
*(uint16_t *)p = htons(cnt);
p += 2;
- *(uint32_t *)p = htonl(uniq_id);
+ *(uint32_t *)p = htonl(uniq_id_);
out_len = total_len + 4;
} else {
@@ -319,10 +297,10 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs,
char *pack_data,
// attr
std::string attr;
attr = "groupId=" + msgs[0]->inlong_group_id_ +
- "&streamId=" + msgs[0]->inlong_stream_id_;
+ "&streamId=" + msgs[0]->inlong_stream_id_;
attr += "&dt=" + std::to_string(data_time_);
- attr += "&mid=" + std::to_string(uniq_id);
+ attr += "&mid=" + std::to_string(uniq_id_);
if (isSnappy)
attr += "&cp=snappy";
attr += "&cnt=" + std::to_string(cnt);
@@ -352,56 +330,97 @@ bool RecvGroup::IsZipAndOperate(std::string &res,
uint32_t real_cur_len) {
}
void RecvGroup::DispatchMsg(bool exit) {
- if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty())
- return;
+ if (cur_len_ <= constants::ATTR_LENGTH) return;
bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_;
- bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) >
- SdkConfig::getInstance()->pack_timeout_;
+ bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) >
SdkConfig::getInstance()->pack_timeout_;
if (len_enough || time_enough) {
DoDispatchMsg();
}
}
-std::shared_ptr<SendBuffer>
-RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
+std::shared_ptr<SendBuffer> RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr>
&msgs) {
if (msgs.empty()) {
- LOG_ERROR("pack msgs is empty.");
return nullptr;
}
- std::shared_ptr<SendBuffer> send_buffer =
- std::make_shared<SendBuffer>(data_capacity_);
+ std::shared_ptr<SendBuffer> send_buffer =
BufferManager::GetInstance()->GetSendBuffer();
if (send_buffer == nullptr) {
- LOG_ERROR("make send buffer failed.");
return nullptr;
}
+
uint32_t len = 0;
- int32_t msg_cnt = msgs.size();
- uint32_t uniq_id = g_send_msgid.incrementAndGet();
+ uint32_t msg_cnt = msgs.size();
+ if (++uniq_id_ >= constants::kMaxSnowFlake) {
+ uniq_id_ = 0;
+ }
- if (!PackMsg(msgs, send_buffer->content(), len, uniq_id) || len == 0) {
- LOG_ERROR("failed to write data to send buf from pack queue, sendQueue "
- "id:%d, buf id:%d");
+ if (!PackMsg(msgs, send_buffer->content(), len) || len == 0) {
+ LOG_ERROR("failed to write data to send buf from pack queue, sendQueue");
return nullptr;
}
+
send_buffer->setLen(len);
send_buffer->setMsgCnt(msg_cnt);
send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
- send_buffer->setUniqId(uniq_id);
- send_buffer->setIsPacked(true);
- for (auto it : msgs) {
- send_buffer->addUserMsg(it);
+ for (const auto &it : msgs) {
+ if(it->cb_){
+ send_buffer->addUserMsg(it);
+ }
}
return send_buffer;
}
-void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) {
- for (auto &it : msgs) {
- if (it->cb_) {
- it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
- it->msg_.data(), it->msg_.size(), it->user_report_time_,
- it->user_client_ip_.data());
+uint32_t RecvGroup::ParseMsg(std::vector<SdkMsgPtr> &msg_vec) {
+ if (msg_vec.empty()) {
+ return SdkCode::kSuccess;
+ }
+
+ std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msg_vec);
+
+ if (send_buffer == nullptr) {
+ return SdkCode::kBufferManagerFull;
+ }
+
+ uint32_t ret = send_group_->PushData(send_buffer);
+ if (ret != SdkCode::kSuccess) {
+ fail_queue_.push(send_buffer);
+ }
+ return ret;
+}
+bool RecvGroup::CanDispatch() {
+ if (group_key_.empty()) {
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("Group key is empty!");
+ log_stat_ = 0;
+ }
+ return false;
+ }
+ if (nullptr == send_group_) {
+ send_group_ = send_manager_->GetSendGroup(group_key_);
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("failed to get send group! group_key:" << group_key_);
+ log_stat_ = 0;
+ }
+ return false;
+ }
+ if (!send_group_->IsAvailable()) {
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is
not available!");
+ log_stat_ = 0;
}
+ return false;
}
+ if (send_group_->IsFull()) {
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is
full!");
+ log_stat_ = 0;
+ }
+ return false;
+ }
+ return true;
+}
+void RecvGroup::UpdateCurrentMsgLen(uint64_t msg_size) {
+ std::lock_guard<std::mutex> lck(mutex_);
+ cur_len_ = cur_len_ - msg_size;
}
-} // namespace inlong
\ No newline at end of file
+} // namespace inlong
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index adac9a7f10..839c49e14d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
#ifndef INLONG_SDK_RECV_GROUP_H
@@ -25,17 +27,19 @@
#include <unordered_map>
#include "../config/sdk_conf.h"
+#include "../core/sdk_msg.h"
#include "../manager/send_manager.h"
#include "../utils/atomic.h"
#include "../utils/noncopyable.h"
namespace inlong {
class RecvGroup {
-private:
- char *pack_buf_;
+ private:
+ char* pack_buf_;
std::queue<SdkMsgPtr> msgs_;
+ std::queue<SdkMsgPtr> fail_msgs_;
uint32_t data_capacity_;
- uint32_t cur_len_;
+ uint64_t cur_len_;
AtomicInt pack_err_;
uint64_t data_time_;
uint16_t groupId_num_;
@@ -47,33 +51,36 @@ private:
uint64_t last_pack_time_;
uint64_t max_recv_size_;
+
std::string group_key_;
uint64_t log_stat_;
+ SendGroupPtr send_group_;
+ std::string local_ip_;
+ uint64_t max_msg_size_;
+ uint64_t uniq_id_;
+
+ std::unordered_map<std::string, std::queue<SdkMsgPtr>> recv_queue_;
+ std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_;
+ std::queue<SendBufferPtrT> fail_queue_;
- int32_t DoDispatchMsg();
- void AddMsg(const std::string &msg, std::string client_ip,
- int64_t report_time, UserCallBack call_back,const std::string
&groupId,
- const std::string &streamId);
- bool IsZipAndOperate(std::string &res, uint32_t real_cur_len);
+ void DoDispatchMsg();
+ bool IsZipAndOperate(std::string& res, uint32_t real_cur_len);
inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
+ uint32_t ParseMsg(std::vector<SdkMsgPtr>& msg_vec);
-public:
- RecvGroup(const std::string &group_key,std::shared_ptr<SendManager>
send_manager);
+ public:
+ RecvGroup(const std::string& group_key, std::shared_ptr<SendManager>
send_manager);
~RecvGroup();
- int32_t SendData(const std::string &msg, const std::string &groupId,
- const std::string &streamId, const std::string &client_ip,
- uint64_t report_time, UserCallBack call_back);
- bool PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, uint32_t
&out_len,
- uint32_t uniq_id);
+ int32_t SendData(const std::string& msg, const std::string&
inlong_group_id_, const std::string& inlong_stream_id_, uint64_t report_time,
+ UserCallBack call_back);
+ bool PackMsg(std::vector<SdkMsgPtr>& msgs, char* pack_data, uint32_t&
out_len);
void DispatchMsg(bool exit);
-
- char *data() const { return pack_buf_; }
-
- std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs);
- void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs);
+ std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr>& msgs);
+ bool CanDispatch();
+ void UpdateCurrentMsgLen(uint64_t msg_size);
};
using RecvGroupPtr = std::shared_ptr<RecvGroup>;
-} // namespace inlong
+} // namespace inlong
-#endif // INLONG_SDK_RECV_GROUP_H
\ No newline at end of file
+#endif // INLONG_SDK_RECV_GROUP_H
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 09014d728d..1653d68de0 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -217,8 +217,8 @@ int32_t ProxyManager::GetProxy(const std::string &key,
return GetProxyByClusterId(key, proxy_info_vec);
}
-int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
- bool is_inited) {
+int32_t ProxyManager::CheckGroupIdConf(const std::string &inlong_group_id,
+ bool is_inited) {
{
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
auto it = groupid_2_cluster_id_map_.find(inlong_group_id);
@@ -379,7 +379,7 @@ void ProxyManager::WriteLocalCache() {
} catch (...) {
LOG_ERROR("WriteLocalCache error!");
}
- LOG_INFO("WriteLocalCache bid number:" << groupid_count);
+ LOG_INFO("WriteLocalCache getGroupId number:" << groupid_count);
}
std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 3cd3e5491f..419bc60798 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -62,7 +62,7 @@ public:
static ProxyManager instance;
return &instance;
}
- int32_t CheckBidConf(const std::string &inlong_group_id, bool is_inited);
+ int32_t CheckGroupIdConf(const std::string &inlong_group_id, bool is_inited);
void Update();
void DoUpdate();
void Init();
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 616f018b90..47282e0e32 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -22,6 +22,7 @@
#include "string.h"
#include <stdint.h>
+#include <limits>
namespace inlong {
namespace constants {
@@ -79,6 +80,9 @@ static const uint32_t kReserveProxyNum = 2;
static const bool kEnableTCPNagle = true;
static const uint32_t kTcpIdleTime = 600000;
static const uint32_t kTcpDetectionInterval = 60000;
+static const uint32_t kMaxRetryIntervalMs= 3000;
+static const uint32_t kRetryIntervalMs= 200;
+static const int32_t kRetryTimes = 1;
static const char kSerIP[] = "127.0.0.1";
static const uint32_t kSerPort = 46801;
@@ -87,6 +91,7 @@ static const uint32_t kMsgType = 7;
static const bool kEnableSetAffinity = false;
static const uint32_t kMaskCPUAffinity = 0xff;
static const uint16_t kExtendField = 0;
+static const uint64_t kMaxSnowFlake = std::numeric_limits<uint64_t>::max();
// http basic auth
static const char kBasicAuthHeader[] = "Authorization:";
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
index 1f2970b17d..5b7e5a596a 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -72,8 +72,8 @@ public:
void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; }
uint32_t len() { return len_; }
void setLen(const uint32_t len) { len_ = len; }
- std::string bid() { return inlong_group_id_; }
- std::string tid() { return inlong_stream_id_; }
+ std::string getGroupId() { return inlong_group_id_; }
+ std::string getStreamId() { return inlong_stream_id_; }
void setInlongGroupId(const std::string &inlong_group_id) {
inlong_group_id_ = inlong_group_id;
}