HAWQ-364. Make resource manager dynamically adjust minimum YARN container count 
in each segment


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e22956c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e22956c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e22956c6

Branch: refs/heads/HAWQ-369
Commit: e22956c6f84cbeb79ea32615f5f7f9908ffca553
Parents: 898820b
Author: YI JIN <y...@pivotal.io>
Authored: Thu Jan 28 14:37:07 2016 +1100
Committer: YI JIN <y...@pivotal.io>
Committed: Thu Jan 28 14:37:07 2016 +1100

----------------------------------------------------------------------
 .../communication/rmcomm_RM2RMSEG.c             |  2 +
 .../resourcemanager/include/resourcepool.h      |  1 +
 .../resourcemanager/include/resqueuemanager.h   |  6 +-
 src/backend/resourcemanager/requesthandler.c    |  2 +
 .../resourcebroker/resourcebroker_LIBYARN.c     | 20 ++++-
 src/backend/resourcemanager/resourcemanager.c   |  2 +
 src/backend/resourcemanager/resqueuemanager.c   | 79 +++++++++++++++++++-
 7 files changed, 108 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c 
b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index 3591ac1..e6b861b 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -244,6 +244,7 @@ void receivedRUAliveResponse(AsyncCommMessageHandlerContext 
 context,
                                          GET_SEGRESOURCE_HOSTNAME(segres));
 
                        refreshResourceQueueCapacity(false);
+                       refreshActualMinGRMContainerPerSeg();
                }
                else {
                        elog(DEBUG3, "Resource manager find host %s is down 
already.",
@@ -293,6 +294,7 @@ void sentRUAliveError(AsyncCommMessageHandlerContext 
context)
                                  GET_SEGRESOURCE_HOSTNAME(segres));
 
                refreshResourceQueueCapacity(false);
+               refreshActualMinGRMContainerPerSeg();
        }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h 
