This is an automated email from the ASF dual-hosted git repository.

gfphoenix78 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bb33f7d81b Port UDP motion layer fixes from ic_udpifc.c to udp2
5bb33f7d81b is described below

commit 5bb33f7d81b30fe058414108cae987f947ab5255
Author: zhangyue <[email protected]>
AuthorDate: Wed Oct 22 22:32:58 2025 +0800

    Port UDP motion layer fixes from ic_udpifc.c to udp2
    
    This commit integrates critical UDP motion layer improvements from
    commits a337ea2 and acec354b into the contrib/udp2 component,
    addressing extreme-scenario issues in resource-constrained
    environments.
    For more details, see commits a337ea2 and acec354b.
    
    Co-authored-by: zhaoxi <[email protected]>
---
 contrib/udp2/ic_common/ic_utility.hpp            |   2 +
 contrib/udp2/ic_common/udp2/ic_udp2.cpp          | 850 +++++++++++++++++++++--
 contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp | 114 ++-
 contrib/udp2/ic_udp2.c                           |  39 ++
 4 files changed, 931 insertions(+), 74 deletions(-)

diff --git a/contrib/udp2/ic_common/ic_utility.hpp b/contrib/udp2/ic_common/ic_utility.hpp
index 169c7b3cc18..b8c8919ff12 100644
--- a/contrib/udp2/ic_common/ic_utility.hpp
+++ b/contrib/udp2/ic_common/ic_utility.hpp
@@ -100,6 +100,8 @@ typedef enum GpVars_Interconnect_Method_IC
 {
        INTERCONNECT_FC_METHOD_CAPACITY_IC = 0,
        INTERCONNECT_FC_METHOD_LOSS_IC = 2,
+       INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC = 3,
+       INTERCONNECT_FC_METHOD_LOSS_TIMER_IC = 4,
 } GpVars_Interconnect_Method_IC;
 
 typedef enum
