HAWQ-581. Fix Memory Leak in QD to RM communication context

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/9357b318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/9357b318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/9357b318

Branch: refs/heads/HAWQ-546
Commit: 9357b318b5cc98171d8fa84771c11cd263f2bd7c
Parents: e9f1d95
Author: Wen Lin <w...@pivotal.io>
Authored: Fri Mar 25 10:07:10 2016 +0800
Committer: Oleksandr Diachenko <odiache...@pivotal.io>
Committed: Wed Mar 30 17:23:30 2016 -0700

----------------------------------------------------------------------
 .../communication/rmcomm_QD2RM.c                | 29 +++++++++++++++-----
 1 file changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9357b318/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index f08f9d5..4808a8b 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -202,6 +202,7 @@ uint64_t                    QD2RM_LastRefreshResourceTime = 0;
 bool                           QD2RM_Initialized                         = false;
 
 pthread_t              ResourceHeartBeatThreadHandle;
+bool                           ResourceHeartBeatRunning          = false;
 pthread_mutex_t        ResourceSetsMutex;
 uint64_t               LastSendResourceRefreshHeartBeatTime = 0;
 
@@ -317,12 +318,14 @@ void initializeQD2RMComm(void)
        }
 
 
-       /* Start heart-beat thread. */
+               /* Start heart-beat thread. */
+               ResourceHeartBeatRunning = true;
                if ( pthread_create(&ResourceHeartBeatThreadHandle,
                                                        NULL,
                                                        generateResourceRefreshHeartBeat,
                                                        tharg) != 0)
                {
+                       ResourceHeartBeatRunning = false;
                        freeHeartBeatThreadArg(&tharg);
                        elog(ERROR, "failed to create background thread for communication with "
                                                "resource manager.");
@@ -334,6 +337,8 @@ void initializeQD2RMComm(void)
     initializeSocketConnectionPool();
 
     QD2RM_Initialized = true;
+
+    on_proc_exit(cleanupQD2RMComm, 0);
 }
 
 int createNewResourceContext(int *index)
@@ -473,18 +478,28 @@ int cleanupQD2RMComm(void)
                 res = returnResource(i, errorbuf, sizeof(errorbuf));
                 if ( res != FUNC_RETURN_OK )
                 {
-                       elog(WARNING, "%s", errorbuf);
-               }
+                    elog(WARNING, "%s", errorbuf);
+                }
                 errorbuf[0] = '\0';
                 res = unregisterConnectionInRM(i, errorbuf, sizeof(errorbuf));
                 if ( res != FUNC_RETURN_OK )
                 {
-                       elog(WARNING, "%s", errorbuf);
+                    elog(WARNING, "%s", errorbuf);
                 }
             }
         }
     }
     pthread_mutex_unlock(&ResourceSetsMutex);
+    pthread_mutex_destroy(&ResourceSetsMutex);
+
+    if (ResourceHeartBeatRunning)
+    {
+        ResourceHeartBeatRunning = false;
+        res = pthread_join(ResourceHeartBeatThreadHandle, NULL);
+        if ( res != FUNC_RETURN_OK ) {
+            elog(WARNING, "Fail to cancel resource heartbeat thread.");
+        }
+    }
 
     return FUNC_RETURN_OK;
 }
@@ -1380,14 +1395,12 @@ void *generateResourceRefreshHeartBeat(void *arg)
 
        gp_set_thread_sigmasks();
 
-       pthread_detach(pthread_self());
-
        initializeSelfMaintainBuffer(&sendbuffer, NULL);
        initializeSelfMaintainBuffer(&contbuffer, NULL);
        prepareSelfMaintainBuffer(&sendbuffer, DEFAULT_HEARTBEAT_BUFFER, true);
        prepareSelfMaintainBuffer(&contbuffer, DEFAULT_HEARTBEAT_BUFFER, true);
 
-       while( true )
+       while( ResourceHeartBeatRunning )
        {
                resetSelfMaintainBuffer(&sendbuffer);
                resetSelfMaintainBuffer(&contbuffer);
@@ -1547,6 +1560,8 @@ void *generateResourceRefreshHeartBeat(void *arg)
                pg_usleep(rm_session_lease_heartbeat_interval * 1000000L);
        }
 
+       destroySelfMaintainBuffer(&sendbuffer);
+       destroySelfMaintainBuffer(&contbuffer);
        freeHeartBeatThreadArg(&tharg);
        write_log("generateResourceRefreshHeartBeat exits.");
        return 0;

Reply via email to