This is an automated email from the ASF dual-hosted git repository.

yjhjstz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit efa99fdc3ffcf1d05439cfac61d6308be820c977
Author: Jianghua Yang <[email protected]>
AuthorDate: Fri Jun 20 23:24:56 2025 +0000

    Use event_base with libevent 2.0+ to avoid thread-unsafe event_init in gpfdist
    
    The legacy event_init() function is not thread-safe and can cause issues
    when gpfdist is run in multi-threaded environments. This patch updates
    gpfdist to use libevent 2.0+'s thread-safe APIs, specifically
    `event_base` along with `event_assign()` and `evtimer_assign()`.
    
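    A new global `gcb.event_base` is introduced; it is created with
    `event_base_new()` in gpfdist_init() and passed to every event
    registration and to `event_base_dispatch()`. The change is not guarded
    by a compile-time check, so gpfdist now requires libevent ≥ 2.0.1.
    This avoids the need for the deprecated and non-thread-safe
    `event_set()` / `evtimer_set()` APIs, and prepares gpfdist for better
    thread safety. A new `gpfdist_cleanup()` helper frees the event base
    before the process exits.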
---
 src/bin/gpfdist/gpfdist.c | 59 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 42 insertions(+), 17 deletions(-)

diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c
index ab3a5b159c4..ad7fb868313 100644
--- a/src/bin/gpfdist/gpfdist.c
+++ b/src/bin/gpfdist/gpfdist.c
@@ -233,6 +233,7 @@ static struct
        SSL_CTX                 *server_ctx;/* for SSL */
 #endif
        int                     wdtimer; /* Kill gpfdist after k seconds of 
inactivity. 0 to disable. */
+       struct event_base *event_base; /* for libevent 2.0+ */
 } gcb;
 
 /*  A session */
@@ -1600,7 +1601,7 @@ static void session_detach(request_t* r)
                        }
 
                        event_del(&session->ev);
-                       evtimer_set(&session->ev, free_session_cb, session);
+                       evtimer_assign(&session->ev, gcb.event_base, 
free_session_cb, session);
                        session->tm.tv_sec  = opt.w;
                        session->tm.tv_usec = 0;
                        (void)evtimer_add(&session->ev, &session->tm);
@@ -1811,7 +1812,7 @@ static int session_attach(request_t* r)
                session->active_segids[r->segid] = 1; /* mark this segid as 
active */
                session->maxsegs = r->totalsegs;
                session->requests = apr_hash_make(pool);
-               event_set(&session->ev, 0, 0, 0, 0);
+               event_assign(&session->ev, gcb.event_base, -1, 0, NULL, NULL);
 
                if (session->tid == 0 || session->path == 0 || session->key == 
0)
                        gfatal(r, "out of memory in session_attach");
@@ -2368,7 +2369,7 @@ static void do_accept(int fd, short event, void* arg)
        r->pool = pool;
        r->sock = sock;
 
-       event_set(&r->ev, 0, 0, 0, 0);
+       event_assign(&r->ev, gcb.event_base, -1, 0, NULL, NULL);
 
        /* use the block size specified by -m option */
        r->outblock.data = palloc_safe(r, pool, opt.m, "out of memory when 
allocating buffer: %d bytes", opt.m);
@@ -2421,7 +2422,7 @@ static int setup_write(request_t* r)
        if (r->sock < 0)
                gwarning(r, "internal error in setup_write - no socket to use");
        event_del(&r->ev);
-       event_set(&r->ev, r->sock, EV_WRITE, do_write, r);
+       event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, do_write, r);
        return (event_add(&r->ev, 0));
 }
 
@@ -2445,7 +2446,7 @@ static int setup_read(request_t* r)
                gwarning(r, "internal error in setup_read - no socket to use");
 
        event_del(&r->ev);
-       event_set(&r->ev, r->sock, EV_READ, do_read_request, r);
+       event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_read_request, 
r);
 
        if(opt.t == 0)
        {
@@ -2552,18 +2553,32 @@ static void
 signal_register()
 {
     /* when SIGTERM raised invoke process_term_signal */
-    signal_set(&gcb.signal_event,SIGTERM,process_term_signal,0);
+    evsignal_assign(&gcb.signal_event, gcb.event_base, SIGTERM, 
process_term_signal, 0);
 
     /* high priority so we accept as fast as possible */
     if(event_priority_set(&gcb.signal_event, 0))
         gwarning(NULL,"signal event priority set failed");
 
     /* start watching this event */
-       if(signal_add(&gcb.signal_event, 0))
+       if(evsignal_add(&gcb.signal_event, 0))
         gfatal(NULL,"cannot set up event on signal register");
 
 }
 
+/*
+ * gpfdist_cleanup
+ *
+ * Clean up all resources before exiting
+ */
+static void gpfdist_cleanup(void)
+{
+       /* Clean up event_base if initialized */
+       if (gcb.event_base) {
+               event_base_free(gcb.event_base);
+               gcb.event_base = NULL;
+       }
+}
+
 static void clear_listen_sock(void)
 {
        SOCKET sock = -1;
@@ -2616,9 +2631,8 @@ http_setup(void)
                hostaddr = opt.b;
 
        /* setup event priority */
-       if (event_priority_init(10))
-               gwarning(NULL, "event_priority_init failed");
-
+       if (event_base_priority_init(gcb.event_base, 10))
+               gwarning(NULL, "event_base_priority_init failed");
 
        /* Try each possible port from opt.p to opt.last_port */
        for (;;)
@@ -2811,8 +2825,8 @@ http_setup(void)
        for (i = 0; i < gcb.listen_sock_count; i++)
        {
                /* when this socket is ready, do accept */
-               event_set(&gcb.listen_events[i], gcb.listen_socks[i], EV_READ | 
EV_PERSIST,
-                                 do_accept, 0);
+               event_assign(&gcb.listen_events[i], gcb.event_base, 
gcb.listen_socks[i], 
+                           EV_READ | EV_PERSIST, do_accept, 0);
 
         /* only signal process function priority higher than socket handler */
                if (event_priority_set(&gcb.listen_events[i], 1))
@@ -2838,6 +2852,9 @@ process_term_signal(int sig,short event,void* arg)
                        {
                                closesocket(gcb.listen_socks[i]);
                        }
+               
+               /* Clean up resources before exiting */
+               gpfdist_cleanup();
                _exit(1);
 }
 
@@ -3913,7 +3930,10 @@ int gpfdist_init(int argc, const char* const argv[])
                putenv("EVENT_SHOW_METHOD=1");
        putenv("EVENT_NOKQUEUE=1");
 
-       event_init();
+       /* libevent 2.0+ */
+       gcb.event_base = event_base_new();
+       if (!gcb.event_base)
+               gfatal(NULL, "event_base_new failed");
 
     signal_register();
        http_setup();
@@ -3991,16 +4011,19 @@ int gpfdist_init(int argc, const char* const argv[])
 
 int gpfdist_run()
 {
-       return event_dispatch();
+       return event_base_dispatch(gcb.event_base);
 }
 
 #ifndef WIN32
 
 int main(int argc, const char* const argv[])
 {
+       int ret;
        if (gpfdist_init(argc, argv) == -1)
                gfatal(NULL, "Initialization failed");
-       return gpfdist_run();
+       ret = gpfdist_run();
+       gpfdist_cleanup();
+       return ret;
 }
 
 
@@ -4175,6 +4198,7 @@ int main(int argc, const char* const argv[])
                if (gpfdist_init(argc, argv) == -1)
                        gfatal(NULL, "Initialization failed");
                main_ret = gpfdist_run();
+               gpfdist_cleanup();
        }
 
 
@@ -4264,6 +4288,7 @@ void ServiceMain(int argc, char** argv)
         * actual service work
         */
        gpfdist_run();
+       gpfdist_cleanup();
 }
 
 void ControlHandler(DWORD request)
@@ -4566,7 +4591,7 @@ static void flush_ssl_buffer(int fd, short event, void* 
arg)
 static void setup_flush_ssl_buffer(request_t* r)
 {
        event_del(&r->ev);
-       event_set(&r->ev, r->sock, EV_WRITE, flush_ssl_buffer, r);
+       event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, 
flush_ssl_buffer, r);
        r->tm.tv_sec  = 5;
        r->tm.tv_usec = 0;
        (void)event_add(&r->ev, &r->tm);
@@ -4678,7 +4703,7 @@ static void request_cleanup(request_t *r)
 static void setup_do_close(request_t* r)
 {
        event_del(&r->ev);
-       event_set(&r->ev, r->sock, EV_READ, do_close, r);
+       event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_close, r);
 
        r->tm.tv_sec = 60;
        r->tm.tv_usec = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to