diff --git a/contrib/udp2/ic_common/udp2/ic_udp2.cpp b/contrib/udp2/ic_common/udp2/ic_udp2.cpp
index c09ea4e8434..a747d91f600 100644
--- a/contrib/udp2/ic_common/udp2/ic_udp2.cpp
+++ b/contrib/udp2/ic_common/udp2/ic_udp2.cpp
@@ -72,6 +72,21 @@ extern "C" {
 #include "ic_udp2_internal.hpp"
 #include "ic_faultinjection.h"
 
+/*
+ * Hints to the compiler about the likelihood of a branch. Both likely() and
+ * unlikely() return the boolean value of the contained expression.
+ *
+ * These should only be used sparingly, in very hot code paths. It's very easy
+ * to mis-estimate likelihoods.
+ */
+#if __GNUC__ >= 3
+#define likely(x)      __builtin_expect((x) != 0, 1)
+#define unlikely(x) __builtin_expect((x) != 0, 0)
+#else
+#define likely(x)      ((x) != 0)
+#define unlikely(x) ((x) != 0)
+#endif
+
 static int timeoutArray[] =
 {
        1,
@@ -121,7 +136,7 @@ static ICGlobalControlInfo ic_control_info;
 /*
  * All connections in a process share this unack queue ring instance.
  */
-static UnackQueueRing unack_queue_ring = {0, 0, 0};
+static UnackQueueRing unack_queue_ring = {0};
 
 static int     ICSenderSocket = -1;
 static int32 ICSenderPort = 0;
@@ -144,6 +159,8 @@ static std::condition_variable cv;
 
 CChunkTransportState *CChunkTransportStateImpl::state_ = nullptr;
 
+static struct mudp_manager mudp;
+
 /*
  * Identity the user of ic module by vector_engine_is_user:
  * "false" means PG executor, "true" means Arrow executor.
@@ -237,6 +254,8 @@ static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, i
 
 static char *format_sockaddr_udp(struct sockaddr_storage *sa, char *buf, 
size_t len);
 
+static void initUdpManager(mudp_manager_t mptr);
+
 static char* flags2txt(uint32 pkt_flags);
 
 static const char* flags_text[] =
@@ -923,6 +942,14 @@ initUnackQueueRing(UnackQueueRing *uqr)
        {
                uqr->slots[i].init(ICBufferListType_Secondary);
        }
+
+#ifdef TIMEOUT_Z
+       uqr->retrans_count = 0;
+       uqr->no_retrans_count = 0;
+       uqr->time_difference = 0;
+       uqr->min = 0;
+       uqr->max = 0;
+#endif
 }
 
 /*
@@ -1109,6 +1136,344 @@ SendBufferPool::get(UDPConn *conn)
        return ret;
 }
 
+static struct rto_hashstore*
+initRTOHashstore()
+{
+       int i;
+       struct rto_hashstore* hs = (struct 
rto_hashstore*)ic_malloc(sizeof(struct rto_hashstore));
+
+       for (i = 0; i < RTO_HASH; i++)
+               TAILQ_INIT(&hs->rto_list[i]);
+
+       TAILQ_INIT(&hs->rto_list[RTO_HASH]);
+
+       return hs;
+}
+
+static void
+initUdpManager(mudp_manager_t mudp)
+{
+       mudp->rto_store = initRTOHashstore();
+       mudp->rto_list_cnt = 0;
+       mudp->cur_ts = 0;
+}
+
+static inline void
+addtoRTOList(mudp_manager_t mudp, UDPConn *cur_stream)
+{
+       if (!mudp->rto_list_cnt)
+       {
+               mudp->rto_store->rto_now_idx = 0;
+               mudp->rto_store->rto_now_ts = cur_stream->sndvar.ts_rto;
+       }
+
+       if (cur_stream->on_rto_idx < 0 )
+       {
+               if (cur_stream->on_timewait_list)
+                       return;
+
+               int diff = (int32_t)(cur_stream->sndvar.ts_rto - 
mudp->rto_store->rto_now_ts);
+               if (diff < RTO_HASH)
+               {
+                       int offset= (diff + mudp->rto_store->rto_now_idx) % 
RTO_HASH;
+                       cur_stream->on_rto_idx = offset;
+                       TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]),
+                                       cur_stream, sndvar.timer_link);
+               }
+               else
+               {
+                       cur_stream->on_rto_idx = RTO_HASH;
+                       
TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[RTO_HASH]),
+                                       cur_stream, sndvar.timer_link);
+               }
+               mudp->rto_list_cnt++;
+       }
+}
+
+static inline void
+removeFromRTOList(mudp_manager_t mudp,
+                               UDPConn *cur_stream)
+{
+       if (cur_stream->on_rto_idx < 0)
+               return;
+
+       TAILQ_REMOVE(&mudp->rto_store->rto_list[cur_stream->on_rto_idx],
+                       cur_stream, sndvar.timer_link);
+       cur_stream->on_rto_idx = -1;
+
+       mudp->rto_list_cnt--;
+}
+
+static inline void
+updateRetransmissionTimer(mudp_manager_t mudp,
+                               UDPConn *cur_stream,
+                               uint32_t cur_ts)
+{
+       cur_stream->sndvar.nrtx = 0;
+
+       /* if in rto list, remove it */
+       if (cur_stream->on_rto_idx >= 0)
+               removeFromRTOList(mudp, cur_stream);
+
+       /* Reset retransmission timeout */
+       if (UDP_SEQ_GT(cur_stream->snd_nxt, cur_stream->sndvar.snd_una))
+       {
+               /* there are packets sent but not acked */
+               /* update rto timestamp */
+               cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto;
+               addtoRTOList(mudp, cur_stream);
+       }
+
+       if (cur_stream->on_rto_idx == -1)
+       {
+               cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto;
+               addtoRTOList(mudp, cur_stream);
+       }
+}
+
+static int 
+handleRTO(mudp_manager_t mudp,
+                               uint32_t cur_ts,
+                               UDPConn *cur_stream,
+                               ICChunkTransportState *transportStates,
+                               TransportEntry *pEntry,
+                               UDPConn *triggerConn)
+{
+       /* check for expiration */
+       int                     count = 0;
+       int                     retransmits = 0;
+       UDPConn *currBuffConn = NULL;
+       uint32_t now = cur_ts;
+
+       Assert(unack_queue_ring.currentTime != 0);
+       removeFromRTOList(mudp, cur_stream);
+
+       while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < 
UNACK_QUEUE_RING_SLOTS_NUM)
+       {
+               /* expired, need to resend them */
+               ICBuffer   *curBuf = NULL;
+
+               while ((curBuf = 
unack_queue_ring.slots[unack_queue_ring.idx].pop()) != NULL)
+               {
+                       curBuf->nRetry++;
+                       currBuffConn = static_cast<UDPConn*>(curBuf->conn);
+                       putIntoUnackQueueRing(
+                                                               
&unack_queue_ring,
+                                                               curBuf,
+                                                               
currBuffConn->computeExpirationPeriod(curBuf->nRetry), now);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+                       trans_proto_stats.update(TPE_DATA_PKT_SEND, 
curBuf->pkt);
+#endif
+
+                       currBuffConn->sendOnce(curBuf->pkt);
+
+                       retransmits++;
+                       ic_statistics.retransmits++;
+                       currBuffConn->stat_count_resent++;
+                       currBuffConn->stat_max_resent = 
Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent);
+                       UDPConn::checkNetworkTimeout(curBuf, now, 
&transportStates->networkTimeoutIsLogged);
+
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "RESEND pkt with seq %d (retry %d, rtt " 
UINT64_FORMAT ") to route %d",
+                                 curBuf->pkt->seq, curBuf->nRetry, 
curBuf->conn->rtt, curBuf->conn->route);
+                       logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
+#endif
+               }
+
+               unack_queue_ring.currentTime += TIMER_SPAN;
+               unack_queue_ring.idx = (unack_queue_ring.idx + 1) % 
(UNACK_QUEUE_RING_SLOTS_NUM);
+       }
+       return 0;
+}
+
+static inline void
+rearrangeRTOStore(mudp_manager_t mudp)
+{
+       UDPConn *walk, *next;
+       struct rto_hashstore::rto_head* rto_list = 
&mudp->rto_store->rto_list[RTO_HASH];
+       int cnt = 0;
+
+       for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next)
+       {
+               next = TAILQ_NEXT(walk, sndvar.timer_link);
+
+               int diff = (int32_t)(mudp->rto_store->rto_now_ts - 
walk->sndvar.ts_rto);
+               if (diff < RTO_HASH)
+               {
+                       int offset = (diff + mudp->rto_store->rto_now_idx) % 
RTO_HASH;
+                       TAILQ_REMOVE(&mudp->rto_store->rto_list[RTO_HASH],
+                                       walk, sndvar.timer_link);
+                       walk->on_rto_idx = offset;
+                       TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]),
+                                       walk, sndvar.timer_link);
+               }
+               cnt++;
+       }
+}
+
+static inline void
+checkRtmTimeout(mudp_manager_t mudp,
+                               uint32_t cur_ts,
+                               int thresh,
+                               ICChunkTransportState *transportStates,
+                               TransportEntry *pEntry,
+                               UDPConn *triggerConn)
+{
+       UDPConn *walk, *next;
+       struct rto_hashstore::rto_head* rto_list;
+       int cnt;
+
+       if (!mudp->rto_list_cnt)
+               return;
+
+       cnt = 0;
+
+       while (1)
+       {
+               rto_list = 
&mudp->rto_store->rto_list[mudp->rto_store->rto_now_idx];
+               if ((int32_t)(cur_ts - mudp->rto_store->rto_now_ts) < 0)
+                       break;
+
+               for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next)
+               {
+                       if (++cnt > thresh)
+                               break;
+                       next = TAILQ_NEXT(walk, sndvar.timer_link);
+
+                       if (walk->on_rto_idx >= 0)
+                       {
+                               TAILQ_REMOVE(rto_list, walk, sndvar.timer_link);
+                               mudp->rto_list_cnt--;
+                               walk->on_rto_idx = -1;
+                               handleRTO(mudp, cur_ts, walk, transportStates, 
pEntry, triggerConn);
+                       }
+               }
+
+               if (cnt > thresh)
+               {
+                       break;
+               }
+               else
+               {
+                       mudp->rto_store->rto_now_idx = 
(mudp->rto_store->rto_now_idx + 1) % RTO_HASH;
+                       mudp->rto_store->rto_now_ts++;
+                       if (!(mudp->rto_store->rto_now_idx % 1000))
+                               rearrangeRTOStore(mudp);
+               }
+
+       }
+}
+
+/*
+ * estimateRTT - Dynamically estimates the Round-Trip Time (RTT) and adjusts 
Retransmission Timeout (RTO)
+ * 
+ * This function implements a variant of the Jacobson/Karels algorithm for RTT 
estimation, adapted for UDP-based
+ * motion control connections. It updates smoothed RTT (srtt), mean deviation 
(mdev), and RTO values based on 
+ * newly measured RTT samples (mrtt). The RTO calculation ensures reliable 
data transmission over unreliable networks.
+ *
+ * Key Components:
+ *   - srtt:   Smoothed Round-Trip Time (weighted average of historical RTT 
samples)
+ *   - mdev:   Mean Deviation (measure of RTT variability)
+ *   - rttvar: Adaptive RTT variation bound (used to clamp RTO updates)
+ *   - rto:    Retransmission Timeout (dynamically adjusted based on srtt + 
rttvar)
+ *
+ * Algorithm Details:
+ *   1. For the first RTT sample:
+ *        srtt    = mrtt << 3   (scaled by 8 for fixed-point arithmetic)
+ *        mdev    = mrtt << 1   (scaled by 2)
+ *        rttvar  = max(mdev, rto_min)
+ *   2. For subsequent samples:
+ *        Delta   = mrtt - (srtt >> 3)  (difference between new sample and 
smoothed RTT)
+ *        srtt   += Delta               (update srtt with 1/8 weight of new 
sample)
+ *        Delta   = abs(Delta) - (mdev >> 2)
+ *        mdev   += Delta               (update mdev with 1/4 weight)
+ *   3. rttvar bounds the maximum RTT variation:
+ *        If mdev > mdev_max, update mdev_max and rttvar
+ *        On new ACKs (snd_una > rtt_seq), decay rttvar toward mdev_max
+ *   4. Final RTO calculation:
+ *        rto = (srtt >> 3) + rttvar   (clamped to RTO_MAX)
+ *
+ * Parameters:
+ *   @mConn:  Parent motion connection context (container of MotionConnUDP)
+ *   @mrtt:   Measured Round-Trip Time (in microseconds) for the latest packet
+ *
+ * Notes:
+ *   - Designed for non-retransmitted packets to avoid sampling bias.
+ *   - Uses fixed-point arithmetic to avoid floating-point operations.
+ *   - Minimum RTO (rto_min) is set to 20ms (HZ/5/10, assuming HZ=100).
+ *   - Critical for adaptive timeout control in UDP protocols where 
reliability is implemented at the application layer.
+ *   - Thread-unsafe: Must be called in a synchronized context (e.g., packet 
processing loop).
+ */
+static inline void
+estimateRTT(UDPConn *conn , uint32_t mrtt)
+{
+       /* This function should be called for not retransmitted packets */
+       /* TODO: determine rto_min */
+
+       long m = mrtt;
+       uint32_t rto_min = UDP_RTO_MIN / 10;
+
+       if (m == 0)
+               m = 1;
+
+       /*
+        * Special RTO optimization for high-speed networks:
+        * When measured RTT (m) is below 100 microseconds and current RTO is 
under 10ms,
+        * forcibly set RTO to half of RTO_MIN. This targets two scenarios:
+        *   - Loopback interfaces (localhost communication)
+        *   - Ultra-low-latency networks (e.g., InfiniBand, RDMA)
+        */
+       if(m < 100 && conn->rttvar.rto < 10000)
+       {
+               conn->rttvar.rto = RTO_MIN / 2;
+       }
+
+       if (conn->rttvar.srtt != 0)
+       {
+               /* rtt = 7/8 rtt + 1/8 new */
+               m -= (conn->rttvar.srtt >> LOSS_THRESH);
+               conn->rttvar.srtt += m;
+               if (m < 0)
+               {
+                       m = -m;
+                       m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA);
+                       if (m > 0)
+                               m >>= LOSS_THRESH;
+               }
+               else
+               {
+                       m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA);
+               }
+               conn->rttvar.mdev += m;
+               if (conn->rttvar.mdev > conn->rttvar.mdev_max)
+               {
+                       conn->rttvar.mdev_max = conn->rttvar.mdev;
+                       if (conn->rttvar.mdev_max > conn->rttvar.rttvar)
+                       {
+                               conn->rttvar.rttvar = conn->rttvar.mdev_max;
+                       }
+               }
+               if (UDP_SEQ_GT(conn->rttvar.snd_una, conn->rttvar.rtt_seq))
+               {
+                       if (conn->rttvar.mdev_max < conn->rttvar.rttvar)
+                       {
+                               conn->rttvar.rttvar -= (conn->rttvar.rttvar - 
conn->rttvar.mdev_max) >> RTT_SHIFT_ALPHA;
+                       }
+                       conn->rttvar.mdev_max = rto_min;
+               }
+       }
+       else
+       {
+               /* fresh measurement */
+               conn->rttvar.srtt = m << LOSS_THRESH;
+               conn->rttvar.mdev = m << 1;
+               conn->rttvar.mdev_max = conn->rttvar.rttvar = 
MAX(conn->rttvar.mdev, rto_min);
+       }
+
+       conn->rttvar.rto = ((conn->rttvar.srtt >> LOSS_THRESH) + 
conn->rttvar.rttvar) > RTO_MAX ? RTO_MAX : ((conn->rttvar.srtt >> LOSS_THRESH) 
+ conn->rttvar.rttvar);
+}
+
 /*
  * addCRC
  *             add CRC field to the packet.
@@ -1844,6 +2209,19 @@ setupOutgoingUDPConnection(int icid, TransportEntry *pEntry, UDPConn *conn)
        conn->msgSize = sizeof(conn->conn_info);
        conn->stillActive = true;
        conn->conn_info.seq = 1;
+       conn->rttvar.ts_rto = 0;
+       conn->rttvar.rto = UDP_INITIAL_RTO;
+       conn->rttvar.srtt = 0;
+       conn->rttvar.rttvar = 0;
+       conn->rttvar.snd_una = 0;
+       conn->rttvar.nrtx = 0;
+       conn->rttvar.max_nrtx = 0;
+       conn->rttvar.mss = UDP_DEFAULT_MSS;
+       conn->rttvar.cwnd = 2;
+       conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH;
+       conn->rttvar.loss_count = 0;
+       conn->rttvar.karn_mode = false;
+       conn->on_rto_idx = -1;
        Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == 
AF_INET6);
 }
 
@@ -2324,6 +2702,9 @@ UDPConn::computeExpirationPeriod(uint32 retry)
        else
 #endif
        {
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+                       return Min(retry > 3 ? this->rttvar.rto * retry : 
this->rttvar.rto, UNACK_QUEUE_RING_LENGTH_LOSS);
+
                uint32 factor = (retry <= 12 ? retry : 12);
                return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, 
(int)(this->rtt + (this->dev << 2)) << (factor)));
        }
@@ -2418,7 +2799,7 @@ UDPConn::DeactiveConn()
  *     packet is retransmitted.
  */
 void
-UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now)
+UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now, struct icpkthdr *pkt)
 {
        uint64          ackTime = 0;
        bool            bufIsHead = false; 
@@ -2428,6 +2809,39 @@ UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now)
 
        buf = this->unackQueue.remove(buf);
 
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC || 
session_param.Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER_IC)
+       {
+               bufConn = static_cast<UDPConn*>(buf->conn);
+               buf = 
unack_queue_ring.slots[buf->unackQueueRingSlot].remove(buf);
+               unack_queue_ring.numOutStanding--;
+               if (this->unackQueue.length() >= 1)
+               unack_queue_ring.numSharedOutStanding--;
+
+               ackTime = now - buf->sentTime;
+
+               if (buf->nRetry == 0)
+               {
+                       /* adjust the congestion control window. */
+                       if (snd_control_info.cwnd < snd_control_info.ssthresh)
+                               snd_control_info.cwnd += 2;
+                       else
+                               snd_control_info.cwnd += 1 / 
snd_control_info.cwnd;
+                       snd_control_info.cwnd = Min(snd_control_info.cwnd, 
snd_buffer_pool.maxCount);
+               }
+
+               if ((bufConn->rttvar.rto << 1) > ackTime && pkt->retry_times != 
session_param.Gp_interconnect_min_retries_before_timeout)
+                       estimateRTT(bufConn, (now - pkt->send_time));
+
+               if (buf->nRetry && pkt->retry_times > 0 && pkt->retry_times < 
session_param.Gp_interconnect_min_retries_before_timeout)
+                       bufConn->rttvar.rto += (bufConn->rttvar.rto >> 4 * 
buf->nRetry);
+
+               if (unlikely(session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC))
+               {
+                       bufConn->sndvar.ts_rto = bufConn->rttvar.rto;
+                       addtoRTOList(&mudp, bufConn);
+               }
+       }
+
        if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
        {
                ICBufferList *alist = 
&unack_queue_ring.slots[buf->unackQueueRingSlot];
@@ -2608,24 +3022,59 @@ getCurrentTime(void)
 static void
 putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, 
uint64 now)
 {
+       UDPConn *buffConn = static_cast<UDPConn*>(buf->conn);
        uint64          diff = 0;
        int                     idx = 0;
-
-       /* The first packet, currentTime is not initialized */
-       if (uqr->currentTime == 0)
-               uqr->currentTime = now - (now % TIMER_SPAN);
-
-       diff = now + expTime - uqr->currentTime;
-       if (diff >= UNACK_QUEUE_RING_LENGTH)
+       
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
        {
+               /* The first packet, currentTime is not initialized */
+#ifndef TIMEOUT_Z
+               if (uqr->currentTime == 0)
+                       uqr->currentTime = now - (now % TIMER_SPAN_LOSS);
+#else
+               if (uqr->currentTime == 0 && buffConn->rttvar.rto == 0)
+                       uqr->currentTime = now - (now % TIMER_SPAN_LOSS);
+               else
+                       uqr->currentTime = now + buffConn->rttvar.rto;
+
+#endif
+               diff = expTime;
+               if (diff >= UNACK_QUEUE_RING_LENGTH_LOSS)
+               {
 #ifdef AMS_VERBOSE_LOGGING
-               LOG(INFO, "putIntoUnackQueueRing:" "now %lu expTime %lu diff 
%lu uqr-currentTime %lu", now, expTime, diff, uqr->currentTime);
+                       LOG(INFO, "putIntoUnackQueueRing:" "now " UINT64_FORMAT 
"expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " 
UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
 #endif
-               diff = UNACK_QUEUE_RING_LENGTH - 1;
+                       diff = UNACK_QUEUE_RING_LENGTH_LOSS - 1;
+               }
+               else if (diff < TIMER_SPAN_LOSS)
+               {
+                       diff = diff < TIMER_SPAN_LOSS ? TIMER_SPAN_LOSS : diff;
+               }
        }
-       else if (diff < TIMER_SPAN)
+       else
        {
-               diff = TIMER_SPAN;
+               if (uqr->currentTime == 0)
+               uqr->currentTime = now - (now % TIMER_SPAN_LOSS);
+
+               diff = now + expTime - uqr->currentTime;
+               if (diff >= UNACK_QUEUE_RING_LENGTH)
+               {
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "putIntoUnackQueueRing:" "now " UINT64_FORMAT 
"expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " 
UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
+#endif
+                       diff = UNACK_QUEUE_RING_LENGTH - 1;
+               }
+               else if (diff < TIMER_SPAN)
+               {
+                       diff = TIMER_SPAN;
+               }
+
+               idx = (uqr->idx + diff / TIMER_SPAN) % 
UNACK_QUEUE_RING_SLOTS_NUM;
+
+#ifdef AMS_VERBOSE_LOGGING
+               LOG(INFO, "PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT 
" (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d, 
nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot, 
uqr->idx, idx);
+#endif
        }
 
        idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM;
@@ -2784,6 +3233,30 @@ handleDataPacket(UDPConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, so
                logPkt("Interconnect error: received a packet when the queue is 
full ", pkt);
                ic_statistics.disorderedPktNum++;
                conn->stat_count_dropped++;
+
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC && 
rx_control_info.mainWaitingState.waiting &&
+                       rx_control_info.mainWaitingState.waitingNode == 
pkt->motNodeId &&
+                       rx_control_info.mainWaitingState.waitingQuery == 
pkt->icId)
+               {
+                       if (rx_control_info.mainWaitingState.waitingRoute == 
ANY_ROUTE)
+                       {
+                               if (rx_control_info.mainWaitingState.reachRoute 
== ANY_ROUTE)
+                                       
rx_control_info.mainWaitingState.reachRoute = conn->route;
+                       }
+                       else if (rx_control_info.mainWaitingState.waitingRoute 
== conn->route)
+                       {
+                               if (IC_DEBUG2 >= session_param.log_min_messages)
+                                       LOG(INFO, "rx thread: main_waiting 
waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
+                               rx_control_info.mainWaitingState.reachRoute = 
conn->route;
+                       }
+                       /* WAKE MAIN THREAD HERE */
+                       *wakeup_mainthread = true;
+               }
+
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+               {
+                       conn->setAckParam(param, UDPIC_FLAGS_FULL, 
conn->conn_info.seq - 1, conn->conn_info.extraSeq);
+               }
                return false;
        }
 
@@ -3060,9 +3533,23 @@ rxThreadFunc(void *arg)
                        UDPConn *conn = ic_control_info.connHtab.find(pkt);
                        if (conn != NULL)
                        {
+                               uint64          now = getCurrentTime();
+                               uint64 send_time = pkt->send_time;
+                               uint64 recv_time = now;
+                               uint64 retry_times = pkt->retry_times;
+
+                               bool drop_ack = pkt->seq < conn->conn_info.seq 
? true : false;
                                /* Handling a regular packet */
                                if (handleDataPacket(conn, pkt, &peer, 
&peerlen, &param, &wakeup_mainthread))
                                        pkt = NULL;
+                               if (!pkt)
+                               {
+                                       param.msg.send_time = send_time;
+                                       param.msg.recv_time = recv_time;
+                                       param.msg.retry_times = retry_times;
+                               }
+                               if (drop_ack)
+                                       param.msg.retry_times = 
session_param.Gp_interconnect_min_retries_before_timeout;
                                ic_statistics.recvPktNum++;
                        }
                        else
@@ -3835,7 +4322,7 @@ UDPConn::sendBuffers()
        {
                ICBuffer   *buf = NULL;
 
-               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
                {
                        if (this->unackQueue.length() > 0 &&
                                unack_queue_ring.numSharedOutStanding >= 
(snd_control_info.cwnd - snd_control_info.minCwnd))
@@ -3858,7 +4345,7 @@ UDPConn::sendBuffers()
 
                this->unackQueue.append(buf);
 
-               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
                {
                        unack_queue_ring.numOutStanding++;
                        if (this->unackQueue.length() > 1)
@@ -3882,6 +4369,10 @@ UDPConn::sendBuffers()
                trans_proto_stats.update(TPE_DATA_PKT_SEND, buf->pkt);
 #endif
 
+               struct icpkthdr *pkt_ = buf->pkt;
+               pkt_->send_time = now;
+               pkt_->recv_time = 0;
+               pkt_->retry_times = buf->nRetry;
                this->sendOnce(buf->pkt);
 
                ic_statistics.sndPktNum++;
@@ -4022,7 +4513,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
 
                if (buf->pkt->seq == pkt->seq)
                {
-                       this->handleAckedPacket(buf, now);
+                       this->handleAckedPacket(buf, now, pkt);
                        shouldSendBuffers = true;
                        break;
                }
@@ -4032,7 +4523,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
                        /* this is a lost packet, retransmit */
 
                        buf->nRetry++;
-                       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+                       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
                        {
                                ICBufferList *alist = 
&unack_queue_ring.slots[buf->unackQueueRingSlot];
                                buf = alist->remove(buf);
@@ -4063,7 +4554,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
                        /* remove packet already received. */
 
                        next = link->next;
-                       this->handleAckedPacket(buf, now);
+                       this->handleAckedPacket(buf, now, pkt);
                        shouldSendBuffers = true;
                        link = next;
                        buf = GET_ICBUFFER_FROM_PRIMARY(link);
@@ -4080,7 +4571,7 @@ UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
                        lostPktCnt--;
                }
        }
-       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
        {
                snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, 
snd_control_info.minCwnd);
                snd_control_info.cwnd = snd_control_info.ssthresh;
@@ -4129,7 +4620,7 @@ UDPConn::handleAckForDuplicatePkt(icpkthdr *pkt)
        while (!this->unackQueue.is_head(link) && (buf->pkt->seq <= 
pkt->extraSeq))
        {
                next = link->next;
-               this->handleAckedPacket(buf, now);
+               this->handleAckedPacket(buf, now, pkt);
                shouldSendBuffers = true;
                link = next;
                buf = GET_ICBUFFER_FROM_PRIMARY(link);
@@ -4141,7 +4632,7 @@ UDPConn::handleAckForDuplicatePkt(icpkthdr *pkt)
                next = link->next;
                if (buf->pkt->seq == pkt->seq)
                {
-                       this->handleAckedPacket(buf, now);
+                       this->handleAckedPacket(buf, now, pkt);
                        shouldSendBuffers = true;
                        break;
                }
@@ -4215,59 +4706,225 @@ void
 UDPConn::checkExpiration(ICChunkTransportState *transportStates, uint64 now)
 {
        /* check for expiration */
-       int                     count = 0;
-       int                     retransmits = 0;
+       int     count = 0;
+       int     retransmits = 0;
        UDPConn *currBuffConn = NULL;
 
        Assert(unack_queue_ring.currentTime != 0);
-       while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < 
UNACK_QUEUE_RING_SLOTS_NUM)
+
+       if (unlikely(session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC))
        {
-               /* expired, need to resend them */
-               ICBuffer   *curBuf = NULL;
+               checkRtmTimeout(&mudp, now, 500, transportStates, this->entry_, 
this);
+               return;
+       }
+
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+       {
+               uint64 timer_span_time = unack_queue_ring.currentTime + 
TIMER_SPAN_LOSS;
 
-               while (true)
+               while (now >= (timer_span_time + 
unack_queue_ring.time_difference)  && count++ < UNACK_QUEUE_RING_SLOTS_NUM)
                {
-                       ICBufferList *alist = 
&unack_queue_ring.slots[unack_queue_ring.idx];
-                       curBuf = alist->pop();
-                       if (curBuf == NULL)
-                               break;
-                       UDPConn *conn = static_cast<UDPConn*>(curBuf->conn);
-                       curBuf->nRetry++;
-                       putIntoUnackQueueRing(&unack_queue_ring,
-                                                                 curBuf,
-                                                                 
conn->computeExpirationPeriod(curBuf->nRetry), now);
+                       /* expired, need to resend them */
+                       ICBuffer   *curBuf = NULL;
+
+                       while ((curBuf = 
unack_queue_ring.slots[unack_queue_ring.idx].pop()) != NULL)
+                       {
+                               UDPConn *conn = 
static_cast<UDPConn*>(curBuf->conn);
+                               curBuf->nRetry++;
+
+                               /*
+                                * Fixed Timeout Thresholds: Traditional 
TCP-style Retransmission Timeout
+                                * (RTTVAR.RTO) calculations may be too rigid 
for networks with volatile
+                                * latency. This leads to:
+                                *   Premature Retransmissions: Unnecessary 
data resends during temporary
+                                *     latency spikes, wasting bandwidth.
+                                *   Delayed Recovery: Slow reaction to actual 
packet loss when RTO is
+                                *     overly conservative.
+                                * 
+                                * Lack of Context Awareness: Static RTO 
ignores real-time network behavior
+                                * patterns, reducing throughput and 
responsiveness.
+                                * 
+                                * Solution: Dynamic Timeout Threshold 
Adjustment
+                                * Implements an adaptive timeout mechanism to 
optimize retransmission:
+                                *   if (now < (curBuf->sentTime + 
conn->rttvar.rto)) {
+                                *       uint32_t diff = (curBuf->sentTime + 
conn->rttvar.rto) - now;
+                                *       // ... (statistical tracking and 
threshold adjustment)
+                                *   }
+                                *   Temporary Latency Spike: Uses max 
(conservative) to avoid false
+                                *     retransmits, reducing bandwidth waste 
(vs. traditional mistaken
+                                *     retransmissions).
+                                *   Persistent Packet Loss: Prioritizes min 
(aggressive) via
+                                *     weight_retrans, accelerating recovery 
(vs. slow fixed-RTO reaction).
+                                *   Stable Network: Balances weights for 
equilibrium throughput (vs.
+                                *     static RTO limitations).
+                                */
+                               if (now < (curBuf->sentTime +  
conn->rttvar.rto))
+                               {
+#ifdef TIMEOUT_Z
+                                       uint32_t diff = (curBuf->sentTime +  
conn->rttvar.rto) - now;
+                                       if(unack_queue_ring.retrans_count == 0 
&& unack_queue_ring.no_retrans_count == 0)
+                                       {
+                                               unack_queue_ring.min = diff;
+                                               unack_queue_ring.max = diff;
+                                       }
+
+                                       if (diff < unack_queue_ring.min) 
unack_queue_ring.min = diff;
+                                       if (diff > unack_queue_ring.max) 
unack_queue_ring.max = diff;
+
+                                       if (unack_queue_ring.retrans_count == 0)
+                                               
unack_queue_ring.time_difference = unack_queue_ring.max;
+                                       else if 
(unack_queue_ring.no_retrans_count == 0 && ic_statistics.retransmits < 
(session_param.Gp_interconnect_min_retries_before_timeout / 4)) 
+                                               
unack_queue_ring.time_difference = 0;
+                                       else
+                                       {
+                                               uint32_t total_count = 
unack_queue_ring.retrans_count + unack_queue_ring.no_retrans_count;
+                                               double weight_retrans = 
(double)unack_queue_ring.retrans_count / total_count;
+                                               double weight_no_retrans = 
(double)unack_queue_ring.no_retrans_count / total_count;
+                                               
unack_queue_ring.time_difference = (uint32_t)(unack_queue_ring.max * 
weight_no_retrans + unack_queue_ring.min * weight_retrans);
+                                       }
+                                       
+                                       ++unack_queue_ring.no_retrans_count;
+                               }
+                               else
+                                       ++unack_queue_ring.retrans_count;
+#endif
 
 #ifdef TRANSFER_PROTOCOL_STATS
-                       trans_proto_stats.update(TPE_DATA_PKT_SEND, 
curBuf->pkt);
+                               trans_proto_stats.update(TPE_DATA_PKT_SEND, 
curBuf->pkt);
 #endif
 
-                       conn->sendOnce(curBuf->pkt);
+                               currBuffConn = 
static_cast<UDPConn*>(curBuf->conn);
+                               putIntoUnackQueueRing(&unack_queue_ring,
+                                                                         
curBuf,
+                                                                         
currBuffConn->computeExpirationPeriod(curBuf->nRetry), getCurrentTime());
+                               struct icpkthdr *pkt_ = curBuf->pkt;
 
-                       currBuffConn = conn;
+                               pkt_->send_time = getCurrentTime();
+                               pkt_->recv_time = 0;
+                               pkt_->retry_times = curBuf->nRetry;
 
-                       retransmits++;
-                       ic_statistics.retransmits++;
-                       currBuffConn->stat_count_resent++;
-                       currBuffConn->stat_max_resent = 
Max(currBuffConn->stat_max_resent,
-                                                                               
                currBuffConn->stat_count_resent);
+                               currBuffConn->sendOnce(curBuf->pkt);
 
-                       UDPConn::checkNetworkTimeout(curBuf, now, 
&transportStates->networkTimeoutIsLogged);
+                               /*
+                                * Adaptive Retry Backoff with Polling for 
Network Asymmetry Mitigation
+                                *
+                                * This logic addresses two critical network 
pathologies:
+                                *  1. RTO Distortion Amplification: 
+                                *     - Packet loss in volatile networks 
causes RTO-based retransmission errors
+                                *     - Multiple spurious retries increase 
network load and congestion collapse risk
+                                *  2. Data Skew-Induced Starvation:
+                                *     - Under unbalanced workloads, 
low-traffic nodes experience MON (Message Order Number) delays
+                                *     - Delayed ACKs trigger false 
retransmissions even when packets arrive eventually
+                                *     - Unacked queue inflation worsens 
congestion in high-traffic nodes
+                                */
+                               int32_t loop_ack = curBuf->nRetry;
+                               uint32_t rto_min = UDP_RTO_MIN / 10;
+                               uint32_t rtoMs = conn->rttvar.rto / 1000;
+                               int32_t wait_time = rto_min > rtoMs ? rto_min : 
rtoMs;
+                               int32_t loop = 0;
+
+                               /*
+                                * To optimize performance, we need to process 
all the time-out file descriptors (fds)
+                                * in each batch together.
+                                */
+                               if (loop_ack > 0)
+                               {
+                                       while (loop++ < loop_ack)
+                                       {
+                                               if 
(this->entry_->pollAcks(wait_time))
+                                               {
+                                                       
this->entry_->handleAcks(false);
+                                                       curBuf->nRetry = 0;
+                                                       break;
+                                               }
+
+                                               struct icpkthdr *pkt_ = 
curBuf->pkt;
+                                               pkt_->send_time = 
getCurrentTime();
+                                               pkt_->recv_time = 0;
+                                               pkt_->retry_times = 
curBuf->nRetry;
+                                               currBuffConn->sendOnce(pkt_);
+
+                                               if (loop_ack < 
(session_param.Gp_interconnect_min_retries_before_timeout / 10))
+                                                       wait_time += wait_time 
/ 10;
+                                               else if (loop_ack > 
(session_param.Gp_interconnect_min_retries_before_timeout / 10) && loop_ack < 
(session_param.Gp_interconnect_min_retries_before_timeout / 5))
+                                                       wait_time += RTO_MAX / 
10;
+                                               else if (loop_ack > 
(session_param.Gp_interconnect_min_retries_before_timeout / 5) && loop_ack < 
(session_param.Gp_interconnect_min_retries_before_timeout / 2))
+                                                       wait_time += RTO_MAX / 
5;
+                                               else if (loop_ack < 
(session_param.Gp_interconnect_min_retries_before_timeout))
+                                                       wait_time += RTO_MAX;
+                                       };
+                               }
+
+                               if (loop_ack > 
session_param.Gp_interconnect_min_retries_before_timeout / 5)
+                                       LOG(INFO, "Resending packet (seq %d) to 
%s (pid %d cid %d) with %d retries in %lu seconds",
+                                                       curBuf->pkt->seq, 
curBuf->conn->remoteHostAndPort,
+                                                       curBuf->pkt->dstPid, 
curBuf->pkt->dstContentId, curBuf->nRetry,
+                                                       (now - 
curBuf->sentTime) / 1000 / 1000);
+
+                               retransmits++;
+                               ic_statistics.retransmits++;
+                               currBuffConn->stat_count_resent++;
+                               currBuffConn->stat_max_resent = 
Max(currBuffConn->stat_max_resent,
+                                                                               
                        currBuffConn->stat_count_resent);
+
+                               UDPConn::checkNetworkTimeout(curBuf, now, 
&transportStates->networkTimeoutIsLogged);
 
 #ifdef AMS_VERBOSE_LOGGING
-                       LOG(INFO, "RESEND pkt with seq %d (retry %d, rtt %lu) 
to route %d",
-                                         curBuf->pkt->seq, curBuf->nRetry, 
currBuffConn->rtt, currBuffConn->route);
-                       logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
+                               LOG(INFO, "RESEND pkt with seq %d (retry %d, 
rtt " UINT64_FORMAT ") to route %d",
+                                               curBuf->pkt->seq, 
curBuf->nRetry, currBuffConn->rtt, currBuffConn->route);
+                               logPkt("RESEND PKT in checkExpiration", 
curBuf->pkt);
 #endif
+                       }
+
+                       timer_span_time += TIMER_SPAN_LOSS;
+                       unack_queue_ring.idx = (unack_queue_ring.idx + 1) % 
(UNACK_QUEUE_RING_SLOTS_NUM);
                }
+       }
+       else
+       {
+               while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && 
count++ < UNACK_QUEUE_RING_SLOTS_NUM)
+               {
+                       /* expired, need to resend them */
+                       ICBuffer   *curBuf = NULL;
 
-               unack_queue_ring.currentTime += TIMER_SPAN;
-               unack_queue_ring.idx = (unack_queue_ring.idx + 1) % 
(UNACK_QUEUE_RING_SLOTS_NUM);
+                       while ((curBuf = 
unack_queue_ring.slots[unack_queue_ring.idx].pop()) != NULL)
+                       {
+                               curBuf->nRetry++;
+                               currBuffConn = 
static_cast<UDPConn*>(curBuf->conn);
+                               putIntoUnackQueueRing(
+                                                                       
&unack_queue_ring,
+                                                                       curBuf,
+                                                                       
currBuffConn->computeExpirationPeriod(curBuf->nRetry), now);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+                               trans_proto_stats.update(TPE_DATA_PKT_SEND, 
curBuf->pkt);
+#endif
+
+                               currBuffConn->sendOnce(curBuf->pkt);
+
+                               retransmits++;
+                               ic_statistics.retransmits++;
+                               currBuffConn->stat_count_resent++;
+                               currBuffConn->stat_max_resent = 
Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent);
+                               UDPConn::checkNetworkTimeout(curBuf, now, 
&transportStates->networkTimeoutIsLogged);
+
+#ifdef AMS_VERBOSE_LOGGING
+                               LOG(INFO, "RESEND pkt with seq %d (retry %d, 
rtt " UINT64_FORMAT ") to route %d",
+                                         curBuf->pkt->seq, curBuf->nRetry, 
curBuf->conn->rtt, curBuf->conn->route);
+                               logPkt("RESEND PKT in checkExpiration", 
curBuf->pkt);
+#endif
+                       }
+
+                       unack_queue_ring.currentTime += TIMER_SPAN;
+                       unack_queue_ring.idx = (unack_queue_ring.idx + 1) % 
(UNACK_QUEUE_RING_SLOTS_NUM);
+               }
+
+               /*
+                * deal with case when there is a long time this function is 
not called.
+                */
+               unack_queue_ring.currentTime = now - (now % (TIMER_SPAN));
        }
 
-       /*
-        * deal with case when there is a long time this function is not called.
-        */
-       unack_queue_ring.currentTime = now - (now % TIMER_SPAN);
        if (retransmits > 0)
        {
                snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, 
snd_control_info.minCwnd);
@@ -4325,14 +4982,29 @@ UDPConn::checkDeadlock()
                        ic_control_info.lastDeadlockCheckTime = now;
                        ic_statistics.statusQueryMsgNum++;
 
+                       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC && this->entry_->pollAcks(50))
+                       {
+                               this->entry_->handleAcks(false);
+                               this->deadlockCheckBeginTime = now;
+                       }
+
                        /* check network error. */
-                       if ((now - this->deadlockCheckBeginTime) > ((uint64) 
session_param.Gp_interconnect_transmit_timeout * 1000 * 1000))
+                       if ((now - this->deadlockCheckBeginTime) > ((uint64) 
session_param.Gp_interconnect_transmit_timeout * 100 * 1000))
                        {
-                               std::stringstream ss;
-                               ss <<"ERROR, interconnect encountered a network 
error, please check your network"<<
-                                       "Did not get any response from 
"<<remoteHostAndPort<<" (pid "<<conn_info.dstPid<<
-                                       " cid "<<conn_info.dstContentId<<") in 
"<<session_param.Gp_interconnect_transmit_timeout<<" seconds.",
-                               throw ICNetworkException(ss.str(), __FILE__, 
__LINE__);
+                               LOG(INFO, "Did not get any response from %s 
(pid %d cid %d) in 600 seconds.", this->remoteHostAndPort,
+                                                 this->conn_info.dstPid, 
this->conn_info.dstContentId);
+
+                               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC)
+                                       this->capacity += 1;
+
+                               if ((now - this->deadlockCheckBeginTime) > 
((uint64)session_param.Gp_interconnect_transmit_timeout * 1000 * 1000))
+                               {
+                                       std::stringstream ss;
+                                       ss << "ERROR, interconnect encountered 
a network error, please check your network."
+                                          << "Did not get any response from " 
<< remoteHostAndPort << " (pid " << conn_info.dstPid << " cid " << 
conn_info.dstContentId << ") in "
+                                          << 
session_param.Gp_interconnect_transmit_timeout << " seconds.";
+                                       throw ICNetworkException(ss.str(), 
__FILE__, __LINE__);
+                               }
                        }
                }
        }
