HAWQ-581. Fix memory leak in QD-to-RM communication context
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/9357b318 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/9357b318 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/9357b318 Branch: refs/heads/HAWQ-546 Commit: 9357b318b5cc98171d8fa84771c11cd263f2bd7c Parents: e9f1d95 Author: Wen Lin <w...@pivotal.io> Authored: Fri Mar 25 10:07:10 2016 +0800 Committer: Oleksandr Diachenko <odiache...@pivotal.io> Committed: Wed Mar 30 17:23:30 2016 -0700 ---------------------------------------------------------------------- .../communication/rmcomm_QD2RM.c | 29 +++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9357b318/src/backend/resourcemanager/communication/rmcomm_QD2RM.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c index f08f9d5..4808a8b 100644 --- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c +++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c @@ -202,6 +202,7 @@ uint64_t QD2RM_LastRefreshResourceTime = 0; bool QD2RM_Initialized = false; pthread_t ResourceHeartBeatThreadHandle; +bool ResourceHeartBeatRunning = false; pthread_mutex_t ResourceSetsMutex; uint64_t LastSendResourceRefreshHeartBeatTime = 0; @@ -317,12 +318,14 @@ void initializeQD2RMComm(void) } - /* Start heart-beat thread. */ + /* Start heart-beat thread. 
*/ + ResourceHeartBeatRunning = true; if ( pthread_create(&ResourceHeartBeatThreadHandle, NULL, generateResourceRefreshHeartBeat, tharg) != 0) { + ResourceHeartBeatRunning = false; freeHeartBeatThreadArg(&tharg); elog(ERROR, "failed to create background thread for communication with " "resource manager."); @@ -334,6 +337,8 @@ void initializeQD2RMComm(void) initializeSocketConnectionPool(); QD2RM_Initialized = true; + + on_proc_exit(cleanupQD2RMComm, 0); } int createNewResourceContext(int *index) @@ -473,18 +478,28 @@ int cleanupQD2RMComm(void) res = returnResource(i, errorbuf, sizeof(errorbuf)); if ( res != FUNC_RETURN_OK ) { - elog(WARNING, "%s", errorbuf); - } + elog(WARNING, "%s", errorbuf); + } errorbuf[0] = '\0'; res = unregisterConnectionInRM(i, errorbuf, sizeof(errorbuf)); if ( res != FUNC_RETURN_OK ) { - elog(WARNING, "%s", errorbuf); + elog(WARNING, "%s", errorbuf); } } } } pthread_mutex_unlock(&ResourceSetsMutex); + pthread_mutex_destroy(&ResourceSetsMutex); + + if (ResourceHeartBeatRunning) + { + ResourceHeartBeatRunning = false; + res = pthread_join(ResourceHeartBeatThreadHandle, NULL); + if ( res != FUNC_RETURN_OK ) { + elog(WARNING, "Fail to cancel resource heartbeat thread."); + } + } return FUNC_RETURN_OK; } @@ -1380,14 +1395,12 @@ void *generateResourceRefreshHeartBeat(void *arg) gp_set_thread_sigmasks(); - pthread_detach(pthread_self()); - initializeSelfMaintainBuffer(&sendbuffer, NULL); initializeSelfMaintainBuffer(&contbuffer, NULL); prepareSelfMaintainBuffer(&sendbuffer, DEFAULT_HEARTBEAT_BUFFER, true); prepareSelfMaintainBuffer(&contbuffer, DEFAULT_HEARTBEAT_BUFFER, true); - while( true ) + while( ResourceHeartBeatRunning ) { resetSelfMaintainBuffer(&sendbuffer); resetSelfMaintainBuffer(&contbuffer); @@ -1547,6 +1560,8 @@ void *generateResourceRefreshHeartBeat(void *arg) pg_usleep(rm_session_lease_heartbeat_interval * 1000000L); } + destroySelfMaintainBuffer(&sendbuffer); + destroySelfMaintainBuffer(&contbuffer); 
freeHeartBeatThreadArg(&tharg); write_log("generateResourceRefreshHeartBeat exits."); return 0;