yjhjstz commented on code in PR #1357:
URL: https://github.com/apache/cloudberry/pull/1357#discussion_r2429822317


##########
contrib/udp2/ic_common/udp2/ic_udp2.cpp:
##########
@@ -0,0 +1,6958 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * ic_udp2.cpp
+ *
+ * IDENTIFICATION
+ *    contrib/udp2/ic_common/udp2/ic_udp2.cpp
+ *
+ *               +--------------+
+ *               |  ic_types.h  |
+ *               +--------------+
+ *                  /         \
+ *       +--------------+  +---------------+
+ *       | C interface  |  | C++ interface |
+ *       |  ic_udp2.h   |  |  ic_udp2.hpp  |
+ *       +--------------+  +---------------+
+ *                   \         /
+ *            +----------------------+
+ *            |     C++ implement    |
+ *            |  ic_udp2_internal.hpp|
+ *            |  ic_faultinjection.h |
+ *            |  ic_udp2.cpp         |
+ *            +----------------------+
+ *-------------------------------------------------------------------------
+ */
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <sstream>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+#include <atomic>
+#include <cstdarg>
+
+/*
+ * interface header files
+ */
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ic_types.h"
+#include "ic_udp2.h"
+
+#ifdef __cplusplus
+}
+#endif
+
+#include "ic_udp2.hpp"
+
+/*
+ * internal header files
+ */
+#include "ic_utility.hpp"
+#include "ic_udp2_internal.hpp"
+#include "ic_faultinjection.h"
+
+static int timeoutArray[] =
+{
+       1,
+       1,
+       2,
+       4,
+       8,
+       16,
+       32,
+       64,
+       128,
+       256,
+       512,
+       512                                                     /* MAX_TRY */
+};
+
+/*
+ * Main thread (Receiver) and background thread use the information in
+ * this data structure to handle data packets.
+ */
+static ReceiveControlInfo rx_control_info;
+
+/*
+ * The buffer pool used for keeping data packets.
+ *
+ * maxCount is set to 1 to make sure there is always a buffer
+ * for picking packets from OS buffer.
+ */
+static RxBufferPool rx_buffer_pool = {1, 0, NULL};
+
+/*
+ * The sender side buffer pool.
+ */
+static SendBufferPool snd_buffer_pool;
+
+/*
+ * Main thread use the information in this data structure to do ack handling
+ * and congestion control.
+ */
+static SendControlInfo snd_control_info;
+
+/*
+ * Shared control information that is used by senders, receivers and background thread.
+ */
+static ICGlobalControlInfo ic_control_info;
+
+/*
+ * All connections in a process share this unack queue ring instance.
+ */
+static UnackQueueRing unack_queue_ring = {0, 0, 0};
+
+static int     ICSenderSocket = -1;
+static int32 ICSenderPort = 0;
+static int     ICSenderFamily = 0;
+
+/* Statistics for UDP interconnect. */
+static ICStatistics ic_statistics;
+
+/* Cached sockaddr of the listening udp socket */
+static struct sockaddr_storage udp_dummy_packet_sockaddr;
+
+/* UDP listen fd */
+static int                     UDP_listenerFd = -1;
+
+/* UDP listen port */
+static int32 udp_listener_port = 0;
+
+static std::mutex mtx;
+static std::condition_variable cv;
+
+CChunkTransportState *CChunkTransportStateImpl::state_ = nullptr;
+
+/*
+ * Identity the user of ic module by vector_engine_is_user:
+ * "false" means PG executor, "true" means Arrow executor.
+ */
+static thread_local bool vector_engine_is_user = false;
+static thread_local bool thread_quit = false;
+
+#define CHECK_QUIT_FLAG() \
+       do { \
+               if (thread_quit) { \
+                       throw ICException("received thread quit flag.", __FILE__, __LINE__); \
+               } \
+       } while(0)
+
+#define CHECK_INTERRUPTS(state) \
+       do { \
+               if (vector_engine_is_user) { \
+                       CHECK_QUIT_FLAG(); \
+               } else if (global_param.checkInterruptsCallback) { \
+                       global_param.checkInterruptsCallback((state)->teardownActive); \
+               } \
+       } while(0)
+
+#define CHECK_CANCEL(state) \
+       do { \
+               if (vector_engine_is_user) { \
+                       CHECK_QUIT_FLAG(); \
+               } else if (global_param.checkCancelOnQDCallback) { \
+                       global_param.checkCancelOnQDCallback(state); \
+               } \
+       } while(0)
+
+#define CHECK_POSTMASTER_ALIVE() \
+       do { \
+               if (vector_engine_is_user) { \
+                       CHECK_QUIT_FLAG(); \
+               } else if (global_param.checkPostmasterIsAliveCallback && !global_param.checkPostmasterIsAliveCallback()) { \
+                       throw ICFatalException("FATAL, interconnect failed to send chunks, Postmaster is not alive.", __FILE__, __LINE__); \
+               } \
+       } while(0)
+
+/*=========================================================================
+ * STATIC FUNCTIONS declarations
+ */
+
+/* Background thread error handling functions. */
+static void checkRxThreadError(void);
+static void setRxThreadError(int eno);
+static void resetRxThreadError(void);
+
+static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type);
+static void setupOutgoingUDPConnection(int icid, TransportEntry *pChunkEntry, UDPConn *conn);
+
+/* ICBufferList functions. */
+static inline void icBufferListInitHeadLink(ICBufferLink *link);
+
+static inline void InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort);
+static inline void CleanupMotionUDPIFC(void);
+
+static bool dispatcherAYT(void);
+static void checkQDConnectionAlive(void);
+
+static void *rxThreadFunc(void *arg);
+
+static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now);
+
+static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
+static void cleanupStartupCache(void);
+static void handleCachedPackets(void);
+
+static uint64 getCurrentTime(void);
+static void initMutex(pthread_mutex_t *mutex);
+
+static inline void logPkt(const char *prefix, icpkthdr *pkt);
+
+static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len);
+#if defined(__darwin__)
+#define    s6_addr32 __u6_addr.__u6_addr32
+static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest);
+#endif
+
+static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort,
+                           int *txFamily, struct sockaddr_storage *listenerSockaddr);
+static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort);
+static void SendDummyPacket(void);
+static bool handleDataPacket(UDPConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param, bool *wakeup_mainthread);
+static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
+static void initUnackQueueRing(UnackQueueRing *uqr);
+
+static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail);
+
+static char *format_sockaddr_udp(struct sockaddr_storage *sa, char *buf, size_t len);
+
+static char* flags2txt(uint32 pkt_flags);
+
+static const char* flags_text[] =
+       {"recv2send", "ack", "stop", "eos", "nak", "disorder", "duplicate", "capacity"};
+
+static char*
+flags2txt(uint32 pkt_flags)
+{
+       static char flags[64];

Review Comment:
   `flags` is a function-local static buffer, so `flags2txt()` is not thread-safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to