@@ -4375,7 +5047,7 @@ UDPConn::checkExpirationCapacityFC(int timeout)
                ic_control_info.lastPacketSendTime = now;
 
                this->updateRetransmitStatistics();
-               checkNetworkTimeout(buf, now, 
&entry_->state->networkTimeoutIsLogged);
+               UDPConn::checkNetworkTimeout(buf, now, 
&entry_->state->networkTimeoutIsLogged);
        }
 }
 
@@ -4394,13 +5066,13 @@ UDPConn::checkExceptions(int retry, int timeout)
                this->checkExpirationCapacityFC(timeout);
        }
 
-       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC || session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
        {
                uint64 now = getCurrentTime();
 
                if (now - ic_control_info.lastExpirationCheckTime > 
uint64(TIMER_CHECKING_PERIOD))
                {
-                       UDPConn::checkExpiration(this->entry_->state, now);
+                       this->checkExpiration(this->entry_->state, now);
                        ic_control_info.lastExpirationCheckTime = now;
                }
        }
@@ -4434,12 +5106,23 @@ UDPConn::checkExceptions(int retry, int timeout)
 int
 UDPConn::computeTimeout(int retry)
 {
+       int32_t rtoMs = 0;
+
+       rtoMs = this->rttvar.rto / 1000;
        if (this->unackQueue.length() == 0)
                return TIMER_CHECKING_PERIOD;
 
        ICBufferLink *bufLink = this->unackQueue.first();
        ICBuffer   *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink);
 
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE_IC)
+       {
+               if (buf->nRetry == 0 && retry == 0 && 
unack_queue_ring.numSharedOutStanding < (snd_control_info.cwnd - 
snd_control_info.minCwnd))
+                       return 0;
+
+               return rtoMs > TIMER_CHECKING_PERIOD ? rtoMs: 
TIMER_CHECKING_PERIOD;
+       }
+
        if (buf->nRetry == 0 && retry == 0)
                return 0;
 
