yjhjstz commented on code in PR #1357: URL: https://github.com/apache/cloudberry/pull/1357#discussion_r2429921805
########## contrib/udp2/ic_common/udp2/ic_udp2.cpp: ########## @@ -0,0 +1,6958 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * ic_udp2.cpp + * + * IDENTIFICATION + * contrib/udp2/ic_common/udp2/ic_udp2.cpp + * + * +--------------+ + * | ic_types.h | + * +--------------+ + * / \ + * +--------------+ +---------------+ + * | C interface | | C++ interface | + * | ic_udp2.h | | ic_udp2.hpp | + * +--------------+ +---------------+ + * \ / + * +----------------------+ + * | C++ implement | + * | ic_udp2_internal.hpp| + * | ic_faultinjection.h | + * | ic_udp2.cpp | + * +----------------------+ + *------------------------------------------------------------------------- + */ +#include <chrono> +#include <condition_variable> +#include <mutex> +#include <sstream> +#include <stdexcept> +#include <thread> +#include <vector> +#include <atomic> +#include <cstdarg> + +/* + * interface header files + */ +#ifdef __cplusplus +extern "C" { +#endif + +#include "ic_types.h" +#include "ic_udp2.h" + +#ifdef __cplusplus +} +#endif + +#include "ic_udp2.hpp" + +/* + * internal header files + */ +#include "ic_utility.hpp" +#include "ic_udp2_internal.hpp" 
+#include "ic_faultinjection.h" + +static int timeoutArray[] = +{ + 1, + 1, + 2, + 4, + 8, + 16, + 32, + 64, + 128, + 256, + 512, + 512 /* MAX_TRY */ +}; + +/* + * Main thread (Receiver) and background thread use the information in + * this data structure to handle data packets. + */ +static ReceiveControlInfo rx_control_info; + +/* + * The buffer pool used for keeping data packets. + * + * maxCount is set to 1 to make sure there is always a buffer + * for picking packets from OS buffer. + */ +static RxBufferPool rx_buffer_pool = {1, 0, NULL}; + +/* + * The sender side buffer pool. + */ +static SendBufferPool snd_buffer_pool; + +/* + * Main thread use the information in this data structure to do ack handling + * and congestion control. + */ +static SendControlInfo snd_control_info; + +/* + * Shared control information that is used by senders, receivers and background thread. + */ +static ICGlobalControlInfo ic_control_info; + +/* + * All connections in a process share this unack queue ring instance. + */ +static UnackQueueRing unack_queue_ring = {0, 0, 0}; + +static int ICSenderSocket = -1; +static int32 ICSenderPort = 0; +static int ICSenderFamily = 0; + +/* Statistics for UDP interconnect. */ +static ICStatistics ic_statistics; + +/* Cached sockaddr of the listening udp socket */ +static struct sockaddr_storage udp_dummy_packet_sockaddr; + +/* UDP listen fd */ +static int UDP_listenerFd = -1; + +/* UDP listen port */ +static int32 udp_listener_port = 0; + +static std::mutex mtx; +static std::condition_variable cv; + +CChunkTransportState *CChunkTransportStateImpl::state_ = nullptr; + +/* + * Identity the user of ic module by vector_engine_is_user: + * "false" means PG executor, "true" means Arrow executor. 
+ */ +static thread_local bool vector_engine_is_user = false; +static thread_local bool thread_quit = false; + +#define CHECK_QUIT_FLAG() \ + do { \ + if (thread_quit) { \ + throw ICException("received thread quit flag.", __FILE__, __LINE__); \ + } \ + } while(0) + +#define CHECK_INTERRUPTS(state) \ + do { \ + if (vector_engine_is_user) { \ + CHECK_QUIT_FLAG(); \ + } else if (global_param.checkInterruptsCallback) { \ + global_param.checkInterruptsCallback((state)->teardownActive); \ + } \ + } while(0) + +#define CHECK_CANCEL(state) \ + do { \ + if (vector_engine_is_user) { \ + CHECK_QUIT_FLAG(); \ + } else if (global_param.checkCancelOnQDCallback) { \ + global_param.checkCancelOnQDCallback(state); \ + } \ + } while(0) + +#define CHECK_POSTMASTER_ALIVE() \ + do { \ + if (vector_engine_is_user) { \ + CHECK_QUIT_FLAG(); \ + } else if (global_param.checkPostmasterIsAliveCallback && !global_param.checkPostmasterIsAliveCallback()) { \ + throw ICFatalException("FATAL, interconnect failed to send chunks, Postmaster is not alive.", __FILE__, __LINE__); \ + } \ + } while(0) + +/*========================================================================= + * STATIC FUNCTIONS declarations + */ + +/* Background thread error handling functions. */ +static void checkRxThreadError(void); +static void setRxThreadError(int eno); +static void resetRxThreadError(void); + +static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type); +static void setupOutgoingUDPConnection(int icid, TransportEntry *pChunkEntry, UDPConn *conn); + +/* ICBufferList functions. 
*/ +static inline void icBufferListInitHeadLink(ICBufferLink *link); + +static inline void InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort); +static inline void CleanupMotionUDPIFC(void); + +static bool dispatcherAYT(void); +static void checkQDConnectionAlive(void); + +static void *rxThreadFunc(void *arg); + +static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now); + +static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len); +static void cleanupStartupCache(void); +static void handleCachedPackets(void); + +static uint64 getCurrentTime(void); +static void initMutex(pthread_mutex_t *mutex); + +static inline void logPkt(const char *prefix, icpkthdr *pkt); + +static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len); +#if defined(__darwin__) +#define s6_addr32 __u6_addr.__u6_addr32 +static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest); +#endif + +static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, + int *txFamily, struct sockaddr_storage *listenerSockaddr); +static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort); +static void SendDummyPacket(void); +static bool handleDataPacket(UDPConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param, bool *wakeup_mainthread); +static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len); +static void initUnackQueueRing(UnackQueueRing *uqr); + +static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail); + +static char *format_sockaddr_udp(struct sockaddr_storage *sa, char *buf, size_t len); + +static char* flags2txt(uint32 pkt_flags); + +static const char* flags_text[] = + {"recv2send", "ack", "stop", "eos", "nak", 
"disorder", "duplicate", "capacity"}; + +static char* +flags2txt(uint32 pkt_flags) +{ + static char flags[64]; + + char *p = flags; + *p = '\0'; + int bytes = 0; + for (size_t i = 0; i < sizeof(flags_text)/sizeof(flags_text[0]); ++i) { + if (pkt_flags & (1 << i)) + bytes += snprintf(p + bytes, 64, "%s | ", flags_text[i]); + } + + if (bytes > 0) + bytes -= 3; + *(p + bytes) = '\0'; + + return flags; +} + +/* + * CursorICHistoryTable::prune + * Prune entries in the hash table. + */ +void +CursorICHistoryTable::prune(uint32 icId) { + for (uint8 index = 0; index < size; index++) { + CursorICHistoryEntry *p = table[index], *q = NULL; + while (p) { + /* remove an entry if it is older than the prune-point */ + if (p->icId < icId) { + if (!q) + table[index] = p->next; + else + q->next = p->next; + + /* set up next loop */ + CursorICHistoryEntry *trash = p; + p = trash->next; + ic_free(trash); + + count--; + } else { + q = p; + p = p->next; + } + } + } +} + +#ifdef TRANSFER_PROTOCOL_STATS +typedef enum TransProtoEvent TransProtoEvent; +enum TransProtoEvent +{ + TPE_DATA_PKT_SEND, + TPE_ACK_PKT_QUERY +}; + +typedef struct TransProtoStatEntry TransProtoStatEntry; +struct TransProtoStatEntry +{ + TransProtoStatEntry *next; + + /* Basic information */ + uint32 time; + TransProtoEvent event; + int dstPid; + uint32 seq; + + /* more attributes can be added on demand. */ + + /* + * float cwnd; + * int capacity; + */ +}; + +typedef struct TransProtoStats TransProtoStats; +struct TransProtoStats +{ + pthread_mutex_t lock; + TransProtoStatEntry *head; + TransProtoStatEntry *tail; + uint64 count; + uint64 startTime; + + void init(); + void update(TransProtoEvent event, icpkthdr *pkt); + void dump(); +}; + +static TransProtoStats trans_proto_stats = +{ + PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0 +}; + +/* + * init + * Initialize the transport protocol states data structures. 
+ */ +void +TransProtoStats::init() +{ + pthread_mutex_lock(&this->lock); + + while (this->head) { + TransProtoStatEntry *cur = this->head; + this->head = this->head->next; + ic_free(cur); + this->count--; + } + + this->head = NULL; + this->tail = NULL; + this->count = 0; + this->startTime = getCurrentTime(); + + pthread_mutex_unlock(&this->lock); +} + +void +TransProtoStats::update(TransProtoEvent event, icpkthdr *pkt) +{ + /* Add to list */ + TransProtoStatEntry *entry = (TransProtoStatEntry *) ic_malloc(sizeof(TransProtoStatEntry)); + if (!entry) + return; + + memset(entry, 0, sizeof(*entry)); + + /* change the list */ + pthread_mutex_lock(&this->lock); + if (this->count == 0) { + /* 1st element */ + this->head = entry; + this->tail = entry; + } else { + this->tail->next = entry; + this->tail = entry; + } + this->count++; + + entry->time = getCurrentTime() - this->startTime; + entry->event = event; + entry->dstPid = pkt->dstPid; + entry->seq = pkt->seq; + + /* + * Other attributes can be added on demand new->cwnd = + * snd_control_info.cwnd; new->capacity = conn->capacity; + */ + + pthread_mutex_unlock(&this->lock); +} + +static void +TransProtoStats::dump() +{ + char tmpbuf[32]; + + snprintf(tmpbuf, 32, "%d." UINT64_FORMAT "txt", global_param.MyProcPid, getCurrentTime()); + FILE *ofile = fopen(tmpbuf, "w+"); + + pthread_mutex_lock(&this->lock); + while (this->head) + { + TransProtoStatEntry *cur = NULL; + + cur = this->head; + this->head = this->head->next; + + fprintf(ofile, "time %d event %d seq %d destpid %d\n", cur->time, cur->event, cur->seq, cur->dstPid); + ic_free(cur); + this->count--; + } + + this->tail = NULL; + + pthread_mutex_unlock(&this->lock); + + fclose(ofile); +} + +#endif /* TRANSFER_PROTOCOL_STATS */ + +/* + * initConnHashTable + * Initialize a connection hash table. + */ +bool +ConnHashTable::init() +{ + this->size = global_param.Gp_role == GP_ROLE_DISPATCH_IC ? 
+ (global_param.segment_number * 2) : global_param.ic_htab_size; + Assert(this->size > 0); + + this->table = (struct ConnHtabBin **) ic_malloc(this->size * sizeof(struct ConnHtabBin *)); + if (this->table == NULL) + return false; + + for (int i = 0; i < this->size; i++) + this->table[i] = NULL; + + return true; +} + +/* + * connAddHash + * Add a connection to the hash table + * + * Note: we want to add a connection to the hashtable if it isn't + * already there ... so we just have to check the pointer values -- no + * need to use CONN_HASH_MATCH() at all! + */ +bool +ConnHashTable::add(UDPConn *conn) +{ + uint32 hashcode = CONN_HASH_VALUE(&conn->conn_info) % this->size; + + /* + * check for collision -- if we already have an entry for this connection, + * don't add another one. + */ + for (struct ConnHtabBin *bin = this->table[hashcode]; bin != NULL; bin = bin->next) + { + if (bin->conn == conn) + { + LOG(DEBUG5, "ConnHashTable::add(): duplicate ?! node %d route %d", conn->conn_info.motNodeId, conn->route); + return true; /* false *only* indicates memory-alloc + * failure. */ + } + } + + struct ConnHtabBin *newbin = (struct ConnHtabBin *) ic_malloc(sizeof(struct ConnHtabBin)); + if (newbin == NULL) + return false; + + newbin->conn = conn; + newbin->next = this->table[hashcode]; + this->table[hashcode] = newbin; + + ic_statistics.activeConnectionsNum++; + + return true; +} + +/* + * remove + * Delete a connection from the hash table + * + * Note: we want to remove a connection from the hashtable if it is + * there ... so we just have to check the pointer values -- no need to + * use CONN_HASH_MATCH() at all! + */ +void +ConnHashTable::remove(UDPConn *conn) +{ + uint32 hashcode; + struct ConnHtabBin *c, + *p, + *trash; + + hashcode = CONN_HASH_VALUE(&conn->conn_info) % this->size; + + c = this->table[hashcode]; + + /* find entry */ + p = NULL; + while (c != NULL) + { + /* found ? */ + if (c->conn == conn) + break; + + p = c; + c = c->next; + } + + /* not found ? 
*/ + if (c == NULL) + { + return; + } + + /* found the connection, remove from the chain. */ + trash = c; + + if (p == NULL) + this->table[hashcode] = c->next; + else + p->next = c->next; + + ic_free(trash); + + ic_statistics.activeConnectionsNum--; + + return; +} + +/* + * findConnByHeader + * Find the corresponding connection given a pkt header information. + * + * With the new mirroring scheme, the interconnect is no longer involved: + * we don't have to disambiguate anymore. + * + * NOTE: the icpkthdr field dstListenerPort is used for disambiguation. + * on receivers it may not match the actual port (it may have an extra bit + * set (1<<31)). + */ +UDPConn * +ConnHashTable::find(icpkthdr *hdr) { + + uint32 hashcode = CONN_HASH_VALUE(hdr) % this->size; + for (struct ConnHtabBin *bin = this->table[hashcode]; bin != NULL; bin = bin->next) { + UDPConn *conn = bin->conn; + + if (CONN_HASH_MATCH(&conn->conn_info, hdr)) { + UDPConn *ret = conn; + if (IC_DEBUG5 >= session_param.log_min_messages) + LOG(DEBUG5, "ConnHashTable::find: found. route %d state %d hashcode %d conn %p", + conn->route, ret->state, hashcode, ret); + + return ret; + } + } + + if (IC_DEBUG5 >= session_param.log_min_messages) + LOG(DEBUG5, "ConnHashTable::find: not found! (hdr->srcPid %d hdr->srcContentId %d " + "hdr->dstContentId %d hdr->dstPid %d sess(%d:%d) cmd(%d:%d)) hashcode %d", + hdr->srcPid, hdr->srcContentId, hdr->dstContentId, hdr->dstPid, hdr->sessionId, + session_param.gp_session_id, hdr->icId, ic_control_info.ic_instance_id, hashcode); + + return NULL; +} + +void +ConnHashTable::destroy() { + for (int i = 0; i < this->size; i++) { + while (this->table[i] != NULL) { + struct ConnHtabBin *trash = this->table[i]; + this->table[i] = trash->next; + ic_free(trash); + } + } + + ic_free(this->table); + this->table = NULL; + this->size = 0; +} + +/* + * icBufferListInitHeadLink + * Initialize the pointers in the head link to point to itself. 
+ */ +static inline void +icBufferListInitHeadLink(ICBufferLink *link) +{ + link->next = link->prev = link; +} + + +#if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING) + +/* + * icBufferListLog + * Log the buffer list. + */ +void +ICBufferList::icBufferListLog() +{ + LOG(INFO, "Length %d, type %d headptr %p", this->len, this->type, &this->head); + + ICBufferLink *bufLink = this->head.next; + + int len = this->len; + int i = 0; + + while (bufLink != &this->head && len > 0) + { + ICBuffer *buf = (this->type == ICBufferListType_Primary ? GET_ICBUFFER_FROM_PRIMARY(bufLink) + : GET_ICBUFFER_FROM_SECONDARY(bufLink)); + + LOG(INFO, "Node %d, linkptr %p", i++, bufLink); + + logPkt("from list", buf->pkt); + bufLink = bufLink->next; + len--; + } +} +#endif + +#ifdef USE_ASSERT_CHECKING +/* + * icBufferListCheck + * Buffer list sanity check. + */ +void +ICBufferList::icBufferListCheck(const char *prefix) +{ + int len = this->len; + ICBufferLink *link = this->head.next; + + if (len < 0) + { + LOG(LOG_ERROR, "ICBufferList ERROR %s: list length %d < 0 ", prefix, this->length()); + goto error; + } + + if (len == 0 && (this->head.prev != this->head.next && this->head.prev != &this->head)) + { + LOG(LOG_ERROR, "ICBufferList ERROR %s: length is 0, &list->head %p, prev %p, next %p", + prefix, &this->head, this->head.prev, this->head.next); + this->icBufferListLog(); + goto error; + } + + while (len > 0) + { + link = link->next; + len--; + } + + if (link != &this->head) + { + LOG(LOG_ERROR, "ICBufferList ERROR: %s len %d", prefix, this->len); + this->icBufferListLog(); + goto error; + } + + return; + +error: + LOG(INFO, "wait for 120s and then abort."); + ic_usleep(120000000); + abort(); +} +#endif + +/* + * ICBufferList::init + * Initialize the buffer list with the given type. 
+ */ +void +ICBufferList::init(ICBufferListType atype) +{ + Assert(atype == ICBufferListType_Primary|| atype == ICBufferListType_Secondary); + + type = atype; + len = 0; + + icBufferListInitHeadLink(&head); + +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::init"); +#endif +} + +/* + * ICBufferList::is_head + * Return whether the given link is the head link of the list. + * + * This function is often used as the end condition of an iteration of the list. + */ +bool +ICBufferList::is_head(ICBufferLink *link) +{ +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::is_head"); +#endif + return (link == &head); +} + +/* + * ICBufferList::first + * Return the first link after the head link. + * + * Note that the head link is a pseudo link used to only to ease the operations of the link list. + * If the list only contains the head link, this function will return the head link. + */ +ICBufferLink * +ICBufferList::first() +{ +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::first"); +#endif + return head.next; +} + +/* + * ICBufferList::length + * Get the list length. + */ +int +ICBufferList::length() +{ + return len; +} + +/* + * ICBufferList::delete + * Remove an buffer from the buffer list and return the buffer. + */ +ICBuffer * +ICBufferList::remove(ICBuffer *buf) +{ +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::delete"); +#endif + + ICBufferLink *bufLink = NULL; + + bufLink = (this->type == ICBufferListType_Primary ? &buf->primary : &buf->secondary); + + bufLink->prev->next = bufLink->next; + bufLink->next->prev = bufLink->prev; + + len--; + + return buf; +} + +/* + * ICBufferList::pop + * Remove the head buffer from the list. 
+ */ +ICBuffer * +ICBufferList::pop() +{ + ICBuffer *buf = NULL; + ICBufferLink *bufLink = NULL; + +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::pop"); +#endif + + if (this->len == 0) + return NULL; + + bufLink = this->first(); + buf = (this->type == ICBufferListType_Primary ? GET_ICBUFFER_FROM_PRIMARY(bufLink) + : GET_ICBUFFER_FROM_SECONDARY(bufLink)); + + bufLink->prev->next = bufLink->next; + bufLink->next->prev = bufLink->prev; + + this->len--; + + return buf; +} + +/* + * ICBufferList::free + * Free all the buffers in the list. + */ +void +ICBufferList::destroy() +{ + ICBuffer *buf = NULL; + +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::free"); +#endif + + while ((buf = this->pop()) != NULL) + ic_free(buf); +} + +/* + * ICBufferList::append + * Append a buffer to the tail of a double-link list. + */ +ICBuffer * +ICBufferList::append(ICBuffer *buf) +{ + Assert(buf); + +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::append"); +#endif + + ICBufferLink *bufLink = NULL; + + bufLink = (this->type == ICBufferListType_Primary ? &buf->primary : &buf->secondary); + + bufLink->prev = this->head.prev; + bufLink->next = &this->head; + + this->head.prev->next = bufLink; + this->head.prev = bufLink; + + this->len++; + + return buf; +} + +/* + * ICBufferList::return + * Return the buffers in the list to the free buffer list. + * + * If the buf is also in an expiration queue, we also need to remove it from the expiration queue. 
+ * + */ +void +ICBufferList::release(bool inExpirationQueue) +{ +#ifdef USE_ASSERT_CHECKING + this->icBufferListCheck("ICBufferList::return"); +#endif + ICBuffer *buf = NULL; + + while ((buf = this->pop()) != NULL) + { + if (inExpirationQueue) /* the buf is in also in the expiration queue */ + { + ICBufferList *alist = &unack_queue_ring.slots[buf->unackQueueRingSlot]; + buf = alist->remove(buf); + unack_queue_ring.numOutStanding--; + if (this->length() >= 1) + unack_queue_ring.numSharedOutStanding--; + } + + snd_buffer_pool.freeList.append(buf); + } +} + +#ifdef USE_ASSERT_CHECKING +/* + * ICBufferList::dump_to_file + * Dump a buffer list. + */ +void +ICBufferList::dump_to_file(FILE *ofile) +{ + this->icBufferListCheck("ICBufferList::dump_to_file"); + + ICBufferLink *bufLink = this->head.next; + + int len = this->len; + int i = 0; + + fprintf(ofile, "List Length %d\n", len); + while (bufLink != &this->head && len > 0) + { + ICBuffer *buf = (this->type == ICBufferListType_Primary ? GET_ICBUFFER_FROM_PRIMARY(bufLink) + : GET_ICBUFFER_FROM_SECONDARY(bufLink)); + + fprintf(ofile, "Node %d, linkptr %p ", i++, bufLink); + fprintf(ofile, "Packet Content [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d " + "srcContentId %d dstDesContentId %d " + "srcPid %d dstPid %d " + "srcListenerPort %d dstListernerPort %d " + "sendSliceIndex %d recvSliceIndex %d " + "sessionId %d icId %d " + "flags %d\n", + buf->pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA", + buf->pkt->seq, buf->pkt->extraSeq, buf->pkt->motNodeId, buf->pkt->crc, buf->pkt->len, + buf->pkt->srcContentId, buf->pkt->dstContentId, + buf->pkt->srcPid, buf->pkt->dstPid, + buf->pkt->srcListenerPort, buf->pkt->dstListenerPort, + buf->pkt->sendSliceIndex, buf->pkt->recvSliceIndex, + buf->pkt->sessionId, buf->pkt->icId, + buf->pkt->flags); + bufLink = bufLink->next; + len--; + } +} +#endif + +/* + * initUnackQueueRing + * Initialize an unack queue ring. 
+ * + * Align current time to a slot boundary and set current slot index (time pointer) to 0. + */ +static void +initUnackQueueRing(UnackQueueRing *uqr) +{ + int i = 0; + + uqr->currentTime = 0; + uqr->idx = 0; + uqr->numOutStanding = 0; + uqr->numSharedOutStanding = 0; + + for (; i < UNACK_QUEUE_RING_SLOTS_NUM; i++) + { + uqr->slots[i].init(ICBufferListType_Secondary); + } +} + +/* + * RxBufferPool::get + * Get a receive buffer. + * + * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED* + * + * NOTE: This function MUST NOT contain elog or ereport statements. + * elog is NOT thread-safe. Developers should instead use something like: + * + * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe. + */ +icpkthdr * +RxBufferPool::get() +{ + icpkthdr *ret = NULL; + +#ifdef USE_ASSERT_CHECKING + if (FINC_HAS_FAULT(FINC_RX_BUF_NULL) && + testmode_inject_fault(session_param.gp_udpic_fault_inject_percent)) + return NULL; +#endif + + do + { + if (this->freeList == NULL) + { + if (this->count > this->maxCount) + { + if (IC_DEBUG3 >= session_param.log_min_messages) + LOG(DEBUG3, "Interconnect ran out of rx-buffers count/max %d/%d", this->count, this->maxCount); + break; + } + + /* malloc is used for thread safty. */ + ret = (icpkthdr *) ic_malloc(global_param.Gp_max_packet_size); + + /* + * Note: we return NULL if the malloc() fails -- and the + * background thread will set the error. Main thread will check + * the error, report it and start teardown. + */ + if (ret != NULL) + this->count++; + + break; + } + + /* we have buffers available in our freelist */ + ret = this->get_free(); + + } while (0); + + return ret; +} + +/* + * RxBufferPool::put + * Return a receive buffer to free list + * + * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED* + */ +void +RxBufferPool::put(icpkthdr *buf) +{ + /* return the buffer into the free list. 
*/ + *(char **) buf = this->freeList; + this->freeList = (char *)buf; +} + +/* + * RxBufferPool::get_free + * Get a receive buffer from free list + * + * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED* + * + * NOTE: This function MUST NOT contain elog or ereport statements. + * elog is NOT thread-safe. Developers should instead use something like: + * + * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe. + */ +icpkthdr * +RxBufferPool::get_free() +{ + icpkthdr *buf = NULL; + + buf = (icpkthdr *) this->freeList; + this->freeList = *(char **) (this->freeList); + return buf; +} + +/* + * RxBufferPool::free + * Free a receive buffer. + * + * NOTE: This function MUST NOT contain elog or ereport statements. + * elog is NOT thread-safe. Developers should instead use something like: + * + * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe. + */ +void +RxBufferPool::release(icpkthdr *buf) +{ + ic_free(buf); + count--; +} + +/* + * init + * Initialize the send buffer pool. + * + * The initial maxCount is set to 1 for gp_interconnect_snd_queue_depth = 1 case, + * then there is at least an extra free buffer to send for that case. + */ +void +SendBufferPool::init() +{ + this->freeList.init(ICBufferListType_Primary); + this->count = 0; + this->maxCount = (session_param.Gp_interconnect_snd_queue_depth == 1 ? 1 : 0); +} + +/* + * clean + * Clean the send buffer pool. + */ +void +SendBufferPool::clean() +{ + this->freeList.destroy(); + this->count = 0; + this->maxCount = 0; +} + +/* + * get + * Get a send buffer for a connection. + * + * Different flow control mechanisms use different buffer management policies. + * Capacity based flow control uses per-connection buffer policy and Loss based + * flow control uses shared buffer policy. + * + * Return NULL when no free buffer available. 
+ */ +ICBuffer * +SendBufferPool::get(UDPConn *conn) +{ + ICBuffer *ret = NULL; + + ic_statistics.totalBuffers += (this->freeList.length() + this->maxCount - this->count); + ic_statistics.bufferCountingTime++; + + /* Capacity based flow control does not use shared buffers */ + if (session_param.Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY_IC) + { + Assert(conn->unackQueue.length() + conn->sndQueue.length() <= session_param.Gp_interconnect_snd_queue_depth); + if (conn->unackQueue.length() + conn->sndQueue.length() >= session_param.Gp_interconnect_snd_queue_depth) + return NULL; + } + + if (this->freeList.length() > 0) + { + return this->freeList.pop(); + } + else + { + if (this->count < this->maxCount) + { + ret = (ICBuffer *) ic_malloc0(global_param.Gp_max_packet_size + sizeof(ICBuffer)); + this->count++; + ret->conn = NULL; + ret->nRetry = 0; + icBufferListInitHeadLink(&ret->primary); + icBufferListInitHeadLink(&ret->secondary); + ret->unackQueueRingSlot = 0; + } + else + { + return NULL; + } + } + + return ret; +} + +/* + * addCRC + * add CRC field to the packet. + */ +static void +addCRC(icpkthdr *pkt) +{ + pkt->crc = ComputeCRC(pkt, pkt->len); +} + +/* + * checkCRC + * check the validity of the packet. + */ +static bool +checkCRC(icpkthdr *pkt) +{ + uint32 rx_crc, + local_crc; + + rx_crc = pkt->crc; + pkt->crc = 0; + local_crc = ComputeCRC(pkt, pkt->len); + if (rx_crc != local_crc) + { + return false; + } + + return true; +} + + +/* + * checkRxThreadError + * Check whether there was error in the background thread in main thread. + * + * If error found, report it. 
+ */ +static void +checkRxThreadError() +{ + int eno; + + eno = ic_atomic_read_u32(&ic_control_info.eno); + if (eno != 0) + { + errno = eno; + + std::stringstream ss; + ss <<"ERROR, interconnect encountered an error, in receive background thread: "<<strerror(eno); + throw ICReceiveThreadException(ss.str(), __FILE__, __LINE__); + } +} + +/* + * setRxThreadError + * Set the error no in background thread. + * + * Record the error in background thread. Main thread checks the errors periodically. + * If main thread will find it, main thread will handle it. + */ +static void +setRxThreadError(int eno) +{ + uint32 expected = 0; + + /* always let main thread know the error that occurred first. */ + if (ic_atomic_compare_exchange_u32(&ic_control_info.eno, &expected, (uint32) eno)) + { + LOG(LOG_ERROR, "Interconnect error: in background thread, set ic_control_info.eno to %d, " + "rx_buffer_pool.count %d, rx_buffer_pool.maxCount %d", + expected, rx_buffer_pool.count, rx_buffer_pool.maxCount); + } +} + +/* + * resetRxThreadError + * Reset the error no. + * + */ +static void +resetRxThreadError() +{ + ic_atomic_write_u32(&ic_control_info.eno, 0); +} + +/* + * Set UDP IC send/receive socket buffer size. + * + * We must carefully size the UDP IC socket's send/receive buffers. If the size + * is too small, say 128K, and send queue depth and receive queue depth are + * large, then there might be a lot of dropped/reordered packets. We start + * trying from a size of 2MB (unless Gp_udp_bufsize_k is specified), and + * gradually back off to UDPIC_MIN_BUF_SIZE. For a given size setting to be + * successful, the corresponding UDP kernel buffer size params must be adequate. + * + */ +static uint32 +setUDPSocketBufferSize(int ic_socket, int buffer_type) +{ + int expected_size; + int curr_size; + socklen_t option_len = 0; + + Assert(buffer_type == SO_SNDBUF || buffer_type == SO_RCVBUF); + + expected_size = (global_param.Gp_udp_bufsize_k ? 
global_param.Gp_udp_bufsize_k * 1024 : 2048 * 1024); + + curr_size = expected_size; + option_len = sizeof(curr_size); + while (setsockopt(ic_socket, SOL_SOCKET, buffer_type, (const char *) &curr_size, option_len) < 0) + { + if(session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "UDP-IC: setsockopt %s failed to set buffer size = %d bytes: %m", + buffer_type == SO_SNDBUF ? "send": "receive", + curr_size); + + curr_size = curr_size >> 1; + if (curr_size < UDPIC_MIN_BUF_SIZE) + return -1; + } + + if(session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "UDP-IC: socket %s current buffer size = %d bytes", + buffer_type == SO_SNDBUF ? "send": "receive", + curr_size); + + return curr_size; +} + + +/* + * setupUDPListeningSocket + * Setup udp listening socket. + */ +static void +setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct sockaddr_storage *listenerSockaddr) +{ + struct addrinfo *addrs = NULL; + struct addrinfo *addr; + struct addrinfo hints; + int ret; + int ic_socket = INVALID_SOCKET; + struct sockaddr_storage ic_socket_addr; + int tries = 0; + struct sockaddr_storage listenerAddr; + socklen_t listenerAddrlen = sizeof(ic_socket_addr); + uint32 socketSendBufferSize; + uint32 socketRecvBufferSize; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ + hints.ai_protocol = 0; + hints.ai_addrlen = 0; + hints.ai_addr = NULL; + hints.ai_canonname = NULL; + hints.ai_next = NULL; + hints.ai_flags |= AI_NUMERICHOST; + +#ifdef USE_ASSERT_CHECKING + if (session_param.gp_udpic_network_disable_ipv6) + hints.ai_family = AF_INET; +#endif + + if (global_param.Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST_IC) + { + Assert(global_param.interconnect_address && strlen(global_param.interconnect_address) > 0); + hints.ai_flags |= AI_NUMERICHOST; + if 
(session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "getaddrinfo called with unicast address: %s", global_param.interconnect_address); + } + else + { + Assert(global_param.interconnect_address == NULL); + hints.ai_flags |= AI_PASSIVE; + if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "getaddrinfo called with wildcard address"); + } + + /* + * Restrict what IP address we will listen on to just the one that was + * used to create this QE session. + */ + Assert(global_param.interconnect_address && strlen(global_param.interconnect_address) > 0); + ret = getaddrinfo((!global_param.interconnect_address || global_param.interconnect_address[0] == '\0') ? NULL : global_param.interconnect_address, + NULL, &hints, &addrs); + if (ret || !addrs) + { + LOG(INFO, "could not resolve address for UDP IC socket %s: %s", + global_param.interconnect_address, + gai_strerror(ret)); + goto startup_failed; + } + + /* + * On some platforms, pg_getaddrinfo_all() may return multiple addresses + * only one of which will actually work (eg, both IPv6 and IPv4 addresses + * when kernel will reject IPv6). Worse, the failure may occur at the + * bind() or perhaps even connect() stage. So we must loop through the + * results till we find a working combination. We will generate DEBUG + * messages, but no error, for bogus combinations. + */ + for (addr = addrs; addr != NULL; addr = addr->ai_next) + { +#ifdef HAVE_UNIX_SOCKETS + /* Ignore AF_UNIX sockets, if any are returned. 
*/ + if (addr->ai_family == AF_UNIX) + continue; +#endif + + if (++tries > 1 && session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "trying another address for UDP interconnect socket"); + + ic_socket = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (ic_socket == INVALID_SOCKET) + { + if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "could not create UDP interconnect socket: %m"); + continue; + } + + /* + * Bind the socket to a kernel assigned ephemeral port on the + * interconnect_address. + */ + if (bind(ic_socket, addr->ai_addr, addr->ai_addrlen) < 0) + { + if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "could not bind UDP interconnect socket: %m"); + closesocket(ic_socket); + ic_socket = INVALID_SOCKET; + continue; + } + + /* Call getsockname() to eventually obtain the assigned ephemeral port */ + if (getsockname(ic_socket, (struct sockaddr *) &listenerAddr, &listenerAddrlen) < 0) + { + if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC) + LOG(DEBUG3, "could not get address of UDP interconnect socket: %m"); + closesocket(ic_socket); + ic_socket = INVALID_SOCKET; + continue; + } + + /* If we get here, we have a working socket */ + break; + } + + if (!addr || ic_socket == INVALID_SOCKET) + goto startup_failed; + + /* Memorize the socket fd, kernel assigned port and address family */ + *listenerSocketFd = ic_socket; + if (listenerAddr.ss_family == AF_INET6) + { + *listenerPort = ntohs(((struct sockaddr_in6 *) &listenerAddr)->sin6_port); + *txFamily = AF_INET6; + } + else + { + *listenerPort = ntohs(((struct sockaddr_in *) &listenerAddr)->sin_port); + *txFamily = AF_INET; + } + + /* + * cache the successful sockaddr of the listening socket, so + * we can use this information to connect to the listening socket. 
+ */ + if (listenerSockaddr != NULL) + memcpy(listenerSockaddr, &listenerAddr, sizeof(struct sockaddr_storage)); + + /* Set up socket non-blocking mode */ + if (!ic_set_noblock(ic_socket)) + { + LOG(INFO, "could not set UDP interconnect socket to nonblocking mode: %s", strerror(errno)); + goto startup_failed; + } + + /* Set up the socket's send and receive buffer sizes. */ + socketRecvBufferSize = setUDPSocketBufferSize(ic_socket, SO_RCVBUF); + if (socketRecvBufferSize == static_cast<uint32>(-1)) + goto startup_failed; + ic_control_info.socketRecvBufferSize = socketRecvBufferSize; + + socketSendBufferSize = setUDPSocketBufferSize(ic_socket, SO_SNDBUF); + if (socketSendBufferSize == static_cast<uint32>(-1)) + goto startup_failed; + ic_control_info.socketSendBufferSize = socketSendBufferSize; + + if (addrs != NULL) + freeaddrinfo(addrs); + return; + +startup_failed: + if (addrs) + freeaddrinfo(addrs); + if (ic_socket != INVALID_SOCKET) + { + closesocket(ic_socket); + } + + std::stringstream ss; + ss << "ERROR,interconnect error: Could not set up udp listener socket: " << strerror(errno); + throw ICNetworkException(ss.str(), __FILE__, __LINE__); +} + +/* + * InitMutex + * Initialize mutex. + */ +static void +initMutex(pthread_mutex_t *mutex) +{ + pthread_mutexattr_t m_atts; Review Comment: Why not use std::mutex and std::lock_guard? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