b/src/backend/resourcemanager/include/resourcepool.h
index d63a6cb..b7b25a1 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -390,6 +390,7 @@ struct ResourcePoolData {
         */
        ResourceBundleData FTSTotal;
        ResourceBundleData GRMTotal;
+       ResourceBundleData GRMTotalHavingNoHAWQNode;
 
     uint64_t LastUpdateTime; /* Last time the GRM cluster report is gotten.   
*/
     uint64_t LastRequestTime;/* Last time the GRM cluster report is sent.     
*/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h 
b/src/backend/resourcemanager/include/resqueuemanager.h
index 0b38520..171b399 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -329,6 +329,8 @@ struct DynResourceQueueManagerData {
     int                                                 
ForcedReturnGRMContainerCount;
     bool                                        toRunQueryDispatch;
     bool                                        
hasResourceProblem[RESPROBLEM_COUNT];
+
+    int                                                 
ActualMinGRMContainerPerSeg;
 };
 typedef struct DynResourceQueueManagerData *DynResourceQueueManager;
 typedef struct DynResourceQueueManagerData  DynResourceQueueManagerData;
@@ -344,8 +346,10 @@ typedef struct DynResourceQueueManagerData  
DynResourceQueueManagerData;
 void initializeResourceQueueManager(void);
 /* collect resource queues' resource usage status from bottom up. */
 void refreshMemoryCoreRatioLevelUsage(uint64_t curmicrosec);
-/* Refresh reosurce queue resource capacity and adjusts all queued requests. */
+/* Refresh resource queue resource capacity and adjusts all queued requests. */
 void refreshResourceQueueCapacity(bool queuechanged);
+/* Refresh actual minimum GRM container water level. */
+void refreshActualMinGRMContainerPerSeg(void);
 /* Dispatch resource to the queuing queries. */
 void dispatchResourceToQueries(void);
 /* Time out the resource allocated whose QD owner does not have chance to 
return. */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c 
b/src/backend/resourcemanager/requesthandler.c
index c6e9a34..cc2a216 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -783,6 +783,7 @@ bool handleRMSEGRequestIMAlive(void **arg)
        {
                /* Refresh resource queue capacities. */
                refreshResourceQueueCapacity(false);
+               refreshActualMinGRMContainerPerSeg();
                /* Recalculate all memory/core ratio instances' limits. */
                refreshMemoryCoreRatioLimits();
                /* Refresh memory/core ratio level water mark. */
@@ -1049,6 +1050,7 @@ bool handleRMRequestSegmentIsDown(void **arg)
        }
 
        refreshResourceQueueCapacity(false);
+       refreshActualMinGRMContainerPerSeg();
 
        RPCResponseSegmentIsDownData response;
        response.Result   = res;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
----------------------------------------------------------------------
diff --git 
a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c 
b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
index c6d26af..c97e340 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
@@ -649,12 +649,21 @@ int handleRB2RM_ClusterReport(void)
        setAllSegResourceGRMUnavailable();
 
        /*
-        * Start to update resource pool content.
+        * Start to update resource pool content. The YARN cluster total size is
+        * also counted the same time.
         */
+
+       resetResourceBundleData(&(PRESPOOL->GRMTotalHavingNoHAWQNode), 0, 0.0, 
0);
+
        MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
        while( list_length(segstats) > 0 )
        {
                SegStat segstat = (SegStat)lfirst(list_head(segstats));
+
+               addResourceBundleData(&(PRESPOOL->GRMTotalHavingNoHAWQNode),
+                                                         
segstat->GRMTotalMemoryMB,
+                                                         
segstat->GRMTotalCore);
+
                res = updateHAWQSegWithGRMSegStat(segstat);
                if ( res == FUNC_RETURN_OK )
                {
@@ -676,6 +685,14 @@ int handleRB2RM_ClusterReport(void)
        }
        MEMORY_CONTEXT_SWITCH_BACK
 
+       elog(LOG, "Resource manager YARN resource broker counted HAWQ cluster 
now "
+                         "having (%d MB, %lf CORE) in a YARN cluster of total 
resource "
+                         "(%d MB, %lf CORE).",
+                         PRESPOOL->GRMTotal.MemoryMB,
+                         PRESPOOL->GRMTotal.Core,
+                         PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB,
+                         PRESPOOL->GRMTotalHavingNoHAWQNode.Core);
+
        /*
         * If the segment is not GRM available, RM should return all containers
         * located upon them.
@@ -695,6 +712,7 @@ int handleRB2RM_ClusterReport(void)
        PQUEMGR->GRMQueueResourceTight  = response.ResourceTight > 0 ? true : 
false;
 
        refreshResourceQueueCapacity(false);
+       refreshActualMinGRMContainerPerSeg();
 
     PRESPOOL->LastUpdateTime = gettime_microsec();
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c 
b/src/backend/resourcemanager/resourcemanager.c
index 819da83..b8a7bb5 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -2645,6 +2645,7 @@ void updateStatusOfAllNodes()
        if ( changedstatus )
        {
                refreshResourceQueueCapacity(false);
+               refreshActualMinGRMContainerPerSeg();
        }
 
        validateResourcePoolStatus(true);
@@ -2808,6 +2809,7 @@ int  loadHostInformationIntoResourcePool(void)
 
        /* Refresh resource queue capacities. */
     refreshResourceQueueCapacity(false);
+    refreshActualMinGRMContainerPerSeg();
        /* Recalculate all memory/core ratio instances' limits. */
        refreshMemoryCoreRatioLimits();
        /* Refresh memory/core ratio level water mark. */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c 
b/src/backend/resourcemanager/resqueuemanager.c
index e2a2f43..10a970b 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -258,6 +258,8 @@ void initializeResourceQueueManager(void)
     {
        PQUEMGR->hasResourceProblem[i] = false;
     }
+
+    PQUEMGR->ActualMinGRMContainerPerSeg = rm_min_resource_perseg;
 }
 
 /*
@@ -2472,6 +2474,77 @@ int returnResourceToResQueMgr(ConnectionTrack conntrack)
        return res;
 }
 
+void refreshActualMinGRMContainerPerSeg(void)
+{
+       /*
+        * Recompute PQUEMGR->ActualMinGRMContainerPerSeg as the least of:
+        *  (1) the smallest GRMTotalCore over FTS- and GRM-available segments;
+        *  (2) GRMTotalHavingNoHAWQNode.Core * GRMQueueCapacity /
+        *      AvailNodeCount, truncated; (3) the GUC rm_min_resource_perseg.
+        * (1) and (2) are only computed when ImpType != NONE_HAWQ2 (and (2)
+        * only for positive inputs); otherwise they stay INT32_MAX (no limit).
+        */
+
+       /* STEP 1. Limit (1). NOTE(review): assumes 1 container per core. */
+       int minctncount = INT32_MAX;
+       int normalctncount = INT32_MAX;
+       if ( DRMGlobalInstance->ImpType != NONE_HAWQ2 )
+       {
+               List     *allsegres = NULL;
+               ListCell *cell          = NULL;
+               getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegres);
+
+               foreach(cell, allsegres)
+               {
+                       SegResource segres = 
(SegResource)(((PAIR)lfirst(cell))->Value);
+                       if ( !IS_SEGSTAT_FTSAVAILABLE(segres->Stat) ||
+                                !IS_SEGSTAT_GRMAVAILABLE(segres->Stat) )
+                       {
+                               continue;
+                       }
+
+                       if ( segres->Stat->GRMTotalCore < minctncount )
+                       {
+                               minctncount = segres->Stat->GRMTotalCore;
+                       }
+               }
+               freePAIRRefList(&(PRESPOOL->Segments), &allsegres);
+
+               elog(RMLOG, "Resource manager finds minimum global resource 
manager "
+                                       "container count can contained by all 
segments is %d",
+                                       minctncount);
+
+               /* STEP 2. Limit (2): per-node share of target queue capacity. */
+               if ( PRESPOOL->AvailNodeCount > 0 &&
+                        PQUEMGR->GRMQueueCapacity > 0 &&
+                        PRESPOOL->GRMTotalHavingNoHAWQNode.Core > 0 )
+               {
+                       normalctncount = 
trunc(PRESPOOL->GRMTotalHavingNoHAWQNode.Core *
+                                                                  
PQUEMGR->GRMQueueCapacity /
+                                                                  
PRESPOOL->AvailNodeCount);
+
+                       elog(RMLOG, "Resource manager calculates normal global 
resource "
+                                               "manager container count based 
on target queue capacity "
+                                               "is %d",
+                                               normalctncount);
+               }
+       }
+
+       /* STEP 3. Take the minimum with the GUC; WARNING-log any change. */
+       int oldval = PQUEMGR->ActualMinGRMContainerPerSeg;
+       int newval = minctncount < normalctncount ? minctncount : 
normalctncount;
+       newval = newval < rm_min_resource_perseg ? newval : 
rm_min_resource_perseg;
+
+       if ( newval != oldval )
+       {
+               elog(WARNING, "Resource manager adjusts minimum global resource 
manager "
+                                         "container count in each segment from 
%d to %d.",
+                                         oldval,
+                                         newval);
+       }
+       PQUEMGR->ActualMinGRMContainerPerSeg = newval;
+}
+
 void refreshResourceQueueCapacity(bool queuechanged)
 {
        static char errorbuf[ERRORMESSAGE_SIZE];
@@ -2510,8 +2583,10 @@ void refreshResourceQueuePercentageCapacity(bool 
queuechanged)
        {
                if ( DRMGlobalInstance->ImpType == YARN_LIBYARN )
                {
-                       mem  = PRESPOOL->GRMTotal.MemoryMB * 
PQUEMGR->GRMQueueMaxCapacity;
-                       core = PRESPOOL->GRMTotal.Core     * 
PQUEMGR->GRMQueueMaxCapacity;
+                       mem  = PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB *
+                                  PQUEMGR->GRMQueueMaxCapacity;
+                       core = PRESPOOL->GRMTotalHavingNoHAWQNode.Core     *
+                                  PQUEMGR->GRMQueueMaxCapacity;
                }
                else if ( DRMGlobalInstance->ImpType == NONE_HAWQ2 )
                {

Reply via email to