@@ -4513,8 +5196,7 @@ UDPConn::Send(DataBlock *data)
 
                if (this->entry_->pollAcks(timeout))
                {
-                       bool rs = this->entry_->handleAcks();
-                       if (rs)
+                       if (this->entry_->handleAcks(true))
                        {
                                /*
                                 * We make sure that we deal with the stop 
messages only after
@@ -4527,6 +5209,9 @@ UDPConn::Send(DataBlock *data)
                }
                this->checkExceptions(retry++, timeout);
                doCheckExpiration = false;
+
+               if (!doCheckExpiration && this->unackQueue.length() == 0 && 
this->capacity > 0 && this->sndQueue.length() > 0)
+                       this->sendBuffers();
        }
 
        this->pBuff = (uint8 *) this->curBuff->pkt;
@@ -4670,7 +5355,7 @@ TransportEntry::aggregateStatistics()
  * if we receive a stop message, return true (caller will clean up).
  */
 bool
-TransportEntry::handleAcks()
+TransportEntry::handleAcks(bool need_flush)
 {
        bool            ret = false;
        UDPConn *ackConn = NULL;
@@ -4800,6 +5485,12 @@ TransportEntry::handleAcks()
                                        shouldSendBuffers |= 
(ackConn->handleAckForDisorderPkt(pkt));
                                        break;
                                }
+                               else if (pkt->flags & UDPIC_FLAGS_FULL)
+                               {
+                                       if (IC_DEBUG1 >= 
session_param.log_min_messages)
+                                               LOG(DEBUG1, "Recv buff is full 
[seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", 
pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, 
ackConn->conn_info.seq);
+                                       break;
+                               }
 
                                /*
                                 * don't get out of the loop if pkt->seq equals 
to
@@ -4853,7 +5544,7 @@ TransportEntry::handleAcks()
                                        while 
(!ackConn->unackQueue.is_head(link) && buf->pkt->seq <= pkt->seq)
                                        {
                                                next = link->next;
-                                               ackConn->handleAckedPacket(buf, 
now);
+                                               ackConn->handleAckedPacket(buf, 
now, pkt);
                                                shouldSendBuffers = true;
                                                link = next;
                                                buf = 
GET_ICBUFFER_FROM_PRIMARY(link);
@@ -4869,7 +5560,7 @@ TransportEntry::handleAcks()
                         * still send here, since in STOP/EOS race case, we may 
have been
                         * in EOS sending logic and will not check stop message.
                         */
-                       if (shouldSendBuffers)
+                       if (shouldSendBuffers && need_flush)
                                ackConn->sendBuffers();
                }
                else
@@ -4926,7 +5617,7 @@ TransportEntry::handleStopMsgs()
                {
                        if (this->pollAcks(0))
                        {
-                               bool rs = this->handleAcks();
+                               bool rs = this->handleAcks(true);
                                if (rs)
                                {
                                        /* more stops found, loop again. */
@@ -5050,6 +5741,19 @@ TransportEntry::MakeRecvEntry(CChunkTransportStateImpl 
*state,
                        conn->conn_info.icId            = icid;
                        conn->conn_info.flags           = 
UDPIC_FLAGS_RECEIVER_TO_SENDER;
 
+                       conn->rttvar.ts_rto = 0;
+                       conn->rttvar.rto = UDP_INITIAL_RTO;
+                       conn->rttvar.srtt = 0;
+                       conn->rttvar.rttvar = 0;
+                       conn->rttvar.snd_una = 0;
+                       conn->rttvar.nrtx = 0;
+                       conn->rttvar.max_nrtx = 0;
+                       conn->rttvar.mss = UDP_DEFAULT_MSS;
+                       conn->rttvar.cwnd = 2;
+                       conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH;
+                       conn->rttvar.loss_count = 0;
+                       conn->rttvar.karn_mode = false;
+                       conn->on_rto_idx = -1;
                        ic_control_info.connHtab.add(conn);
 
                        if (global_param.createOpaqueDataCallback)
@@ -5728,6 +6432,8 @@ CChunkTransportStateImpl::CreateSendEntries(ICSliceTable 
*sliceTable)
 
        snd_buffer_pool.init();
        initUnackQueueRing(&unack_queue_ring);
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER_IC)
+               initUdpManager(&mudp);
        ic_control_info.isSender = true;
        ic_control_info.lastExpirationCheckTime = getCurrentTime();
        ic_control_info.lastPacketSendTime = 
ic_control_info.lastExpirationCheckTime;
@@ -6184,12 +6890,16 @@ CChunkTransportStateImpl::SendEOS(int motNodeID, 
DataBlock *data)
                                        timeout = conn->computeTimeout(retry);
 
                                        if (pEntry->pollAcks(timeout))
-                                               pEntry->handleAcks();
+                                               pEntry->handleAcks(true);
 
                                        conn->checkExceptions(retry++, timeout);
 
                                        if (retry >= MAX_TRY)
+                                       {
+                                               if (conn->unackQueue.length() 
== 0)
+                                                       conn->sendBuffers();
                                                break;
+                                       }
                                }
 
                                if ((!conn->cdbProc) || 
(conn->unackQueue.length() == 0 &&
diff --git a/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp 
b/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
index baeb6a0841c..2602133a9e5 100644
--- a/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
+++ b/contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp
@@ -50,9 +50,12 @@
 #include <signal.h>
 #include <stdbool.h>
 #include <stddef.h>
+#include <stdint.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
+#include <sys/param.h>
+#include <sys/queue.h>
 #include <sys/socket.h>
 #include <sys/time.h>
 #include <sys/types.h>
@@ -230,6 +233,9 @@ typedef struct icpkthdr
      */
        uint32          seq;
        uint32          extraSeq;
+       uint64_t send_time;
+       uint64_t recv_time;
+       uint8_t retry_times;
 } icpkthdr;
 
 typedef struct ICBuffer ICBuffer;
@@ -434,6 +440,50 @@ ic_bswap32(uint32 x)
                ((x >> 24) & 0x000000ff);
 }
 
+#define TIMEOUT_Z
+#define RTT_SHIFT_ALPHA (3)       /* srtt (0.125) */
+#define LOSS_THRESH (3) /* Packet loss triggers Karn */
+#define RTO_MIN (5000)   /* MIN RTO(ms) */
+#define RTO_MAX (100000) /* MAX RTO(ms) */
+#define UDP_INFINITE_SSTHRESH   0x7fffffff
+
+#define SEC_TO_USEC(t)                  ((t) * 1000000)
+#define SEC_TO_MSEC(t)                  ((t) * 1000)
+#define MSEC_TO_USEC(t)                 ((t) * 1000)
+#define USEC_TO_SEC(t)                  ((t) / 1000000)
+#define TIME_TICK (1000000/HZ)/* in us */
+
+#define UDP_INITIAL_RTO                 (MSEC_TO_USEC(200))
+#define UDP_DEFAULT_MSS 1460
+
+#define RTO_HASH (3000)
+
+#define UDP_SEQ_LT(a,b) ((int32_t)((a)-(b)) < 0)
+#define UDP_SEQ_LEQ(a,b) ((int32_t)((a)-(b)) <= 0)
+#define UDP_SEQ_GT(a,b) ((int32_t)((a)-(b)) > 0)
+#define UDP_SEQ_GEQ(a,b) ((int32_t)((a)-(b)) >= 0)
+
+#define UDP_RTO_MIN ((unsigned)(HZ/5))
+
+struct UDPConn;
+struct rto_hashstore
+{
+       uint32_t    rto_now_idx;    /* pointing the hs_table_s index */
+       uint32_t    rto_now_ts;
+
+       TAILQ_HEAD(rto_head, UDPConn) rto_list[RTO_HASH + 1];
+};
+
+struct mudp_manager
+{
+       struct rto_hashstore *rto_store;     /* lists related to timeout */
+
+       int         rto_list_cnt;
+       uint32_t    cur_ts;
+};
+
+typedef struct mudp_manager* mudp_manager_t;
+
 #define MAX_TRY (11)
 #define TIMEOUT(try) ((try) < MAX_TRY ? (timeoutArray[(try)]) : 
(timeoutArray[MAX_TRY]))
 
@@ -456,6 +506,7 @@ ic_bswap32(uint32 x)
 #define UDPIC_FLAGS_DISORDER                   (32)
 #define UDPIC_FLAGS_DUPLICATE                  (64)
 #define UDPIC_FLAGS_CAPACITY                   (128)
+#define UDPIC_FLAGS_FULL                (256)
 
 #define UDPIC_MIN_BUF_SIZE (128 * 1024)
 
@@ -465,7 +516,6 @@ ic_bswap32(uint32 x)
  * A connection hash table bin.
  *
  */
-struct UDPConn;
 typedef struct ConnHtabBin ConnHtabBin;
 struct ConnHtabBin
 {
@@ -859,8 +909,10 @@ struct ICGlobalControlInfo
  */
 #define UNACK_QUEUE_RING_SLOTS_NUM (2000)
 #define TIMER_SPAN (session_param.Gp_interconnect_timer_period * 1000ULL)      
/* default: 5ms */
+#define TIMER_SPAN_LOSS (session_param.Gp_interconnect_timer_period * 500ULL)  
   /* default: 5ms */
 #define TIMER_CHECKING_PERIOD 
(session_param.Gp_interconnect_timer_checking_period)    /* default: 20ms */
 #define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN)
+#define UNACK_QUEUE_RING_LENGTH_LOSS (UNACK_QUEUE_RING_SLOTS_NUM * 
TIMER_SPAN_LOSS)
 
 #define DEFAULT_RTT (session_param.Gp_interconnect_default_rtt * 1000) /* 
default: 20ms */
 #define MIN_RTT (100)                  /* 0.1ms */
@@ -916,6 +968,13 @@ struct UnackQueueRing
 
        /* time slots */
        ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM];
+#ifdef TIMEOUT_Z
+       uint32_t retrans_count;
+       uint32_t no_retrans_count;
+       uint32_t time_difference;
+       uint32_t min;
+       uint32_t max;
+#endif
 };
 
 /*
@@ -981,6 +1040,27 @@ typedef struct ICStatistics
 
 struct TransportEntry;
 
+struct udp_send_vars
+{
+       /* send sequence variables */
+       uint32_t snd_una;               /* send unacknoledged */
+       uint32_t snd_wnd;               /* send window (unscaled) */
+
+       /* retransmission timeout variables */
+       uint8_t nrtx;                   /* number of retransmission */
+       uint8_t max_nrtx;               /* max number of retransmission */
+       uint32_t rto;                   /* retransmission timeout */
+       uint32_t ts_rto;                /* timestamp for retransmission timeout 
*/
+
+       /* congestion control variables */
+       uint32_t cwnd;                          /* congestion window */
+       uint32_t ssthresh;                      /* slow start threshold */
+
+       TAILQ_ENTRY(UDPConn) send_link;
+       TAILQ_ENTRY(UDPConn) timer_link;                /* timer link (rto 
list) */
+
+};
+
 /*
  * Structure used for keeping track of a pt-to-pt connection between two
  * Cdb Entities (either QE or QD).
@@ -1035,6 +1115,32 @@ public:
        uint64          stat_max_resent;
        uint64          stat_count_dropped;
 
+       struct {
+                       uint32_t ts_rto;
+                       uint32_t rto;
+                       uint32_t srtt;
+                       uint32_t rttvar;
+                       uint32_t snd_una;
+                       uint16_t nrtx;
+                       uint16_t max_nrtx;
+                       uint32_t mss;
+                       uint32_t cwnd;
+                       uint32_t ssthresh;
+                       uint32_t fss;
+                       uint8_t loss_count;
+                       uint32_t mdev;
+                       uint32_t mdev_max;
+                       uint32_t rtt_seq;               /* sequence number to 
update rttvar */
+                       uint32_t ts_all_rto;
+                       bool karn_mode;
+       } rttvar;
+       
+       uint8_t on_timewait_list;
+       int16_t on_rto_idx;
+
+       uint32_t snd_nxt;               /* send next */
+       struct udp_send_vars sndvar;
+
        TransportEntry *entry_;
 
 public:
