zhangyue1818 commented on code in PR #1357: URL: https://github.com/apache/cloudberry/pull/1357#discussion_r2432403021
########## contrib/udp2/ic_modules.c: ########## @@ -0,0 +1,87 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * ic_modules.c + * + * IDENTIFICATION + * contrib/udp2/ic_modules.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "cdb/ml_ipc.h" +#include "ic_modules.h" +#include "ic_udp2.h" + +PG_MODULE_MAGIC; + +MotionIPCLayer udp2_ipc_layer = { Review Comment: fix in [94428f1](https://github.com/apache/cloudberry/pull/1357/commits/94428f12b58a72c537d6c7bd7c79ea8fa42f2877) ########## contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp: ########## @@ -0,0 +1,1214 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * ic_udp2_internal.hpp + * + * IDENTIFICATION + * contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp + * + *------------------------------------------------------------------------- + */ +#ifndef IC_UDP2_INTERNAL_HPP +#define IC_UDP2_INTERNAL_HPP + +#include <chrono> +#include <condition_variable> +#include <mutex> +#include <sstream> +#include <stdexcept> +#include <thread> +#include <atomic> +#include <cstdarg> +#include <vector> +#include <map> + +#include <cassert> +#include <cstring> +#include <fcntl.h> +#include <assert.h> +#include <errno.h> +#include <limits.h> +#include <netdb.h> +#include <poll.h> +#include <pthread.h> +#include <signal.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> + +#include "ic_udp2.hpp" +#include "ic_utility.hpp" + +namespace { + +typedef enum MotionConnState +{ + mcsNull, + mcsAccepted, + mcsSetupOutgoingConnection, + mcsConnecting, + mcsRecvRegMsg, + mcsSendRegMsg, + mcsStarted, + mcsEosSent +} MotionConnState; + +/* + * Structure used for keeping track of a pt-to-pt connection between two + * Cdb Entities (either QE or QD). + */ +typedef struct MotionConn +{ + /* socket file descriptor. */ + int sockfd; + + /* pointer to the data buffer. */ + uint8 *pBuff; + + /* size of the message in the buffer, if any. */ + int32 msgSize; + + /* position of message inside of buffer, "cursor" pointer */ + uint8 *msgPos; + + /* + * recv bytes: we can have more than one message/message fragment in recv + * queue at once + */ + int32 recvBytes; + + int tupleCount; + + /* + * false means 1) received a stop message and has handled it. 2) received + * EOS message or sent out EOS message 3) received a QueryFinishPending + * notify and has handled it. + */ + bool stillActive; + + /* + * used both by motion sender and motion receiver + * + * sender: true means receiver don't need to consume tuples any more, + * sender is also responsible to send stop message to its senders. + * + * receiver: true means have sent out a stop message to its senders. The + * stop message might be lost, stopRequested can also tell sender that no + * more data needed in the ack message. + */ + bool stopRequested; + + MotionConnState state; + + ICCdbProcess *cdbProc; + int remoteContentId; + char remoteHostAndPort[128]; /* Numeric IP addresses should never + * be longer than about 50 chars, but + * play it safe */ + + void *opaque_data; + + /* + * used by the sender. + * + * the typmod of last sent record type in current connection, + * if the connection is for broadcasting then we only check + * and update this attribute on connection 0. + */ + int32 sent_record_typmod; + +} MotionConn; + +/* + * Used to organize all of the information for a given motion node. + */ +typedef struct ChunkTransportStateEntry +{ + int motNodeId; + bool valid; + + /* Connection array + * + * MUST pay attention: use getMotionConn to get MotionConn. + * must not use `->conns[index]` to get MotionConn. Because the struct + * MotionConn is a base structure for MotionConnTCP and + * MotionConnUDP. After connection setup, the `conns` will be fill + * with MotionConnUDP/MotionConnTCP, but the pointer still is + * MotionConn which should use `CONTAINER_OF` to get the real object. + */ + MotionConn *conns; + int numConns; + + int scanStart; + + /* slice table entries */ + struct ICExecSlice *sendSlice; + struct ICExecSlice *recvSlice; + +} ChunkTransportStateEntry; + +typedef struct icpkthdr +{ + int32 motNodeId; + + /* + * three pairs which seem useful for identifying packets. + * + * MPP-4194: It turns out that these can cause collisions; but the high + * bit (1<<31) of the dstListener port is now used for disambiguation with + * mirrors. + */ + int32 srcPid; + int32 srcListenerPort; + + int32 dstPid; + int32 dstListenerPort; + + int32 sessionId; + int32 icId; + + int32 recvSliceIndex; + int32 sendSliceIndex; + int32 srcContentId; + int32 dstContentId; + + /* MPP-6042: add CRC field */ + uint32 crc; + + /* packet specific info */ + int32 flags; + uint32 len; + + /* + * The usage of seq and extraSeq field + * a) In a normal DATA packet + * seq -> the data packet sequence number + * extraSeq -> not used + * b) In a normal ACK message (UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY) + * seq -> the largest seq of the continuously cached packets + * sometimes, it is special, for exampke, conn req ack, mismatch ack. + * extraSeq -> the largest seq of the consumed packets + * c) In a start race NAK message (UPDIC_FLAGS_NAK) + * seq -> the seq from the pkt + * extraSeq -> the extraSeq from the pkt + * d) In a DISORDER message (UDPIC_FLAGS_DISORDER) + * seq -> packet sequence number that triggers the disorder message + * extraSeq -> the largest seq of the received packets + * e) In a DUPLICATE message (UDPIC_FLAGS_DUPLICATE) + * seq -> packet sequence number that triggers the duplicate message + * extraSeq -> the largest seq of the continuously cached packets + * f) In a stop messege (UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY) + * seq -> the largest seq of the continuously cached packets + * extraSeq -> the largest seq of the continuously cached packets + * + * + * NOTE that: EOS/STOP flags are often saved in conn_info structure of a connection. + * It is possible for them to be sent together with other flags. + * + */ + uint32 seq; + uint32 extraSeq; +} icpkthdr; + +typedef struct ICBuffer ICBuffer; +typedef struct ICBufferLink ICBufferLink; + +typedef enum ICBufferListType +{ + ICBufferListType_Primary, + ICBufferListType_Secondary, + ICBufferListType_UNDEFINED +} ICBufferListType; + +struct ICBufferLink +{ + ICBufferLink *next; + ICBufferLink *prev; +}; + +/* + * ICBufferList + * ic buffer list data structure. + * + * There are two kinds of lists. The first kind of list uses the primary next/prev pointers. + * And the second kind uses the secondary next/prev pointers. + */ +struct ICBufferList +{ + int len; + ICBufferListType type; /* primary or secondary */ + + ICBufferLink head; + +#if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING) + void icBufferListLog(); +#endif + +#ifdef USE_ASSERT_CHECKING + void icBufferListCheck(const char *prefix); +#endif + + void init(ICBufferListType type); + void destroy(); + + bool is_head(ICBufferLink *link); + int length(); + ICBufferLink* first(); + + ICBuffer* append(ICBuffer *buf); + ICBuffer* remove(ICBuffer *buf); + ICBuffer* pop(); + + void release(bool inExpirationQueue); + + void dump_to_file(FILE *ofile); +}; + +#define GET_ICBUFFER_FROM_PRIMARY(ptr) CONTAINER_OF(ptr, ICBuffer, primary) +#define GET_ICBUFFER_FROM_SECONDARY(ptr) CONTAINER_OF(ptr, ICBuffer, secondary) + +/* + * ICBuffer + * interconnect buffer data structure. + * + * In some cases, an ICBuffer may exists in two lists/queues, + * thus it has two sets of pointers. For example, an ICBuffer + * can exist in an unack queue and an expiration queue at the same time. + * + * It is important to get the ICBuffer address when we iterate a list of + * ICBuffers through primary/secondary links. The Macro GET_ICBUFFER_FROM_PRIMARY + * and GET_ICBUFFER_FROM_SECONDARY are for this purpose. + * + */ +struct ICBuffer +{ + /* primary next and prev pointers */ + ICBufferLink primary; + + /* secondary next and prev pointers */ + ICBufferLink secondary; + + /* connection that this buffer belongs to */ + MotionConn *conn; + + /* + * Three fields for expiration processing + * + * sentTime - the time this buffer was sent nRetry - the number of send + * retries unackQueueRingSlot - unack queue ring slot index + */ + uint64 sentTime; + int32 nRetry; + int32 unackQueueRingSlot; + + /* real data */ + icpkthdr pkt[0]; +}; + +static inline void* +ic_malloc(size_t size) +{ + return malloc(size); +} + +static inline void* +ic_malloc0(size_t size) +{ + void *rs = ic_malloc(size); + if (rs) + memset(rs, 0, size); + return rs; +} + +static inline void +ic_free(void *p) +{ + free(p); +} + +static inline void +ic_free_clean(void **p) +{ + ic_free(*p); + *p = NULL; +} + +static inline void +ic_usleep(long microsec) +{ + if (microsec > 0) + { + struct timeval delay; + + delay.tv_sec = microsec / 1000000L; + delay.tv_usec = microsec % 1000000L; + (void) select(0, NULL, NULL, NULL, &delay); + } +} + +/* + * Put socket into nonblock mode. + * Returns true on success, false on failure. + */ +static inline bool +ic_set_noblock(int sock) +{ + int flags; + + flags = fcntl(sock, F_GETFL); + if (flags < 0) + return false; + if (fcntl(sock, F_SETFL, (flags | O_NONBLOCK)) == -1) + return false; + return true; +} + +/* ic_atomic_xxx */ +typedef struct ic_atomic_uint32 +{ + volatile uint32 value; +} ic_atomic_uint32; + +static inline void +ic_atomic_init_u32(volatile ic_atomic_uint32 *ptr, uint32 val) +{ + ptr->value = val; +} + +static inline uint32 +ic_atomic_read_u32(volatile ic_atomic_uint32 *ptr) +{ + return ptr->value; +} + +static inline void +ic_atomic_write_u32(volatile ic_atomic_uint32 *ptr, uint32 val) +{ + ptr->value = val; +} + +static inline bool +ic_atomic_compare_exchange_u32(volatile ic_atomic_uint32 *ptr, + uint32 *expected, uint32 newval) +{ + /* FIXME: we can probably use a lower consistency model */ + return __atomic_compare_exchange_n(&ptr->value, expected, newval, false, + __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); +} + +static inline uint32 +ic_atomic_add_fetch_u32(volatile ic_atomic_uint32 *ptr, int32 add_) +{ + return __sync_fetch_and_add(&ptr->value, add_) + add_; +} + +static inline uint32 +ic_bswap32(uint32 x) +{ + return + ((x << 24) & 0xff000000) | + ((x << 8) & 0x00ff0000) | + ((x >> 8) & 0x0000ff00) | + ((x >> 24) & 0x000000ff); +} + +#define MAX_TRY (11) +#define TIMEOUT(try) ((try) < MAX_TRY ? (timeoutArray[(try)]) : (timeoutArray[MAX_TRY])) + +#define USECS_PER_SECOND 1000000 +#define MSECS_PER_SECOND 1000 + +/* 1/4 sec in msec */ +#define RX_THREAD_POLL_TIMEOUT (250) + +/* + * Flags definitions for flag-field of UDP-messages + * + * We use bit operations to test these, flags are powers of two only + */ +#define UDPIC_FLAGS_RECEIVER_TO_SENDER (1) +#define UDPIC_FLAGS_ACK (2) +#define UDPIC_FLAGS_STOP (4) +#define UDPIC_FLAGS_EOS (8) +#define UDPIC_FLAGS_NAK (16) +#define UDPIC_FLAGS_DISORDER (32) +#define UDPIC_FLAGS_DUPLICATE (64) +#define UDPIC_FLAGS_CAPACITY (128) + +#define UDPIC_MIN_BUF_SIZE (128 * 1024) + +/* + * ConnHtabBin + * + * A connection hash table bin. + * + */ +struct UDPConn; +typedef struct ConnHtabBin ConnHtabBin; +struct ConnHtabBin +{ + UDPConn *conn; + struct ConnHtabBin *next; +}; + +/* + * ConnHashTable + * + * Connection hash table definition. + * + */ +typedef struct ConnHashTable ConnHashTable; +struct ConnHashTable +{ + ConnHtabBin **table; + int size; + + bool init(); + bool add(UDPConn *conn); + UDPConn *find(icpkthdr *hdr); + void destroy(); + void remove(UDPConn *conn); +}; + +#define CONN_HASH_VALUE(icpkt) ((uint32)((((icpkt)->srcPid ^ (icpkt)->dstPid)) + (icpkt)->dstContentId)) +#define CONN_HASH_MATCH(a, b) (((a)->motNodeId == (b)->motNodeId && \ + (a)->dstContentId == (b)->dstContentId && \ + (a)->srcContentId == (b)->srcContentId && \ + (a)->recvSliceIndex == (b)->recvSliceIndex && \ + (a)->sendSliceIndex == (b)->sendSliceIndex && \ + (a)->srcPid == (b)->srcPid && \ + (a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId)) + +/* + * CursorICHistoryEntry + * + * The definition of cursor IC history entry. + */ +typedef struct CursorICHistoryEntry CursorICHistoryEntry; +struct CursorICHistoryEntry +{ + /* Interconnect instance id. */ + uint32 icId; + + /* Command id. */ + uint32 cid; + + /* + * Interconnect instance status. state 1 (value 1): interconnect is setup + * state 0 (value 0): interconnect was torn down. + */ + uint8 status; + + /* Next entry. */ + CursorICHistoryEntry *next; + + CursorICHistoryEntry(uint32 aicId, uint32 acid): + icId(aicId), cid(acid),status(1){} +}; + +/* + * CursorICHistoryTable + * + * Cursor IC history table. It is a small hash table. + */ +typedef struct CursorICHistoryTable CursorICHistoryTable; +struct CursorICHistoryTable +{ + uint32 size; + uint32 count; + CursorICHistoryEntry **table; + + void init() { + count = 0; + size = session_param.Gp_interconnect_cursor_ic_table_size; + table = (CursorICHistoryEntry **)ic_malloc0(sizeof(CursorICHistoryEntry *) * size); + } + + void add(uint32 icId, uint32 cid) { + uint32 index = icId % size; + CursorICHistoryEntry *p = new CursorICHistoryEntry(icId, cid); + + p->next = this->table[index]; + this->table[index] = p; + this->count++; + + LOG(DEBUG2, "add icid %d cid %d status %d", p->icId, p->cid, p->status); + + return; + } + + /* + * state 1 (value 1): interconnect is setup + * state 0 (value 0): interconnect was torn down. + */ + void update(uint32 icId, uint8 status) { + for (CursorICHistoryEntry *p = table[icId % size]; p; p = p->next) { + if (p->icId == icId) { + p->status = status; + return; + } + } + } + + CursorICHistoryEntry* get(uint32 icId) { + for (CursorICHistoryEntry *p = table[icId % size]; p; p = p->next) { + if (p->icId == icId) + return p; + } + return NULL; + } + + void purge() { + for (uint8 index = 0; index < size; index++) { + while (table[index]) { + CursorICHistoryEntry *trash = table[index]; + table[index] = trash->next; + ic_free(trash); + } + } + } + + void prune(uint32 icId); +}; + +/* + * Synchronization timeout values + * + * MAIN_THREAD_COND_TIMEOUT - 1/4 second + */ +#define MAIN_THREAD_COND_TIMEOUT_MS (250) + +/* + * Used for synchronization between main thread (receiver) and background thread. + * + */ +typedef struct ThreadWaitingState ThreadWaitingState; +struct ThreadWaitingState +{ + bool waiting; + int waitingNode; + int waitingRoute; + int reachRoute; + + /* main_thread_waiting_query is needed to disambiguate for cursors */ + int waitingQuery; + + void reset() { + waiting = false; + waitingNode = -1; + waitingRoute = ANY_ROUTE; + reachRoute = ANY_ROUTE; + waitingQuery = -1; + } + + void set(int motNodeId, int route, int icId) { + waiting = true; + waitingNode = motNodeId; + waitingRoute = route; + reachRoute = ANY_ROUTE; + waitingQuery = icId; + } +}; + +/* + * ReceiveControlInfo + * + * The related control information for receiving data packets. + * Main thread (Receiver) and background thread use the information in + * this data structure to handle data packets. + * + */ +typedef struct ReceiveControlInfo ReceiveControlInfo; +struct ReceiveControlInfo +{ + /* Main thread waiting state. */ + ThreadWaitingState mainWaitingState; + + /* + * Buffers used to assemble disorder messages at receiver side. + */ + icpkthdr *disorderBuffer; + + /* The last interconnect instance id which is torn down. */ + int32 lastTornIcId; + + /* Cursor history table. */ + CursorICHistoryTable cursorHistoryTable; + + /* + * Last distributed transaction id when SetupUDPInterconnect is called. + * Coupled with cursorHistoryTable, it is used to handle multiple + * concurrent cursor cases. + */ + DistributedTransactionId lastDXatId; +}; + +/* + * RxBufferPool + * + * Receive thread buffer pool definition. The implementation of + * receive side buffer pool is different from send side buffer pool. + * It is because receive side buffer pool needs a ring buffer to + * easily implement disorder message handling logic. + */ + +typedef struct RxBufferPool RxBufferPool; +struct RxBufferPool +{ + /* The max number of buffers we can get from this pool. */ + int maxCount; + + /* The number of allocated buffers */ + int count; + + /* The list of free buffers. */ + char *freeList; + + void put(icpkthdr *buf); + void release(icpkthdr *buf); + icpkthdr* get(); + icpkthdr* get_free(); +}; + +/* + * SendBufferPool + * + * The send side buffer pool definition. + * + */ +typedef struct SendBufferPool SendBufferPool; +struct SendBufferPool +{ + /* The maximal number of buffers sender can use. */ + int maxCount; + + /* The number of buffers sender already used. */ + int count; + + /* The free buffer list at the sender side. */ + ICBufferList freeList; + + void init(); + void clean(); + ICBuffer* get(UDPConn *conn); +}; + +/* + * SendControlInfo + * + * The related control information for sending data packets and handling acks. + * Main thread use the information in this data structure to do ack handling + * and congestion control. + * + */ +typedef struct SendControlInfo SendControlInfo; +struct SendControlInfo +{ + /* The buffer used for accepting acks */ + icpkthdr *ackBuffer; + + /* congestion window */ + float cwnd; + + /* minimal congestion control window */ + float minCwnd; + + /* slow start threshold */ + float ssthresh; + +}; + +/* + * ICGlobalControlInfo + * + * Some shared control information that is used by main thread (senders, receivers, or both) + * and the background thread. + * + */ +typedef struct ICGlobalControlInfo ICGlobalControlInfo; +struct ICGlobalControlInfo +{ + /* The background thread handle. */ + pthread_t threadHandle; + + /* Keep the udp socket buffer size used. */ + uint32 socketSendBufferSize; + uint32 socketRecvBufferSize; + + uint64 lastExpirationCheckTime; + uint64 lastDeadlockCheckTime; + + /* Used to decide whether to retransmit for capacity based FC. */ + uint64 lastPacketSendTime; + + /* + * Lock and latch for coordination between main thread and + * background thread. It protects the shared data between the two threads + * (the connHtab, rx buffer pool and the mainWaitingState etc.). + */ + pthread_mutex_t lock; + + /* Am I a sender? */ + bool isSender; + + /* Flag showing whether the thread is created. */ + bool threadCreated; + + /* Error number. Actually int but we do not have ic_atomic_int32. */ + ic_atomic_uint32 eno; + + /* + * Global connection htab for both sending connections and receiving + * connections. Protected by the lock in this data structure. + */ + ConnHashTable connHtab; + + /* The connection htab used to cache future packets. */ + ConnHashTable startupCacheHtab; + + /* Used by main thread to ask the background thread to exit. */ + ic_atomic_uint32 shutdown; + + /*Serialization + * Used by ic thread in the QE to identify the current serving ic instance + * and handle the mismatch packets. It is not used by QD because QD may have + * cursors, QD may receive packets for open the cursors with lower instance + * id, QD use cursorHistoryTable to handle packets mismatch. + */ + int32 ic_instance_id; +}; + +/* + * Macro for unack queue ring, round trip time (RTT) and expiration period (RTO) + * + * UNACK_QUEUE_RING_SLOTS_NUM - the number of slots in the unack queue ring. + * this value should be greater than or equal to 2. + * TIMER_SPAN - timer period in us + * TIMER_CHECKING_PERIOD - timer checking period in us + * UNACK_QUEUE_RING_LENGTH - the whole time span of the unack queue ring + * DEFAULT_RTT - default rtt in us. + * MIN_RTT - min rtt in us + * MAX_RTT - max rtt in us + * RTT_SHIFT_COEFFICIENT - coefficient for RTT computation + * + * DEFAULT_DEV - default round trip standard deviation + * MAX_DEV - max dev + * DEV_SHIFT_COEFFICIENT - coefficient for DEV computation + * + * MAX_EXPIRATION_PERIOD - max expiration period in us + * MIN_EXPIRATION_PERIOD - min expiration period in us + * MAX_TIME_NO_TIMER_CHECKING - max time without checking timer + * DEADLOCK_CHECKING_TIME - deadlock checking time + * + * MAX_SEQS_IN_DISORDER_ACK - max number of sequences that can be transmitted in a + * disordered packet ack. + * + * + * Considerations on the settings of the values: + * + * TIMER_SPAN and UNACK_QUEUE_RING_SLOTS_NUM define the ring period. + * Currently, it is UNACK_QUEUE_RING_LENGTH (default 10 seconds). + * + * The definition of UNACK_QUEUE_RING_LENGTH is quite related to the size of + * sender side buffer and the size we may resend in a burst for an expiration event + * (which may overwhelm switch or OS if it is too large). + * Thus, we do not want to send too much data in a single expiration event. Here, a + * relatively large UNACK_QUEUE_RING_SLOTS_NUM value is used to avoid that. + * + * If the sender side buffer is X (MB), then on each slot, + * there are about X/UNACK_QUEUE_RING_SLOTS_NUM. Even we have a very large sender buffer, + * for example, 100MB, there is about 96M/2000 = 50K per slot. + * This is fine for the OS (with buffer 2M for each socket generally) and switch. + * + * Note that even when the buffers are not evenly distributed in the ring and there are some packet + * losses, the congestion control mechanism, the disorder and duplicate packet handling logic will + * assure the number of outstanding buffers (in unack queues) to be not very large. + * + * MIN_RTT/MAX_RTT/DEFAULT_RTT/MIN_EXPIRATION_PERIOD/MAX_EXPIRATION_PERIOD gives some heuristic values about + * the computation of RTT and expiration period. RTT and expiration period (RTO) are not + * constant for various kinds of hardware and workloads. Thus, they are computed dynamically. + * But we also want to bound the values of RTT and MAX_EXPIRATION_PERIOD. It is + * because there are some faults that may make RTT a very abnormal value. Thus, RTT and + * expiration period are upper and lower bounded. + * + * MAX_SEQS_IN_DISORDER_ACK should be smaller than (MIN_PACKET_SIZE - sizeof(icpkthdr))/sizeof(uint32). + * It is due to the limitation of the ack receive buffer size. + * + */ +#define UNACK_QUEUE_RING_SLOTS_NUM (2000) +#define TIMER_SPAN (session_param.Gp_interconnect_timer_period * 1000ULL) /* default: 5ms */ +#define TIMER_CHECKING_PERIOD (session_param.Gp_interconnect_timer_checking_period) /* default: 20ms */ +#define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN) + +#define DEFAULT_RTT (session_param.Gp_interconnect_default_rtt * 1000) /* default: 20ms */ +#define MIN_RTT (100) /* 0.1ms */ +#define MAX_RTT (200 * 1000) /* 200ms */ +#define RTT_SHIFT_COEFFICIENT (3) /* RTT_COEFFICIENT 1/8 (0.125) */ + +#define DEFAULT_DEV (0) +#define MIN_DEV MIN_RTT +#define MAX_DEV MAX_RTT +#define DEV_SHIFT_COEFFICIENT (2) /* DEV_COEFFICIENT 1/4 (0.25) */ + +#define MAX_EXPIRATION_PERIOD (1000 * 1000) /* 1s */ +#define MIN_EXPIRATION_PERIOD (session_param.Gp_interconnect_min_rto * 1000) /* default: 20ms */ + +#define MAX_TIME_NO_TIMER_CHECKING (50 * 1000) /* 50ms */ +#define DEADLOCK_CHECKING_TIME (512 * 1000) /* 512ms */ + +#define MAX_SEQS_IN_DISORDER_ACK (4) + +/* + * UnackQueueRing + * + * An unacked queue ring is used to decide which packet is expired in constant time. + * + * Each slot of the ring represents a fixed time span, for example 1ms, and + * each slot has a associated buffer list/queue which contains the packets + * which will expire in the time span. + * + * If the current time pointer (time t) points to slot 1, + * then slot 2 represents the time span from t + 1ms to t + 2ms. + * When we check whether there are some packets expired, we start from the last + * current time recorded, and resend all the packets in the queue + * until we reach the slot that the updated current time points to. + * + */ +typedef struct UnackQueueRing UnackQueueRing; +struct UnackQueueRing +{ + /* save the current time when we check the time wheel for expiration */ + uint64 currentTime; + + /* the slot index corresponding to current time */ + int idx; + + /* the number of outstanding packets in unack queue ring */ + int numOutStanding; + + /* + * the number of outstanding packets that use the shared bandwidth in the + * congestion window. + */ + int numSharedOutStanding; + + /* time slots */ + ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM]; +}; + +/* + * AckSendParam + * + * The parameters for ack sending. + */ +typedef struct AckSendParam +{ + /* header for the ack */ + icpkthdr msg; + + /* peer address for the ack */ + struct sockaddr_storage peer; + socklen_t peer_len; +} AckSendParam; + +/* + * ICStatistics + * + * A structure keeping various statistics about interconnect internal. + * + * Note that the statistics for ic are not accurate for multiple cursor case on QD. + * + * totalRecvQueueSize - receive queue size sum when main thread is trying to get a packet. + * recvQueueSizeCountingTime - counting times when computing totalRecvQueueSize. + * totalCapacity - the capacity sum when packets are tried to be sent. + * capacityCountingTime - counting times used to compute totalCapacity. + * totalBuffers - total buffers available when sending packets. + * bufferCountingTime - counting times when compute totalBuffers. + * activeConnectionsNum - the number of active connections. + * retransmits - the number of packet retransmits. + * mismatchNum - the number of mismatched packets received. + * crcErrors - the number of crc errors. + * sndPktNum - the number of packets sent by sender. + * recvPktNum - the number of packets received by receiver. + * disorderedPktNum - disordered packet number. + * duplicatedPktNum - duplicate packet number. + * recvAckNum - the number of Acks received. + * statusQueryMsgNum - the number of status query messages sent. + * + */ +typedef struct ICStatistics +{ + uint64 totalRecvQueueSize; + uint64 recvQueueSizeCountingTime; + uint64 totalCapacity; + uint64 capacityCountingTime; + uint64 totalBuffers; + uint64 bufferCountingTime; + uint32 activeConnectionsNum; + int32 retransmits; + int32 startupCachedPktNum; + int32 mismatchNum; + int32 crcErrors; + int32 sndPktNum; + int32 recvPktNum; + int32 disorderedPktNum; + int32 duplicatedPktNum; + int32 recvAckNum; + int32 statusQueryMsgNum; +} ICStatistics; + +struct TransportEntry; + +/* + * Structure used for keeping track of a pt-to-pt connection between two + * Cdb Entities (either QE or QD). + */ +struct UDPConn : public MotionConn +{ +public: + /* send side queue for packets to be sent */ + ICBufferList sndQueue; + int capacity; + + /* seq already sent */ + uint32 sentSeq; + + /* ack of this seq and packets with smaller seqs have been received */ + uint32 receivedAckSeq; + + /* packets with this seq or smaller seqs have been consumed */ + uint32 consumedSeq; + + uint64 rtt; + uint64 dev; + uint64 deadlockCheckBeginTime; + + ICBuffer *curBuff; + + /* + * send side unacked packet queue. Since it is often accessed at the same + * time with unack queue ring, it is protected with unqck queue ring lock. + */ + ICBufferList unackQueue; + + uint16 route; + + struct icpkthdr conn_info; + + struct sockaddr_storage peer; /* Allow for IPv4 or IPv6 */ + socklen_t peer_len; /* And remember the actual length */ + + /* a queue of maximum length Gp_interconnect_queue_depth */ + uint32 pkt_q_capacity; /* max capacity of the queue */ + uint32 pkt_q_size; /* number of packets in the queue */ + int pkt_q_head; + int pkt_q_tail; + uint8 **pkt_q; + + uint64 stat_total_ack_time; + uint64 stat_count_acks; + uint64 stat_max_ack_time; + uint64 stat_min_ack_time; + uint64 stat_count_resent; + uint64 stat_max_resent; + uint64 stat_count_dropped; + + TransportEntry *entry_; + +public: + UDPConn(TransportEntry *entry); + + void GetDataInBuf(GetDataLenInPacket getLen, DataBlock *data); + void ReleaseBuffer(AckSendParam *param); + + void setAckParam(AckSendParam *param, int32 flags, uint32 seq, uint32 extraSeq); + void sendAck(int32 flags, uint32 seq, uint32 extraSeq); + void sendDisorderAck(uint32 seq, uint32 extraSeq, uint32 lostPktCnt); + void sendStatusQueryMessage(uint32 seq); + + uint64 computeExpirationPeriod(uint32 retry); + + void freeDisorderedPackets(); + void prepareRxConnForRead(); + void DeactiveConn(); + + void handleAckedPacket(ICBuffer *buf, uint64 now); + void prepareXmit(); + void sendOnce(icpkthdr *pkt); + void handleStop(); + void sendBuffers(); + + void handleDisorderPacket(int pos, uint32 tailSeq, icpkthdr *pkt); + bool handleAckForDisorderPkt(icpkthdr *pkt); + bool handleAckForDuplicatePkt(icpkthdr *pkt); + int computeTimeout(int retry); + + void Send(DataBlock *data); + + void checkDeadlock(); + void checkExceptions(int retry, int timeout); + + void updateRetransmitStatistics(); + void checkExpirationCapacityFC(int timeout); + + static void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged); + static void checkExpiration(ICChunkTransportState *transportStates, uint64 now); + + static void sendAckWithParam(AckSendParam *param); + static void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen); +}; + + +/* + * Used to organize all of the information for a given motion node. + */ +struct CChunkTransportStateEntry +{ +public: + int motNodeId; + bool valid; + + /* Connection array + * + * MUST pay attention: use getMotionConn to get MotionConn. + * must not use `->conns[index]` to get MotionConn. Because the struct + * MotionConn is a base structure for MotionConnTCP and + * MotionConnUDP. After connection setup, the `conns` will be fill + * with MotionConnUDP/MotionConnTCP, but the pointer still is + * MotionConn which should use `CONTAINER_OF` to get the real object. + */ + MotionConn *conns; + int numConns; + + int scanStart; + + /* slice table entries */ + struct ICExecSlice *sendSlice; + struct ICExecSlice *recvSlice; +}; + +class CChunkTransportStateImpl; + +struct TransportEntry : public CChunkTransportStateEntry Review Comment: fix in [94428f1](https://github.com/apache/cloudberry/pull/1357/commits/94428f12b58a72c537d6c7bd7c79ea8fa42f2877) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
