This is an automated email from the ASF dual-hosted git repository.
gfphoenix78 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new 5bb33f7d81b Port UDP motion layer fixes from ic_udpifc.c to udp2
5bb33f7d81b is described below
commit 5bb33f7d81b30fe058414108cae987f947ab5255
Author: zhangyue <[email protected]>
AuthorDate: Wed Oct 22 22:32:58 2025 +0800
Port UDP motion layer fixes from ic_udpifc.c to udp2
This commit integrates critical UDP motion layer improvements from
commits a337ea2 and acec354b into the contrib/udp2 component,
addressing extreme-scenario issues in resource-constrained
environments.
For more details, see commits a337ea2 and acec354b.
Co-authored-by: zhaoxi <[email protected]>
---
contrib/udp2/ic_common/ic_utility.hpp | 2 +
contrib/udp2/ic_common/udp2/ic_udp2.cpp | 850 +++++++++++++++++++++--
contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp | 114 ++-
contrib/udp2/ic_udp2.c | 39 ++
4 files changed, 931 insertions(+), 74 deletions(-)
diff --git a/contrib/udp2/ic_common/ic_utility.hpp
b/contrib/udp2/ic_common/ic_utility.hpp
index 169c7b3cc18..b8c8919ff12 100644
--- a/contrib/udp2/ic_common/ic_utility.hpp
+++ b/contrib/udp2/ic_common/ic_utility.hpp
@@ -100,6 +100,8 @@ typedef enum GpVars_Interconnect_Method_IC
{
INTERCONNECT_FC_METHOD_CAPACITY_IC = 0,
INTERCONNECT_FC_METHOD_LOSS_IC = 2,
+ INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC = 3,
+ INTERCONNECT_FC_METHOD_LOSS_TIMER_IC = 4,
} GpVars_Interconnect_Method_IC;
typedef enum
diff --git a/contrib/udp2/ic_common/udp2/ic_udp2.cpp
b/contrib/udp2/ic_common/udp2/ic_udp2.cpp
index c09ea4e8434..a747d91f600 100644
--- a/contrib/udp2/ic_common/udp2/ic_udp2.cpp
+++ b/contrib/udp2/ic_common/udp2/ic_udp2.cpp
@@ -72,6 +72,21 @@ extern "C" {
#include "ic_udp2_internal.hpp"
#include "ic_faultinjection.h"
+/*
+ * Hints to the compiler about the likelihood of a branch. Both likely() and
+ * unlikely() return the boolean value of the contained expression.
+ *
+ * These should only be used sparingly, in very hot code paths. It's very easy
+ * to mis-estimate likelihoods.
+ */
+#if __GNUC__ >= 3
+#define likely(x) __builtin_expect((x) != 0, 1)
+#define unlikely(x) __builtin_expect((x) != 0, 0)
+#else
+#define likely(x) ((x) != 0)
+#define unlikely(x) ((x) != 0)
+#endif
+
static int timeoutArray[] =
{
1,
@@ -121,7 +136,7 @@ static ICGlobalControlInfo ic_control_info;
/*
* All connections in a process share this unack queue ring instance.
*/
-static UnackQueueRing unack_queue_ring = {0, 0, 0};
+static UnackQueueRing unack_queue_ring = {0};
static int ICSenderSocket = -1;
static int32 ICSenderPort = 0;
@@ -144,6 +159,8 @@ static std::condition_variable cv;
CChunkTransportState *CChunkTransportStateImpl::state_ = nullptr;
+static struct mudp_manager mudp;
+
/*
* Identity the user of ic module by vector_engine_is_user:
* "false" means PG executor, "true" means Arrow executor.
@@ -237,6 +254,8 @@ static ssize_t sendtoWithRetry(int socket, const void
*message, size_t length, i
static char *format_sockaddr_udp(struct sockaddr_storage *sa, char *buf,
size_t len);
+static void initUdpManager(mudp_manager_t mptr);
+
static char* flags2txt(uint32 pkt_flags);
static const char* flags_text[] =
@@ -923,6 +942,14 @@ initUnackQueueRing(UnackQueueRing *uqr)
{
uqr->slots[i].init(ICBufferListType_Secondary);
}
+
+#ifdef TIMEOUT_Z
+ uqr->retrans_count = 0;
+ uqr->no_retrans_count = 0;
+ uqr->time_difference = 0;
+ uqr->min = 0;
+ uqr->max = 0;
+#endif
}
/*
@@ -1109,6 +1136,344 @@ SendBufferPool::get(UDPConn *conn)
return ret;
}
+static struct rto_hashstore*
+initRTOHashstore()
+{
+ int i;
+ struct rto_hashstore* hs = (struct
rto_hashstore*)ic_malloc(sizeof(struct rto_hashstore));
+
+ for (i = 0; i < RTO_HASH; i++)
+ TAILQ_INIT(&hs->rto_list[i]);
+
+ TAILQ_INIT(&hs->rto_list[RTO_HASH]);
+
+ return hs;
+}
+
+static void
+initUdpManager(mudp_manager_t mudp)
+{
+ mudp->rto_store = initRTOHashstore();
+ mudp->rto_list_cnt = 0;
+ mudp->cur_ts = 0;
+}
+
+static inline void
+addtoRTOList(mudp_manager_t mudp, UDPConn *cur_stream)
+{
+ if (!mudp->rto_list_cnt)
+ {
+ mudp->rto_store->rto_now_idx = 0;
+ mudp->rto_store->rto_now_ts = cur_stream->sndvar.ts_rto;
+ }
+
+ if (cur_stream->on_rto_idx < 0 )
+ {
+ if (cur_stream->on_timewait_list)
+ return;
+
+ int diff = (int32_t)(cur_stream->sndvar.ts_rto -
mudp->rto_store->rto_now_ts);
+ if (diff < RTO_HASH)
+ {
+ int offset= (diff + mudp->rto_store->rto_now_idx) %
RTO_HASH;
+ cur_stream->on_rto_idx = offset;
+ TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]),
+ cur_stream, sndvar.timer_link);
+ }
+ else
+ {
+ cur_stream->on_rto_idx = RTO_HASH;
+
TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[RTO_HASH]),
+ cur_stream, sndvar.timer_link);
+ }
+ mudp->rto_list_cnt++;
+ }
+}
+
+static inline void
+removeFromRTOList(mudp_manager_t mudp,
+ UDPConn *cur_stream)
+{
+ if (cur_stream->on_rto_idx < 0)
+ return;
+
+ TAILQ_REMOVE(&mudp->rto_store->rto_list[cur_stream->on_rto_idx],
+ cur_stream, sndvar.timer_link);
+ cur_stream->on_rto_idx = -1;
+
+ mudp->rto_list_cnt--;
+}
+
+static inline void
+updateRetransmissionTimer(mudp_manager_t mudp,
+ UDPConn *cur_stream,
+ uint32_t cur_ts)
+{
+ cur_stream->sndvar.nrtx = 0;
+
+ /* if in rto list, remove it */
+ if (cur_stream->on_rto_idx >= 0)
+ removeFromRTOList(mudp, cur_stream);
+
+ /* Reset retransmission timeout */
+ if (UDP_SEQ_GT(cur_stream->snd_nxt, cur_stream->sndvar.snd_una))
+ {
+ /* there are packets sent but not acked */
+ /* update rto timestamp */
+ cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto;
+ addtoRTOList(mudp, cur_stream);
+ }
+
+ if (cur_stream->on_rto_idx == -1)
+ {
+ cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto;
+ addtoRTOList(mudp, cur_stream);
+ }
+}
+
+static int
+handleRTO(mudp_manager_t mudp,
+ uint32_t cur_ts,
+ UDPConn *cur_stream,
+ ICChunkTransportState *transportStates,
+ TransportEntry *pEntry,
+ UDPConn *triggerConn)
+{
+ /* check for expiration */
+ int count = 0;
+ int retransmits = 0;
+ UDPConn *currBuffConn = NULL;
+ uint32_t now = cur_ts;
+
+ Assert(unack_queue_ring.currentTime != 0);
+ removeFromRTOList(mudp, cur_stream);
+
+ while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ <
UNACK_QUEUE_RING_SLOTS_NUM)
+ {
+ /* expired, need to resend them */
+ ICBuffer *curBuf = NULL;
+
+ while ((curBuf =
unack_queue_ring.slots[unack_queue_ring.idx].pop()) != NULL)
+ {
+ curBuf->nRetry++;
+ currBuffConn = static_cast<UDPConn*>(curBuf->conn);
+ putIntoUnackQueueRing(
+
&unack_queue_ring,
+ curBuf,
+
currBuffConn->computeExpirationPeriod(curBuf->nRetry), now);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+ trans_proto_stats.update(TPE_DATA_PKT_SEND,
curBuf->pkt);
+#endif
+
+ currBuffConn->sendOnce(curBuf->pkt);
+
+ retransmits++;
+ ic_statistics.retransmits++;
+ currBuffConn->stat_count_resent++;
+ currBuffConn->stat_max_resent =
Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent);
+ UDPConn::checkNetworkTimeout(curBuf, now,
&transportStates->networkTimeoutIsLogged);
+
+#ifdef AMS_VERBOSE_LOGGING
+ LOG(INFO, "RESEND pkt with seq %d (retry %d, rtt "
UINT64_FORMAT ") to route %d",
+ curBuf->pkt->seq, curBuf->nRetry,
curBuf->conn->rtt, curBuf->conn->route);
+ logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
+#endif
+ }
+
+ unack_queue_ring.currentTime += TIMER_SPAN;
+ unack_queue_ring.idx = (unack_queue_ring.idx + 1) %
(UNACK_QUEUE_RING_SLOTS_NUM);
+ }
+ return 0;
+}
+
+static inline void
+rearrangeRTOStore(mudp_manager_t mudp)
+{
+ UDPConn *walk, *next;
+ struct rto_hashstore::rto_head* rto_list =
&mudp->rto_store->rto_list[RTO_HASH];
+ int cnt = 0;
+
+ for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next)
+ {
+ next = TAILQ_NEXT(walk, sndvar.timer_link);
+
+ int diff = (int32_t)(mudp->rto_store->rto_now_ts -
walk->sndvar.ts_rto);
+ if (diff < RTO_HASH)
+ {
+ int offset = (diff + mudp->rto_store->rto_now_idx) %
RTO_HASH;
+ TAILQ_REMOVE(&mudp->rto_store->rto_list[RTO_HASH],
+ walk, sndvar.timer_link);
+ walk->on_rto_idx = offset;
+ TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]),
+ walk, sndvar.timer_link);
+ }
+ cnt++;
+ }
+}
+
+static inline void
+checkRtmTimeout(mudp_manager_t mudp,
+ uint32_t cur_ts,
+ int thresh,
+ ICChunkTransportState *transportStates,
+ TransportEntry *pEntry,
+ UDPConn *triggerConn)
+{
+ UDPConn *walk, *next;
+ struct rto_hashstore::rto_head* rto_list;
+ int cnt;
+
+ if (!mudp->rto_list_cnt)
+ return;
+
+ cnt = 0;
+
+ while (1)
+ {
+ rto_list =
&mudp->rto_store->rto_list[mudp->rto_store->rto_now_idx];
+ if ((int32_t)(cur_ts - mudp->rto_store->rto_now_ts) < 0)
+ break;
+
+ for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next)
+ {
+ if (++cnt > thresh)
+ break;
+ next = TAILQ_NEXT(walk, sndvar.timer_link);
+
+ if (walk->on_rto_idx >= 0)
+ {
+ TAILQ_REMOVE(rto_list, walk, sndvar.timer_link);
+ mudp->rto_list_cnt--;
+ walk->on_rto_idx = -1;
+ handleRTO(mudp, cur_ts, walk, transportStates,
pEntry, triggerConn);
+ }
+ }
+
+ if (cnt > thresh)
+ {
+ break;
+ }
+ else
+ {
+ mudp->rto_store->rto_now_idx =
(mudp->rto_store->rto_now_idx + 1) % RTO_HASH;
+ mudp->rto_store->rto_now_ts++;
+ if (!(mudp->rto_store->rto_now_idx % 1000))
+ rearrangeRTOStore(mudp);
+ }
+
+ }
+}
+
+/*
+ * estimateRTT - Dynamically estimates the Round-Trip Time (RTT) and adjusts
Retransmission Timeout (RTO)
+ *
+ * This function implements a variant of the Jacobson/Karels algorithm for RTT
estimation, adapted for UDP-based
+ * motion control connections. It updates smoothed RTT (srtt), mean deviation
(mdev), and RTO values based on
+ * newly measured RTT samples (mrtt). The RTO calculation ensures reliable
data transmission over unreliable networks.
+ *
+ * Key Components:
+ * - srtt: Smoothed Round-Trip Time (weighted average of historical RTT
samples)
+ * - mdev: Mean Deviation (measure of RTT variability)
+ * - rttvar: Adaptive RTT variation bound (used to clamp RTO updates)
+ * - rto: Retransmission Timeout (dynamically adjusted based on srtt +
rttvar)
+ *
+ * Algorithm Details:
+ * 1. For the first RTT sample:
+ * srtt = mrtt << 3 (scaled by 8 for fixed-point arithmetic)
+ * mdev = mrtt << 1 (scaled by 2)
+ * rttvar = max(mdev, rto_min)
+ * 2. For subsequent samples:
+ * Delta = mrtt - (srtt >> 3) (difference between new sample and
smoothed RTT)
+ * srtt += Delta (update srtt with 1/8 weight of new
sample)
+ * Delta = abs(Delta) - (mdev >> 2)
+ * mdev += Delta (update mdev with 1/4 weight)
+ * 3. rttvar bounds the maximum RTT variation:
+ * If mdev > mdev_max, update mdev_max and rttvar
+ * On new ACKs (snd_una > rtt_seq), decay rttvar toward mdev_max
+ * 4. Final RTO calculation:
+ * rto = (srtt >> 3) + rttvar (clamped to RTO_MAX)
+ *
+ * Parameters:
+ * @mConn: Parent motion connection context (container of MotionConnUDP)
+ * @mrtt: Measured Round-Trip Time (in microseconds) for the latest packet
+ *
+ * Notes:
+ * - Designed for non-retransmitted packets to avoid sampling bias.
+ * - Uses fixed-point arithmetic to avoid floating-point operations.
+ * - Minimum RTO (rto_min) is set to 20ms (HZ/5/10, assuming HZ=100).
+ * - Critical for adaptive timeout control in UDP protocols where
reliability is implemented at the application layer.
+ * - Thread-unsafe: Must be called in a synchronized context (e.g., packet
processing loop).
+ */
+static inline void
+estimateRTT(UDPConn *conn , uint32_t mrtt)
+{
+ /* This function should be called for not retransmitted packets */
+ /* TODO: determine rto_min */
+
+ long m = mrtt;
+ uint32_t rto_min = UDP_RTO_MIN / 10;
+
+ if (m == 0)
+ m = 1;
+
+ /*
+ * Special RTO optimization for high-speed networks:
+ * When measured RTT (m) is below 100 microseconds and current RTO is
under 10ms,
+ * forcibly set RTO to half of RTO_MIN. This targets two scenarios:
+ * - Loopback interfaces (localhost communication)
+ * - Ultra-low-latency networks (e.g., InfiniBand, RDMA)
+ */
+ if(m < 100 && conn->rttvar.rto < 10000)
+ {
+ conn->rttvar.rto = RTO_MIN / 2;
+ }
+
+ if (conn->rttvar.srtt != 0)
+ {
+ /* rtt = 7/8 rtt + 1/8 new */
+ m -= (conn->rttvar.srtt >> LOSS_THRESH);
+ conn->rttvar.srtt += m;
+ if (m < 0)
+ {
+ m = -m;
+ m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA);
+ if (m > 0)
+ m >>= LOSS_THRESH;
+ }
+ else
+ {
+ m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA);
+ }
+ conn->rttvar.mdev += m;
+ if (conn->rttvar.mdev > conn->rttvar.mdev_max)
+ {
+ conn->rttvar.mdev_max = conn->rttvar.mdev;
+ if (conn->rttvar.mdev_max > conn->rttvar.rttvar)
+ {
+ conn->rttvar.rttvar = conn->rttvar.mdev_max;
+ }
+ }
+ if (UDP_SEQ_GT(conn->rttvar.snd_una, conn->rttvar.rtt_seq))
+ {
+ if (conn->rttvar.mdev_max < conn->rttvar.rttvar)
+ {
+ conn->rttvar.rttvar -= (conn->rttvar.rttvar -
conn->rttvar.mdev_max) >> RTT_SHIFT_ALPHA;
+ }
+ conn->rttvar.mdev_max = rto_min;
+ }
+ }
+ else
+ {
+ /* fresh measurement */
+ conn->rttvar.srtt = m << LOSS_THRESH;
+ conn->rttvar.mdev = m << 1;
+ conn->rttvar.mdev_max = conn->rttvar.rttvar =
MAX(conn->rttvar.mdev, rto_min);
+ }
+
+ conn->rttvar.rto = ((conn->rttvar.srtt >> LOSS_THRESH) +
conn->rttvar.rttvar) > RTO_MAX ? RTO_MAX : ((conn->rttvar.srtt >> LOSS_THRESH)
+ conn->rttvar.rttvar);
+}
+
/*
* addCRC
* add CRC field to the packet.
@@ -1844,6 +2209,19 @@ setupOutgoingUDPConnection(int icid, TransportEntry
*pEntry, UDPConn *conn)
conn->msgSize = sizeof(conn->conn_info);
conn->stillActive = true;
conn->conn_info.seq = 1;
+ conn->rttvar.ts_rto = 0;
+ conn->rttvar.rto = UDP_INITIAL_RTO;
+ conn->rttvar.srtt = 0;
+ conn->rttvar.rttvar = 0;
+ conn->rttvar.snd_una = 0;
+ conn->rttvar.nrtx = 0;
+ conn->rttvar.max_nrtx = 0;
+ conn->rttvar.mss = UDP_DEFAULT_MSS;
+ conn->rttvar.cwnd = 2;
+ conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH;
+ conn->rttvar.loss_count = 0;
+ conn->rttvar.karn_mode = false;
+ conn->on_rto_idx = -1;
Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family ==
AF_INET6);
}
@@ -2324,6 +2702,9 @@ UDPConn::computeExpirationPeriod(uint32 retry)
else
#endif
{
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+ return Min(retry > 3 ? this->rttvar.rto * retry :
this->rttvar.rto, UNACK_QUEUE_RING_LENGTH_LOSS);
+
uint32 factor = (retry <= 12 ? retry : 12);
return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD,
(int)(this->rtt + (this->dev << 2)) << (factor)));
}
@@ -2418,7 +2799,7 @@ UDPConn::DeactiveConn()
* packet is retransmitted.
*/
void
-UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now)
+UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now, struct icpkthdr *pkt)
{
uint64 ackTime = 0;
bool bufIsHead = false;
@@ -2428,6 +2809,39 @@ UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now)
buf = this->unackQueue.remove(buf);
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC ||
session_param.Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER_IC)
+ {
+ bufConn = static_cast<UDPConn*>(buf->conn);
+ buf =
unack_queue_ring.slots[buf->unackQueueRingSlot].remove(buf);
+ unack_queue_ring.numOutStanding--;
+ if (this->unackQueue.length() >= 1)
+ unack_queue_ring.numSharedOutStanding--;
+
+ ackTime = now - buf->sentTime;
+
+ if (buf->nRetry == 0)
+ {
+ /* adjust the congestion control window. */
+ if (snd_control_info.cwnd < snd_control_info.ssthresh)
+ snd_control_info.cwnd += 2;
+ else
+ snd_control_info.cwnd += 1 /
snd_control_info.cwnd;
+ snd_control_info.cwnd = Min(snd_control_info.cwnd,
snd_buffer_pool.maxCount);
+ }
+
+ if ((bufConn->rttvar.rto << 1) > ackTime && pkt->retry_times !=
session_param.Gp_interconnect_min_retries_before_timeout)
+ estimateRTT(bufConn, (now - pkt->send_time));
+
+ if (buf->nRetry && pkt->retry_times > 0 && pkt->retry_times <
session_param.Gp_interconnect_min_retries_before_timeout)
+ bufConn->rttvar.rto += (bufConn->rttvar.rto >> 4 *
buf->nRetry);
+
+ if (unlikely(session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC))
+ {
+ bufConn->sndvar.ts_rto = bufConn->rttvar.rto;
+ addtoRTOList(&mudp, bufConn);
+ }
+ }
+
if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC)
{
ICBufferList *alist =
&unack_queue_ring.slots[buf->unackQueueRingSlot];
@@ -2608,24 +3022,59 @@ getCurrentTime(void)
static void
putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime,
uint64 now)
{
+ UDPConn *buffConn = static_cast<UDPConn*>(buf->conn);
uint64 diff = 0;
int idx = 0;
-
- /* The first packet, currentTime is not initialized */
- if (uqr->currentTime == 0)
- uqr->currentTime = now - (now % TIMER_SPAN);
-
- diff = now + expTime - uqr->currentTime;
- if (diff >= UNACK_QUEUE_RING_LENGTH)
+
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
{
+ /* The first packet, currentTime is not initialized */
+#ifndef TIMEOUT_Z
+ if (uqr->currentTime == 0)
+ uqr->currentTime = now - (now % TIMER_SPAN_LOSS);
+#else
+ if (uqr->currentTime == 0 && buffConn->rttvar.rto == 0)
+ uqr->currentTime = now - (now % TIMER_SPAN_LOSS);
+ else
+ uqr->currentTime = now + buffConn->rttvar.rto;
+
+#endif
+ diff = expTime;
+ if (diff >= UNACK_QUEUE_RING_LENGTH_LOSS)
+ {
#ifdef AMS_VERBOSE_LOGGING
- LOG(INFO, "putIntoUnackQueueRing:" "now %lu expTime %lu diff
%lu uqr-currentTime %lu", now, expTime, diff, uqr->currentTime);
+ LOG(INFO, "putIntoUnackQueueRing:" "now " UINT64_FORMAT
"expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime "
UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
#endif
- diff = UNACK_QUEUE_RING_LENGTH - 1;
+ diff = UNACK_QUEUE_RING_LENGTH_LOSS - 1;
+ }
+ else if (diff < TIMER_SPAN_LOSS)
+ {
+ diff = diff < TIMER_SPAN_LOSS ? TIMER_SPAN_LOSS : diff;
+ }
}
- else if (diff < TIMER_SPAN)
+ else
{
- diff = TIMER_SPAN;
+ if (uqr->currentTime == 0)
+ uqr->currentTime = now - (now % TIMER_SPAN_LOSS);
+
+ diff = now + expTime - uqr->currentTime;
+ if (diff >= UNACK_QUEUE_RING_LENGTH)
+ {
+#ifdef AMS_VERBOSE_LOGGING
+ LOG(INFO, "putIntoUnackQueueRing:" "now " UINT64_FORMAT
"expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime "
UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
+#endif
+ diff = UNACK_QUEUE_RING_LENGTH - 1;
+ }
+ else if (diff < TIMER_SPAN)
+ {
+ diff = TIMER_SPAN;
+ }
+
+ idx = (uqr->idx + diff / TIMER_SPAN) %
UNACK_QUEUE_RING_SLOTS_NUM;
+
+#ifdef AMS_VERBOSE_LOGGING
+ LOG(INFO, "PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT
" (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d,
nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot,
uqr->idx, idx);
+#endif
}
idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM;
@@ -2784,6 +3233,30 @@ handleDataPacket(UDPConn *conn, icpkthdr *pkt, struct
sockaddr_storage *peer, so
logPkt("Interconnect error: received a packet when the queue is
full ", pkt);
ic_statistics.disorderedPktNum++;
conn->stat_count_dropped++;
+
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC &&
rx_control_info.mainWaitingState.waiting &&
+ rx_control_info.mainWaitingState.waitingNode ==
pkt->motNodeId &&
+ rx_control_info.mainWaitingState.waitingQuery ==
pkt->icId)
+ {
+ if (rx_control_info.mainWaitingState.waitingRoute ==
ANY_ROUTE)
+ {
+ if (rx_control_info.mainWaitingState.reachRoute
== ANY_ROUTE)
+
rx_control_info.mainWaitingState.reachRoute = conn->route;
+ }
+ else if (rx_control_info.mainWaitingState.waitingRoute
== conn->route)
+ {
+ if (IC_DEBUG2 >= session_param.log_min_messages)
+ LOG(INFO, "rx thread: main_waiting
waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
+ rx_control_info.mainWaitingState.reachRoute =
conn->route;
+ }
+ /* WAKE MAIN THREAD HERE */
+ *wakeup_mainthread = true;
+ }
+
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+ {
+ conn->setAckParam(param, UDPIC_FLAGS_FULL,
conn->conn_info.seq - 1, conn->conn_info.extraSeq);
+ }
return false;
}
@@ -3060,9 +3533,23 @@ rxThreadFunc(void *arg)
UDPConn *conn = ic_control_info.connHtab.find(pkt);
if (conn != NULL)
{
+ uint64 now = getCurrentTime();
+ uint64 send_time = pkt->send_time;
+ uint64 recv_time = now;
+ uint64 retry_times = pkt->retry_times;
+
+ bool drop_ack = pkt->seq < conn->conn_info.seq
? true : false;
/* Handling a regular packet */
if (handleDataPacket(conn, pkt, &peer,
&peerlen, ¶m, &wakeup_mainthread))
pkt = NULL;
+ if (!pkt)
+ {
+ param.msg.send_time = send_time;
+ param.msg.recv_time = recv_time;
+ param.msg.retry_times = retry_times;
+ }
+ if (drop_ack)
+ param.msg.retry_times =
session_param.Gp_interconnect_min_retries_before_timeout;
ic_statistics.recvPktNum++;
}
else
@@ -3835,7 +4322,7 @@ UDPConn::sendBuffers()
{
ICBuffer *buf = NULL;
- if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC)
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
{
if (this->unackQueue.length() > 0 &&
unack_queue_ring.numSharedOutStanding >=
(snd_control_info.cwnd - snd_control_info.minCwnd))
@@ -3858,7 +4345,7 @@ UDPConn::sendBuffers()
this->unackQueue.append(buf);
- if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC)
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
{
unack_queue_ring.numOutStanding++;
if (this->unackQueue.length() > 1)
@@ -3882,6 +4369,10 @@ UDPConn::sendBuffers()
trans_proto_stats.update(TPE_DATA_PKT_SEND, buf->pkt);
#endif
+ struct icpkthdr *pkt_ = buf->pkt;
+ pkt_->send_time = now;
+ pkt_->recv_time = 0;
+ pkt_->retry_times = buf->nRetry;
this->sendOnce(buf->pkt);
ic_statistics.sndPktNum++;
@@ -4022,7 +4513,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
if (buf->pkt->seq == pkt->seq)
{
- this->handleAckedPacket(buf, now);
+ this->handleAckedPacket(buf, now, pkt);
shouldSendBuffers = true;
break;
}
@@ -4032,7 +4523,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
/* this is a lost packet, retransmit */
buf->nRetry++;
- if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC)
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
{
ICBufferList *alist =
&unack_queue_ring.slots[buf->unackQueueRingSlot];
buf = alist->remove(buf);
@@ -4063,7 +4554,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
/* remove packet already received. */
next = link->next;
- this->handleAckedPacket(buf, now);
+ this->handleAckedPacket(buf, now, pkt);
shouldSendBuffers = true;
link = next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
@@ -4080,7 +4571,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
lostPktCnt--;
}
}
- if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC)
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
{
snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2,
snd_control_info.minCwnd);
snd_control_info.cwnd = snd_control_info.ssthresh;
@@ -4129,7 +4620,7 @@ UDPConn::handleAckForDuplicatePkt(icpkthdr *pkt)
while (!this->unackQueue.is_head(link) && (buf->pkt->seq <=
pkt->extraSeq))
{
next = link->next;
- this->handleAckedPacket(buf, now);
+ this->handleAckedPacket(buf, now, pkt);
shouldSendBuffers = true;
link = next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
@@ -4141,7 +4632,7 @@ UDPConn::handleAckForDuplicatePkt(icpkthdr *pkt)
next = link->next;
if (buf->pkt->seq == pkt->seq)
{
- this->handleAckedPacket(buf, now);
+ this->handleAckedPacket(buf, now, pkt);
shouldSendBuffers = true;
break;
}
@@ -4215,59 +4706,225 @@ void
UDPConn::checkExpiration(ICChunkTransportState *transportStates, uint64 now)
{
/* check for expiration */
- int count = 0;
- int retransmits = 0;
+ int count = 0;
+ int retransmits = 0;
UDPConn *currBuffConn = NULL;
Assert(unack_queue_ring.currentTime != 0);
- while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ <
UNACK_QUEUE_RING_SLOTS_NUM)
+
+ if (unlikely(session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC))
{
- /* expired, need to resend them */
- ICBuffer *curBuf = NULL;
+ checkRtmTimeout(&mudp, now, 500, transportStates, this->entry_,
this);
+ return;
+ }
+
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+ {
+ uint64 timer_span_time = unack_queue_ring.currentTime +
TIMER_SPAN_LOSS;
- while (true)
+ while (now >= (timer_span_time +
unack_queue_ring.time_difference) && count++ < UNACK_QUEUE_RING_SLOTS_NUM)
{
- ICBufferList *alist =
&unack_queue_ring.slots[unack_queue_ring.idx];
- curBuf = alist->pop();
- if (curBuf == NULL)
- break;
- UDPConn *conn = static_cast<UDPConn*>(curBuf->conn);
- curBuf->nRetry++;
- putIntoUnackQueueRing(&unack_queue_ring,
- curBuf,
-
conn->computeExpirationPeriod(curBuf->nRetry), now);
+ /* expired, need to resend them */
+ ICBuffer *curBuf = NULL;
+
+ while ((curBuf =
unack_queue_ring.slots[unack_queue_ring.idx].pop()) != NULL)
+ {
+ UDPConn *conn =
static_cast<UDPConn*>(curBuf->conn);
+ curBuf->nRetry++;
+
+ /*
+ * Fixed Timeout Thresholds: Traditional
TCP-style Retransmission Timeout
+ * (RTTVAR.RTO) calculations may be too rigid
for networks with volatile
+ * latency. This leads to:
+ * Premature Retransmissions: Unnecessary
data resends during temporary
+ * latency spikes, wasting bandwidth.
+ * Delayed Recovery: Slow reaction to actual
packet loss when RTO is
+ * overly conservative.
+ *
+ * Lack of Context Awareness: Static RTO
ignores real-time network behavior
+ * patterns, reducing throughput and
responsiveness.
+ *
+ * Solution: Dynamic Timeout Threshold
Adjustment
+ * Implements an adaptive timeout mechanism to
optimize retransmission:
+ * if (now < (curBuf->sentTime +
conn->rttvar.rto)) {
+ * uint32_t diff = (curBuf->sentTime +
conn->rttvar.rto) - now;
+ * // ... (statistical tracking and
threshold adjustment)
+ * }
+ * Temporary Latency Spike: Uses max
(conservative) to avoid false
+ * retransmits, reducing bandwidth waste
(vs. traditional mistaken
+ * retransmissions).
+ * Persistent Packet Loss: Prioritizes min
(aggressive) via
+ * weight_retrans, accelerating recovery
(vs. slow fixed-RTO reaction).
+ * Stable Network: Balances weights for
equilibrium throughput (vs.
+ * static RTO limitations).
+ */
+ if (now < (curBuf->sentTime +
conn->rttvar.rto))
+ {
+#ifdef TIMEOUT_Z
+ uint32_t diff = (curBuf->sentTime +
conn->rttvar.rto) - now;
+ if(unack_queue_ring.retrans_count == 0
&& unack_queue_ring.no_retrans_count == 0)
+ {
+ unack_queue_ring.min = diff;
+ unack_queue_ring.max = diff;
+ }
+
+ if (diff < unack_queue_ring.min)
unack_queue_ring.min = diff;
+ if (diff > unack_queue_ring.max)
unack_queue_ring.max = diff;
+
+ if (unack_queue_ring.retrans_count == 0)
+
unack_queue_ring.time_difference = unack_queue_ring.max;
+ else if
(unack_queue_ring.no_retrans_count == 0 && ic_statistics.retransmits <
(session_param.Gp_interconnect_min_retries_before_timeout / 4))
+
unack_queue_ring.time_difference = 0;
+ else
+ {
+ uint32_t total_count =
unack_queue_ring.retrans_count + unack_queue_ring.no_retrans_count;
+ double weight_retrans =
(double)unack_queue_ring.retrans_count / total_count;
+ double weight_no_retrans =
(double)unack_queue_ring.no_retrans_count / total_count;
+
unack_queue_ring.time_difference = (uint32_t)(unack_queue_ring.max *
weight_no_retrans + unack_queue_ring.min * weight_retrans);
+ }
+
+ ++unack_queue_ring.no_retrans_count;
+ }
+ else
+ ++unack_queue_ring.retrans_count;
+#endif
#ifdef TRANSFER_PROTOCOL_STATS
- trans_proto_stats.update(TPE_DATA_PKT_SEND,
curBuf->pkt);
+ trans_proto_stats.update(TPE_DATA_PKT_SEND,
curBuf->pkt);
#endif
- conn->sendOnce(curBuf->pkt);
+ currBuffConn =
static_cast<UDPConn*>(curBuf->conn);
+ putIntoUnackQueueRing(&unack_queue_ring,
+
curBuf,
+
currBuffConn->computeExpirationPeriod(curBuf->nRetry), getCurrentTime());
+ struct icpkthdr *pkt_ = curBuf->pkt;
- currBuffConn = conn;
+ pkt_->send_time = getCurrentTime();
+ pkt_->recv_time = 0;
+ pkt_->retry_times = curBuf->nRetry;
- retransmits++;
- ic_statistics.retransmits++;
- currBuffConn->stat_count_resent++;
- currBuffConn->stat_max_resent =
Max(currBuffConn->stat_max_resent,
-
currBuffConn->stat_count_resent);
+ currBuffConn->sendOnce(curBuf->pkt);
- UDPConn::checkNetworkTimeout(curBuf, now,
&transportStates->networkTimeoutIsLogged);
+ /*
+ * Adaptive Retry Backoff with Polling for
Network Asymmetry Mitigation
+ *
+ * This logic addresses two critical network
pathologies:
+ * 1. RTO Distortion Amplification:
+ * - Packet loss in volatile networks
causes RTO-based retransmission errors
+ * - Multiple spurious retries increase
network load and congestion collapse risk
+ * 2. Data Skew-Induced Starvation:
+ * - Under unbalanced workloads,
low-traffic nodes experience MON (Message Order Number) delays
+ * - Delayed ACKs trigger false
retransmissions even when packets arrive eventually
+ * - Unacked queue inflation worsens
congestion in high-traffic nodes
+ */
+ int32_t loop_ack = curBuf->nRetry;
+ uint32_t rto_min = UDP_RTO_MIN / 10;
+ uint32_t rtoMs = conn->rttvar.rto / 1000;
+ int32_t wait_time = rto_min > rtoMs ? rto_min :
rtoMs;
+ int32_t loop = 0;
+
+ /*
+ * To optimize performance, we need to process
all the time-out file descriptors (fds)
+ * in each batch together.
+ */
+ if (loop_ack > 0)
+ {
+ while (loop++ < loop_ack)
+ {
+ if
(this->entry_->pollAcks(wait_time))
+ {
+
this->entry_->handleAcks(false);
+ curBuf->nRetry = 0;
+ break;
+ }
+
+ struct icpkthdr *pkt_ =
curBuf->pkt;
+ pkt_->send_time =
getCurrentTime();
+ pkt_->recv_time = 0;
+ pkt_->retry_times =
curBuf->nRetry;
+ currBuffConn->sendOnce(pkt_);
+
+ if (loop_ack <
(session_param.Gp_interconnect_min_retries_before_timeout / 10))
+ wait_time += wait_time
/ 10;
+ else if (loop_ack >
(session_param.Gp_interconnect_min_retries_before_timeout / 10) && loop_ack <
(session_param.Gp_interconnect_min_retries_before_timeout / 5))
+ wait_time += RTO_MAX /
10;
+ else if (loop_ack >
(session_param.Gp_interconnect_min_retries_before_timeout / 5) && loop_ack <
(session_param.Gp_interconnect_min_retries_before_timeout / 2))
+ wait_time += RTO_MAX /
5;
+ else if (loop_ack <
(session_param.Gp_interconnect_min_retries_before_timeout))
+ wait_time += RTO_MAX;
+ };
+ }
+
+ if (loop_ack >
session_param.Gp_interconnect_min_retries_before_timeout / 5)
+ LOG(INFO, "Resending packet (seq %d) to
%s (pid %d cid %d) with %d retries in %lu seconds",
+ curBuf->pkt->seq,
curBuf->conn->remoteHostAndPort,
+ curBuf->pkt->dstPid,
curBuf->pkt->dstContentId, curBuf->nRetry,
+ (now -
curBuf->sentTime) / 1000 / 1000);
+
+ retransmits++;
+ ic_statistics.retransmits++;
+ currBuffConn->stat_count_resent++;
+ currBuffConn->stat_max_resent =
Max(currBuffConn->stat_max_resent,
+
currBuffConn->stat_count_resent);
+
+ UDPConn::checkNetworkTimeout(curBuf, now,
&transportStates->networkTimeoutIsLogged);
#ifdef AMS_VERBOSE_LOGGING
- LOG(INFO, "RESEND pkt with seq %d (retry %d, rtt %lu)
to route %d",
- curBuf->pkt->seq, curBuf->nRetry,
currBuffConn->rtt, currBuffConn->route);
- logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
+ LOG(INFO, "RESEND pkt with seq %d (retry %d,
rtt " UINT64_FORMAT ") to route %d",
+ curBuf->pkt->seq,
curBuf->nRetry, currBuffConn->rtt, currBuffConn->route);
+ logPkt("RESEND PKT in checkExpiration",
curBuf->pkt);
#endif
+ }
+
+ timer_span_time += TIMER_SPAN_LOSS;
+ unack_queue_ring.idx = (unack_queue_ring.idx + 1) %
(UNACK_QUEUE_RING_SLOTS_NUM);
}
+ }
+ else
+ {
+ while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) &&
count++ < UNACK_QUEUE_RING_SLOTS_NUM)
+ {
+ /* expired, need to resend them */
+ ICBuffer *curBuf = NULL;
- unack_queue_ring.currentTime += TIMER_SPAN;
- unack_queue_ring.idx = (unack_queue_ring.idx + 1) %
(UNACK_QUEUE_RING_SLOTS_NUM);
+ while ((curBuf =
unack_queue_ring.slots[unack_queue_ring.idx].pop()) != NULL)
+ {
+ curBuf->nRetry++;
+ currBuffConn =
static_cast<UDPConn*>(curBuf->conn);
+ putIntoUnackQueueRing(
+
&unack_queue_ring,
+ curBuf,
+
currBuffConn->computeExpirationPeriod(curBuf->nRetry), now);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+ trans_proto_stats.update(TPE_DATA_PKT_SEND,
curBuf->pkt);
+#endif
+
+ currBuffConn->sendOnce(curBuf->pkt);
+
+ retransmits++;
+ ic_statistics.retransmits++;
+ currBuffConn->stat_count_resent++;
+ currBuffConn->stat_max_resent =
Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent);
+ UDPConn::checkNetworkTimeout(curBuf, now,
&transportStates->networkTimeoutIsLogged);
+
+#ifdef AMS_VERBOSE_LOGGING
+ LOG(INFO, "RESEND pkt with seq %d (retry %d,
rtt " UINT64_FORMAT ") to route %d",
+ curBuf->pkt->seq, curBuf->nRetry,
curBuf->conn->rtt, curBuf->conn->route);
+ logPkt("RESEND PKT in checkExpiration",
curBuf->pkt);
+#endif
+ }
+
+ unack_queue_ring.currentTime += TIMER_SPAN;
+ unack_queue_ring.idx = (unack_queue_ring.idx + 1) %
(UNACK_QUEUE_RING_SLOTS_NUM);
+ }
+
+	/*
+	 * Handle the case where this function has not been
called for a long time.
+	 */
+ unack_queue_ring.currentTime = now - (now % (TIMER_SPAN));
}
- /*
- * deal with case when there is a long time this function is not called.
- */
- unack_queue_ring.currentTime = now - (now % TIMER_SPAN);
if (retransmits > 0)
{
snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2,
snd_control_info.minCwnd);
@@ -4325,14 +4982,29 @@ UDPConn::checkDeadlock()
ic_control_info.lastDeadlockCheckTime = now;
ic_statistics.statusQueryMsgNum++;
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC && this->entry_->pollAcks(50))
+ {
+ this->entry_->handleAcks(false);
+ this->deadlockCheckBeginTime = now;
+ }
+
/* check network error. */
- if ((now - this->deadlockCheckBeginTime) > ((uint64)
session_param.Gp_interconnect_transmit_timeout * 1000 * 1000))
+ if ((now - this->deadlockCheckBeginTime) > ((uint64)
session_param.Gp_interconnect_transmit_timeout * 100 * 1000))
{
- std::stringstream ss;
- ss <<"ERROR, interconnect encountered a network
error, please check your network"<<
- "Did not get any response from
"<<remoteHostAndPort<<" (pid "<<conn_info.dstPid<<
- " cid "<<conn_info.dstContentId<<") in
"<<session_param.Gp_interconnect_transmit_timeout<<" seconds.",
- throw ICNetworkException(ss.str(), __FILE__,
__LINE__);
+ LOG(INFO, "Did not get any response from %s
(pid %d cid %d) in 600 seconds.", this->remoteHostAndPort,
+ this->conn_info.dstPid,
this->conn_info.dstContentId);
+
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC)
+ this->capacity += 1;
+
+ if ((now - this->deadlockCheckBeginTime) >
((uint64)session_param.Gp_interconnect_transmit_timeout * 1000 * 1000))
+ {
+ std::stringstream ss;
+ ss << "ERROR, interconnect encountered
a network error, please check your network."
+ << "Did not get any response from "
<< remoteHostAndPort << " (pid " << conn_info.dstPid << " cid " <<
conn_info.dstContentId << ") in "
+ <<
session_param.Gp_interconnect_transmit_timeout << " seconds.";
+ throw ICNetworkException(ss.str(),
__FILE__, __LINE__);
+ }
}
}
}
@@ -4375,7 +5047,7 @@ UDPConn::checkExpirationCapacityFC(int timeout)
ic_control_info.lastPacketSendTime = now;
this->updateRetransmitStatistics();
- checkNetworkTimeout(buf, now,
&entry_->state->networkTimeoutIsLogged);
+ UDPConn::checkNetworkTimeout(buf, now,
&entry_->state->networkTimeoutIsLogged);
}
}
@@ -4394,13 +5066,13 @@ UDPConn::checkExceptions(int retry, int timeout)
this->checkExpirationCapacityFC(timeout);
}
- if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC)
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
{
uint64 now = getCurrentTime();
if (now - ic_control_info.lastExpirationCheckTime >
uint64(TIMER_CHECKING_PERIOD))
{
- UDPConn::checkExpiration(this->entry_->state, now);
+ this->checkExpiration(this->entry_->state, now);
ic_control_info.lastExpirationCheckTime = now;
}
}
@@ -4434,12 +5106,23 @@ UDPConn::checkExceptions(int retry, int timeout)
int
UDPConn::computeTimeout(int retry)
{
+ int32_t rtoMs = 0;
+
+ rtoMs = this->rttvar.rto / 1000;
if (this->unackQueue.length() == 0)
return TIMER_CHECKING_PERIOD;
ICBufferLink *bufLink = this->unackQueue.first();
ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink);
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+ {
+ if (buf->nRetry == 0 && retry == 0 &&
unack_queue_ring.numSharedOutStanding < (snd_control_info.cwnd -
snd_control_info.minCwnd))
+ return 0;
+
+ return rtoMs > TIMER_CHECKING_PERIOD ? rtoMs:
TIMER_CHECKING_PERIOD;
+ }
+
if (buf->nRetry == 0 && retry == 0)
return 0;
@@ -4513,8 +5196,7 @@ UDPConn::Send(DataBlock *data)
if (this->entry_->pollAcks(timeout))
{
- bool rs = this->entry_->handleAcks();
- if (rs)
+ if (this->entry_->handleAcks(true))
{
/*
* We make sure that we deal with the stop
messages only after
@@ -4527,6 +5209,9 @@ UDPConn::Send(DataBlock *data)
}
this->checkExceptions(retry++, timeout);
doCheckExpiration = false;
+
+ if (!doCheckExpiration && this->unackQueue.length() == 0 &&
this->capacity > 0 && this->sndQueue.length() > 0)
+ this->sendBuffers();
}
this->pBuff = (uint8 *) this->curBuff->pkt;
@@ -4670,7 +5355,7 @@ TransportEntry::aggregateStatistics()
* if we receive a stop message, return true (caller will clean up).
*/
bool
-TransportEntry::handleAcks()
+TransportEntry::handleAcks(bool need_flush)
{
bool ret = false;
UDPConn *ackConn = NULL;
@@ -4800,6 +5485,12 @@ TransportEntry::handleAcks()
shouldSendBuffers |=
(ackConn->handleAckForDisorderPkt(pkt));
break;
}
+ else if (pkt->flags & UDPIC_FLAGS_FULL)
+ {
+ if (IC_DEBUG1 >=
session_param.log_min_messages)
+ LOG(DEBUG1, "Recv buff is full
[seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d",
pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags,
ackConn->conn_info.seq);
+ break;
+ }
/*
* don't get out of the loop if pkt->seq equals
to
@@ -4853,7 +5544,7 @@ TransportEntry::handleAcks()
while
(!ackConn->unackQueue.is_head(link) && buf->pkt->seq <= pkt->seq)
{
next = link->next;
- ackConn->handleAckedPacket(buf,
now);
+ ackConn->handleAckedPacket(buf,
now, pkt);
shouldSendBuffers = true;
link = next;
buf =
GET_ICBUFFER_FROM_PRIMARY(link);
@@ -4869,7 +5560,7 @@ TransportEntry::handleAcks()
* still send here, since in STOP/EOS race case, we may
have been
* in EOS sending logic and will not check stop message.
*/
- if (shouldSendBuffers)
+ if (shouldSendBuffers && need_flush)
ackConn->sendBuffers();
}
else
@@ -4926,7 +5617,7 @@ TransportEntry::handleStopMsgs()
{
if (this->pollAcks(0))
{
- bool rs = this->handleAcks();
+ bool rs = this->handleAcks(true);
if (rs)
{
/* more stops found, loop again. */
@@ -5050,6 +5741,19 @@ TransportEntry::MakeRecvEntry(CChunkTransportStateImpl
*state,
conn->conn_info.icId = icid;
conn->conn_info.flags =
UDPIC_FLAGS_RECEIVER_TO_SENDER;
+ conn->rttvar.ts_rto = 0;
+ conn->rttvar.rto = UDP_INITIAL_RTO;
+ conn->rttvar.srtt = 0;
+ conn->rttvar.rttvar = 0;
+ conn->rttvar.snd_una = 0;
+ conn->rttvar.nrtx = 0;
+ conn->rttvar.max_nrtx = 0;
+ conn->rttvar.mss = UDP_DEFAULT_MSS;
+ conn->rttvar.cwnd = 2;
+ conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH;
+ conn->rttvar.loss_count = 0;
+ conn->rttvar.karn_mode = false;
+ conn->on_rto_idx = -1;
ic_control_info.connHtab.add(conn);
if (global_param.createOpaqueDataCallback)
@@ -5728,6 +6432,8 @@ CChunkTransportStateImpl::CreateSendEntries(ICSliceTable
*sliceTable)
snd_buffer_pool.init();
initUnackQueueRing(&unack_queue_ring);
+ if (session_param.Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC)
+ initUdpManager(&mudp);
ic_control_info.isSender = true;
ic_control_info.lastExpirationCheckTime = getCurrentTime();
ic_control_info.lastPacketSendTime =
ic_control_info.lastExpirationCheckTime;
@@ -6184,12 +6890,16 @@ CChunkTransportStateImpl::SendEOS(int motNodeID,
DataBlock *data)
timeout = conn->computeTimeout(retry);
if (pEntry->pollAcks(timeout))
- pEntry->handleAcks();
+ pEntry->handleAcks(true);
conn->checkExceptions(retry++, timeout);
if (retry >= MAX_TRY)
+ {
+ if (conn->unackQueue.length()
== 0)
+ conn->sendBuffers();
break;
+ }
}
if ((!conn->cdbProc) ||
(conn->unackQueue.length() == 0 &&
diff --git a/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
b/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
index baeb6a0841c..2602133a9e5 100644
--- a/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
+++ b/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
@@ -50,9 +50,12 @@
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
+#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
+#include <sys/param.h>
+#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
@@ -230,6 +233,9 @@ typedef struct icpkthdr
*/
uint32 seq;
uint32 extraSeq;
+ uint64_t send_time;
+ uint64_t recv_time;
+ uint8_t retry_times;
} icpkthdr;
typedef struct ICBuffer ICBuffer;
@@ -434,6 +440,50 @@ ic_bswap32(uint32 x)
((x >> 24) & 0x000000ff);
}
+#define TIMEOUT_Z
+#define RTT_SHIFT_ALPHA (3) /* srtt (0.125) */
+#define LOSS_THRESH (3) /* Packet loss triggers Karn */
+#define RTO_MIN (5000) /* MIN RTO(ms) */
+#define RTO_MAX (100000) /* MAX RTO(ms) */
+#define UDP_INFINITE_SSTHRESH 0x7fffffff
+
+#define SEC_TO_USEC(t) ((t) * 1000000)
+#define SEC_TO_MSEC(t) ((t) * 1000)
+#define MSEC_TO_USEC(t) ((t) * 1000)
+#define USEC_TO_SEC(t) ((t) / 1000000)
+#define TIME_TICK (1000000/HZ)/* in us */
+
+#define UDP_INITIAL_RTO (MSEC_TO_USEC(200))
+#define UDP_DEFAULT_MSS 1460
+
+#define RTO_HASH (3000)
+
+#define UDP_SEQ_LT(a,b) ((int32_t)((a)-(b)) < 0)
+#define UDP_SEQ_LEQ(a,b) ((int32_t)((a)-(b)) <= 0)
+#define UDP_SEQ_GT(a,b) ((int32_t)((a)-(b)) > 0)
+#define UDP_SEQ_GEQ(a,b) ((int32_t)((a)-(b)) >= 0)
+
+#define UDP_RTO_MIN ((unsigned)(HZ/5))
+
+struct UDPConn;
+struct rto_hashstore
+{
+ uint32_t rto_now_idx; /* pointing the hs_table_s index */
+ uint32_t rto_now_ts;
+
+ TAILQ_HEAD(rto_head, UDPConn) rto_list[RTO_HASH + 1];
+};
+
+struct mudp_manager
+{
+ struct rto_hashstore *rto_store; /* lists related to timeout */
+
+ int rto_list_cnt;
+ uint32_t cur_ts;
+};
+
+typedef struct mudp_manager* mudp_manager_t;
+
#define MAX_TRY (11)
#define TIMEOUT(try) ((try) < MAX_TRY ? (timeoutArray[(try)]) :
(timeoutArray[MAX_TRY]))
@@ -456,6 +506,7 @@ ic_bswap32(uint32 x)
#define UDPIC_FLAGS_DISORDER (32)
#define UDPIC_FLAGS_DUPLICATE (64)
#define UDPIC_FLAGS_CAPACITY (128)
+#define UDPIC_FLAGS_FULL (256)
#define UDPIC_MIN_BUF_SIZE (128 * 1024)
@@ -465,7 +516,6 @@ ic_bswap32(uint32 x)
* A connection hash table bin.
*
*/
-struct UDPConn;
typedef struct ConnHtabBin ConnHtabBin;
struct ConnHtabBin
{
@@ -859,8 +909,10 @@ struct ICGlobalControlInfo
*/
#define UNACK_QUEUE_RING_SLOTS_NUM (2000)
#define TIMER_SPAN (session_param.Gp_interconnect_timer_period * 1000ULL)
/* default: 5ms */
+#define TIMER_SPAN_LOSS (session_param.Gp_interconnect_timer_period * 500ULL)
/* default: 2.5ms (half of TIMER_SPAN) */
#define TIMER_CHECKING_PERIOD
(session_param.Gp_interconnect_timer_checking_period) /* default: 20ms */
#define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN)
+#define UNACK_QUEUE_RING_LENGTH_LOSS (UNACK_QUEUE_RING_SLOTS_NUM *
TIMER_SPAN_LOSS)
#define DEFAULT_RTT (session_param.Gp_interconnect_default_rtt * 1000) /*
default: 20ms */
#define MIN_RTT (100) /* 0.1ms */
@@ -916,6 +968,13 @@ struct UnackQueueRing
/* time slots */
ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM];
+#ifdef TIMEOUT_Z
+ uint32_t retrans_count;
+ uint32_t no_retrans_count;
+ uint32_t time_difference;
+ uint32_t min;
+ uint32_t max;
+#endif
};
/*
@@ -981,6 +1040,27 @@ typedef struct ICStatistics
struct TransportEntry;
+struct udp_send_vars
+{
+ /* send sequence variables */
+	uint32_t snd_una;		/* send unacknowledged */
+ uint32_t snd_wnd; /* send window (unscaled) */
+
+ /* retransmission timeout variables */
+	uint8_t nrtx;			/* number of retransmissions */
+	uint8_t max_nrtx;		/* max number of retransmissions */
+ uint32_t rto; /* retransmission timeout */
+ uint32_t ts_rto; /* timestamp for retransmission timeout
*/
+
+ /* congestion control variables */
+ uint32_t cwnd; /* congestion window */
+ uint32_t ssthresh; /* slow start threshold */
+
+ TAILQ_ENTRY(UDPConn) send_link;
+ TAILQ_ENTRY(UDPConn) timer_link; /* timer link (rto
list) */
+
+};
+
/*
* Structure used for keeping track of a pt-to-pt connection between two
* Cdb Entities (either QE or QD).
@@ -1035,6 +1115,32 @@ public:
uint64 stat_max_resent;
uint64 stat_count_dropped;
+ struct {
+ uint32_t ts_rto;
+ uint32_t rto;
+ uint32_t srtt;
+ uint32_t rttvar;
+ uint32_t snd_una;
+ uint16_t nrtx;
+ uint16_t max_nrtx;
+ uint32_t mss;
+ uint32_t cwnd;
+ uint32_t ssthresh;
+ uint32_t fss;
+ uint8_t loss_count;
+ uint32_t mdev;
+ uint32_t mdev_max;
+ uint32_t rtt_seq; /* sequence number to
update rttvar */
+ uint32_t ts_all_rto;
+ bool karn_mode;
+ } rttvar;
+
+ uint8_t on_timewait_list;
+ int16_t on_rto_idx;
+
+ uint32_t snd_nxt; /* send next */
+ struct udp_send_vars sndvar;
+
TransportEntry *entry_;
public:
@@ -1054,7 +1160,7 @@ public:
void prepareRxConnForRead();
void DeactiveConn();
- void handleAckedPacket(ICBuffer *buf, uint64 now);
+ void handleAckedPacket(ICBuffer *buf, uint64 now, struct icpkthdr *pkt);
void prepareXmit();
void sendOnce(icpkthdr *pkt);
void handleStop();
@@ -1072,9 +1178,9 @@ public:
void updateRetransmitStatistics();
void checkExpirationCapacityFC(int timeout);
+ void checkExpiration(ICChunkTransportState *transportStates, uint64
now);
static void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool
*networkTimeoutIsLogged);
- static void checkExpiration(ICChunkTransportState *transportStates,
uint64 now);
static void sendAckWithParam(AckSendParam *param);
static void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr
*addr, socklen_t peerLen);
@@ -1125,7 +1231,7 @@ public:
void aggregateStatistics();
- bool handleAcks();
+ bool handleAcks(bool need_flush);
void handleStopMsgs();
bool pollAcks(int timeout);
diff --git a/contrib/udp2/ic_udp2.c b/contrib/udp2/ic_udp2.c
index 31c0d5c2eeb..0972c874941 100644
--- a/contrib/udp2/ic_udp2.c
+++ b/contrib/udp2/ic_udp2.c
@@ -47,6 +47,7 @@
#include "ic_types.h"
#include "udp2/ic_udp2.h"
+#define MAX_QUEUE_SIZE (64)
#define HandleLastError() \
do { \
@@ -159,12 +160,50 @@ WaitInterconnectQuitUDPIFC2(void)
void
SetupInterconnectUDP2(EState *estate)
{
+ int32 sliceNum = 0;
+ int32 calcQueueDepth = 0;
+ int32 calcSndDepth = 0;
+
if (estate->interconnect_context)
elog(ERROR, "SetupInterconnectUDP: already initialized.");
if (!estate->es_sliceTable)
elog(ERROR, "SetupInterconnectUDP: no slice table ?");
+ if (estate != NULL && estate->es_sliceTable != NULL)
+ sliceNum = estate->es_sliceTable->numSlices;
+ else
+ sliceNum = 1;
+
+ if (Gp_interconnect_mem_size > 0 &&
+ Gp_interconnect_queue_depth == 4 &&
+ Gp_interconnect_snd_queue_depth == 2)
+ {
+ int32 perQueue = Gp_interconnect_mem_size /
+ (Gp_max_packet_size * sliceNum);
+
+ calcSndDepth = Max(Gp_interconnect_snd_queue_depth, perQueue /
2);
+ calcQueueDepth = Max(Gp_interconnect_queue_depth, perQueue -
calcSndDepth);
+
+ if (calcSndDepth > MAX_QUEUE_SIZE)
+ calcSndDepth = MAX_QUEUE_SIZE;
+
+ if (calcQueueDepth > MAX_QUEUE_SIZE)
+ calcQueueDepth = MAX_QUEUE_SIZE;
+
+ Gp_interconnect_snd_queue_depth = calcSndDepth;
+ Gp_interconnect_queue_depth = calcQueueDepth;
+
+ elog(DEBUG1, "SetupUDPIFCInterconnect: queue depth, "
+ "queue_depth=%d, snd_queue_depth=%d, "
+ "mem_size=%d, slices=%d, packet_size=%d",
+ Gp_interconnect_queue_depth,
+ Gp_interconnect_snd_queue_depth,
+ Gp_interconnect_mem_size,
+ sliceNum,
+ Gp_max_packet_size);
+ }
+
SessionMotionLayerIPCParam param;
SetupSessionMotionLayerIPCParam(¶m);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]