@@ -1054,7 +1160,7 @@ public:
        void prepareRxConnForRead();
        void DeactiveConn();
 
-       void handleAckedPacket(ICBuffer *buf, uint64 now);
+       void handleAckedPacket(ICBuffer *buf, uint64 now, struct icpkthdr *pkt);
        void prepareXmit();
        void sendOnce(icpkthdr *pkt);
        void handleStop();
@@ -1072,9 +1178,9 @@ public:
 
        void updateRetransmitStatistics();
        void checkExpirationCapacityFC(int timeout);
+       void checkExpiration(ICChunkTransportState *transportStates, uint64 
now);
 
        static void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool 
*networkTimeoutIsLogged);
-       static void checkExpiration(ICChunkTransportState *transportStates, 
uint64 now);
 
        static void sendAckWithParam(AckSendParam *param);
        static void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr 
*addr, socklen_t peerLen);
@@ -1125,7 +1231,7 @@ public:
 
        void aggregateStatistics();
 
-       bool handleAcks();
+       bool handleAcks(bool need_flush);
        void handleStopMsgs();
 
        bool pollAcks(int timeout);
diff --git a/contrib/udp2/ic_udp2.c b/contrib/udp2/ic_udp2.c
index 31c0d5c2eeb..0972c874941 100644
--- a/contrib/udp2/ic_udp2.c
+++ b/contrib/udp2/ic_udp2.c
@@ -47,6 +47,7 @@
 #include "ic_types.h"
 #include "udp2/ic_udp2.h"
 
+#define MAX_QUEUE_SIZE (64)    /* upper bound for the auto-computed queue_depth / snd_queue_depth */
 
 #define HandleLastError() \
 do { \
@@ -159,12 +160,50 @@ WaitInterconnectQuitUDPIFC2(void)
 void
 SetupInterconnectUDP2(EState *estate)
 {
+       int32 sliceNum = 0;
+       int32 calcQueueDepth = 0;
+       int32 calcSndDepth = 0;
+
        if (estate->interconnect_context)
                elog(ERROR, "SetupInterconnectUDP: already initialized.");
 
        if (!estate->es_sliceTable)
                elog(ERROR, "SetupInterconnectUDP: no slice table ?");
 
+       if (estate != NULL && estate->es_sliceTable != NULL)
+               sliceNum = estate->es_sliceTable->numSlices;
+       else
+               sliceNum = 1;
+
+       if (Gp_interconnect_mem_size > 0 &&
+               Gp_interconnect_queue_depth == 4 &&
+               Gp_interconnect_snd_queue_depth == 2)
+       {
+               int32 perQueue = Gp_interconnect_mem_size /
+                       (Gp_max_packet_size * sliceNum);
+
+               calcSndDepth = Max(Gp_interconnect_snd_queue_depth, perQueue / 
2);
+               calcQueueDepth = Max(Gp_interconnect_queue_depth, perQueue - 
calcSndDepth);
+
+               if (calcSndDepth > MAX_QUEUE_SIZE)
+                       calcSndDepth = MAX_QUEUE_SIZE;
+
+               if (calcQueueDepth > MAX_QUEUE_SIZE)
+                       calcQueueDepth = MAX_QUEUE_SIZE;
+
+               Gp_interconnect_snd_queue_depth = calcSndDepth;
+               Gp_interconnect_queue_depth = calcQueueDepth;
+
+               elog(DEBUG1, "SetupUDPIFCInterconnect: queue depth, "
+                                    "queue_depth=%d, snd_queue_depth=%d, "
+                                    "mem_size=%d, slices=%d, packet_size=%d",
+                                    Gp_interconnect_queue_depth,
+                                    Gp_interconnect_snd_queue_depth,
+                                    Gp_interconnect_mem_size,
+                                    sliceNum,
+                                    Gp_max_packet_size);
+       }
+
        SessionMotionLayerIPCParam param;
        SetupSessionMotionLayerIPCParam(&param);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to