-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53801/
-----------------------------------------------------------
(Updated Nov. 18, 2016, 2:34 a.m.)

Review request for Ambari, Alejandro Fernandez, Madhuvanthi Radhakrishnan, Sumit Mohanty, and Siddharth Seth.

Changes
-------

- Updated the Ambari upgrade code for the Hive configs that need to be set to default values.
- Added a few more suggestions on the LLAP calculations, as per discussion with Sid Seth.

Bugs: AMBARI-18901
    https://issues.apache.org/jira/browse/AMBARI-18901

Repository: ambari

Description
-------

AMBARI-18901. Use 'Number of LLAP Nodes' selected as the driver for LLAP config calculations.

Below is the calculation logic used:

**********************************************************************************************************************************************
**********************************************************************************************************************************************
-----------------------------------------------------------
For use with the default setup - Ambari-managed queue
-----------------------------------------------------------

Parameters

    numRequestedLlapNodes   UserInput
    tezAmContainerSize      Computed
    memoryPerThread         Computed  // Set as a parameter in HiveConf. User can override in Advanced.
    numConcurrentQueries    Computed  // User can override in Advanced.
    maxConcurrentQueries    Computed  // Max value for Slider.
    numLlapNodes            Computed  // Can be lower for small clusters. User can override in Advanced.
    sliderAmContainerSize   Computed  // User can override in Advanced.
    numExecutorsPerNode     Computed  // User can override in Advanced. | TODO: This and memoryPerThread are about the same when taking daemon size and cache into consideration.
    cacheMemoryPerNode      Computed  // User can override in Advanced.
    amFraction              1         // Set to 1 for the Ambari-controlled queue. | TODO: Factor into concurrency for user-provided queues.
    llapQueueFraction       Computed  // Computed by Ambari. (Avoid changing if the current value is > the computed value and the user specified the current value?)
    numClusterNodes         ClusterParameter
    nmMemoryPerNode         ClusterParameter
    nmCpusPerNode           ClusterParameter
    minContainerSize        ClusterParameter

CONSTANT DEFAULT_EXECUTOR_TO_AM_RATIO = 20;
CONSTANT MIN_EXECUTOR_TO_AM_RATIO = 10;
CONSTANT MAX_CONCURRENT_QUERIES = 32;

nmMemoryPerNodeNormalized = normalizeDown(nmMemoryPerNode, minContainerSize);
totalClusterCapacity = numClusterNodes * nmMemoryPerNodeNormalized;
totalLlapMemory = numRequestedLlapNodes * nmMemoryPerNodeNormalized;
amCapacityAvailable = totalLlapMemory; // For the LLAP queue, the AM fraction is set to 1.

sliderAmSize -> Current calculations remain unchanged. (<=, < fixes)

llapMemoryTezAMsAndDaemons = totalLlapMemory - sliderAmSize;
FAIL("Not enough capacity available on the cluster to run LLAP") if (llapMemoryTezAMsAndDaemons < 2 * minContainerSize);

tezAmContainerSize = (totalClusterCapacity, minContainerSize) {
  desiredSize = {
    // This part is unchanged from the current calculations.
    if (totalClusterCapacity <= 4096) { return 256; }
    else if (totalClusterCapacity <= 73728) { return 512; }
    else { return 1536; }
  }
  return normalizeUp(desiredSize, minContainerSize);
}

memoryPerThread = (nmMemoryPerNodeNormalized, nmCpusPerNode) {
  // TODO: Not linear. e.g. going from 1024 to 1025 drops from 2 executors to 1.
  if (userSpecifiedValue) { return userSpecifiedValue; }
  else if (nmMemoryPerNodeNormalized <= 1024) { return Math.min(512, nmMemoryPerNodeNormalized); }
  else if (nmMemoryPerNodeNormalized <= 4096) { return 1024; }
  else if (nmMemoryPerNodeNormalized <= 10240) { return 2048; }
  else if (nmMemoryPerNodeNormalized <= 24576) { return 3072; }
  else { return 4096; }
}

numConcurrentQueries, maxConcurrentQueries = (nmMemoryPerNodeNormalized, nmCpusPerNode, memoryPerThread, numRequestedLlapNodes, llapMemoryTezAMsAndDaemons, tezAmContainerSize, amCapacityAvailable) {
  maxExecutorsPerNode = getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode, memoryPerThread);
  FAIL if maxExecutorsPerNode < 1;

  // Default: 1 AM for every 20 executor threads.
  // The second part of the min calculates based on the memory required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM, making use of total memory. However, it is possible
  // that total memory will not be used - the number of executors may instead be limited by #CPUs. Use maxExecutorsPerNode to factor this in.
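For illustration, the sizing helpers defined so far can be sketched in Python (in the style of stack_advisor.py). The names are hypothetical, and normalizeDown/normalizeUp are assumed to round down/up to a multiple of the YARN minimum container size:

```python
import math

def normalize_down(value, min_container_size):
    # Round value down to the nearest multiple of min_container_size.
    return (value // min_container_size) * min_container_size

def normalize_up(value, min_container_size):
    # Round value up to the nearest multiple of min_container_size.
    return int(math.ceil(float(value) / min_container_size)) * min_container_size

def tez_am_container_size(total_cluster_capacity, min_container_size):
    # Tiered default for the Tez AM container, normalized up (unchanged from
    # the current calculations).
    if total_cluster_capacity <= 4096:
        desired = 256
    elif total_cluster_capacity <= 73728:
        desired = 512
    else:
        desired = 1536
    return normalize_up(desired, min_container_size)

def memory_per_thread(nm_memory_per_node_normalized, user_specified_value=None):
    # Tiered default for per-executor memory; the user can override in Advanced.
    if user_specified_value is not None:
        return user_specified_value
    if nm_memory_per_node_normalized <= 1024:
        return min(512, nm_memory_per_node_normalized)
    elif nm_memory_per_node_normalized <= 4096:
        return 1024
    elif nm_memory_per_node_normalized <= 10240:
        return 2048
    elif nm_memory_per_node_normalized <= 24576:
        return 3072
    else:
        return 4096
```

All sizes are in MB, matching the thresholds in the pseudocode.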
  numConcurrentQueriesLimit = Math.min(floor(maxExecutorsPerNode * numRequestedLlapNodes / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES);
  numConcurrentQueries = Math.min(numConcurrentQueriesLimit, floor(llapMemoryTezAMsAndDaemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * memoryPerThread + tezAmContainerSize)));
  if (numConcurrentQueries == 0) { numConcurrentQueries = 1; }
  if (numConcurrentQueries * tezAmContainerSize > amCapacityAvailable) {
    numConcurrentQueries = floor(amCapacityAvailable / tezAmContainerSize);
    FAIL if numConcurrentQueries < 1;
  }

  maxConcurrentQueriesLimit = Math.min(floor(maxExecutorsPerNode * numRequestedLlapNodes / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES);
  maxConcurrentQueries = Math.min(maxConcurrentQueriesLimit, floor(llapMemoryTezAMsAndDaemons / (MIN_EXECUTOR_TO_AM_RATIO * memoryPerThread + tezAmContainerSize)));
  if (maxConcurrentQueries == 0) { maxConcurrentQueries = 1; }
  if (maxConcurrentQueries * tezAmContainerSize > amCapacityAvailable) {
    maxConcurrentQueries = floor(amCapacityAvailable / tezAmContainerSize);
    FAIL if maxConcurrentQueries < 1;
  }
}

numLlapNodes, daemonMemoryPerNode, numExecutorsPerNode, executorMemoryPerNode, cacheMemoryPerNode = (desiredConcurrency, tezAmContainerSize, llapMemoryTezAMsAndDaemons, minContainerSize, numRequestedLlapNodes, memoryPerThread) {
  amMemoryRequired = desiredConcurrency * tezAmContainerSize;
  llapMemoryDaemons = llapMemoryTezAMsAndDaemons - amMemoryRequired;
  FAIL("Not enough memory available for executors") if (llapMemoryDaemons < memoryPerThread || llapMemoryDaemons < minContainerSize);

  daemonMemoryPerNode = normalizeDown(llapMemoryDaemons / numRequestedLlapNodes, minContainerSize);
  if (daemonMemoryPerNode == 0) {
    // Small cluster. No capacity left on a node after running AMs.
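A minimal Python sketch of the concurrency calculation above (hypothetical names; maxExecutorsPerNode is passed in rather than recomputed). The same routine yields numConcurrentQueries with DEFAULT_EXECUTOR_TO_AM_RATIO and maxConcurrentQueries with MIN_EXECUTOR_TO_AM_RATIO:

```python
DEFAULT_EXECUTOR_TO_AM_RATIO = 20
MIN_EXECUTOR_TO_AM_RATIO = 10
MAX_CONCURRENT_QUERIES = 32

def concurrent_queries(ratio, max_executors_per_node, num_requested_llap_nodes,
                       llap_memory_tez_ams_and_daemons, memory_per_thread,
                       tez_am_container_size, am_capacity_available):
    # One AM for every `ratio` executor threads, capped by the hard query cap.
    limit = min(max_executors_per_node * num_requested_llap_nodes // ratio,
                MAX_CONCURRENT_QUERIES)
    # Also cap by memory: each query needs `ratio` executors plus one AM.
    n = min(limit, llap_memory_tez_ams_and_daemons //
            (ratio * memory_per_thread + tez_am_container_size))
    n = max(n, 1)
    # The AMs must fit within the capacity reserved for them.
    if n * tez_am_container_size > am_capacity_available:
        n = am_capacity_available // tez_am_container_size
        assert n >= 1, "Not enough AM capacity available"
    return n
```

For example, a 4-node LLAP allocation with 10 executors per node would use concurrent_queries(DEFAULT_EXECUTOR_TO_AM_RATIO, 10, 4, ...) for the default and concurrent_queries(MIN_EXECUTOR_TO_AM_RATIO, 10, 4, ...) for the slider maximum.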
    daemonMemoryPerNode = memoryPerThread;
    numLlapNodes = floor(llapMemoryDaemons / memoryPerThread);
  } else if (daemonMemoryPerNode < memoryPerThread) {
    // The previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately, reduce memory per node.)
    daemonMemoryPerNode = memoryPerThread;
    numLlapNodes = floor(llapMemoryDaemons / memoryPerThread);
  } else {
    // All good. We have a proper value for memoryPerNode.
    numLlapNodes = numRequestedLlapNodes;
  }

  maxExecutorsPerNode = getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode, memoryPerThread);
  FAIL if maxExecutorsPerNode < 1;

  // numExecutorsPerNode is not necessarily the max - some capacity would have been reserved for AMs if this value were based on memory.
  numExecutorsPerNode = Math.min(floor(daemonMemoryPerNode / memoryPerThread), maxExecutorsPerNode);

  // Now figure out how much of the memory will be used by the executors, and how much by the cache.
  executorMemoryPerNode = numExecutorsPerNode * memoryPerThread;
  cacheMemoryPerNode = daemonMemoryPerNode - executorMemoryPerNode;
  Assert numExecutorsPerNode > 0;
  return (numLlapNodes, daemonMemoryPerNode, numExecutorsPerNode, executorMemoryPerNode, cacheMemoryPerNode);
}

llapQueueFraction = numRequestedLlapNodes / numClusterNodes

def getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode) {
  // TODO: Special-case VMs: 1 CPU doesn't necessarily mean 1 executor.
  return nmCpusPerNode;
}

def getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode, memoryPerThread) {
  // This potentially takes up the entire node, leaving no space for AMs.
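The node-count and executor/cache split above could look roughly like this in Python (hypothetical names; normalizeDown is inlined as floor-to-a-multiple of minContainerSize, maxExecutorsPerNode is passed in, and the two small-cluster branches are folded together since they take the same action):

```python
def daemon_layout(desired_concurrency, tez_am_container_size,
                  llap_memory_tez_ams_and_daemons, min_container_size,
                  num_requested_llap_nodes, memory_per_thread,
                  max_executors_per_node):
    # Reserve memory for the Tez AMs first; daemons get the remainder.
    am_memory_required = desired_concurrency * tez_am_container_size
    llap_memory_daemons = llap_memory_tez_ams_and_daemons - am_memory_required
    assert llap_memory_daemons >= max(min_container_size, memory_per_thread), \
        "Not enough memory available for executors"

    # normalizeDown: per-node daemon size as a multiple of min_container_size.
    per_node = (llap_memory_daemons // num_requested_llap_nodes
                // min_container_size) * min_container_size
    if per_node < memory_per_thread:
        # Small cluster: keep the thread size and cut the number of nodes instead.
        per_node = memory_per_thread
        num_llap_nodes = llap_memory_daemons // memory_per_thread
    else:
        num_llap_nodes = num_requested_llap_nodes

    # Not necessarily the max: some capacity was already reserved for AMs.
    executors_per_node = min(per_node // memory_per_thread, max_executors_per_node)
    executor_memory = executors_per_node * memory_per_thread
    cache_memory = per_node - executor_memory  # leftover memory goes to the cache
    assert executors_per_node > 0
    return (num_llap_nodes, per_node, executors_per_node,
            executor_memory, cache_memory)
```

With 4 requested nodes, 40960 MB for AMs+daemons, 2 concurrent queries at a 1024 MB AM size, a 2048 MB thread size, and a 1024 MB minimum container, this yields 4 nodes of 9216 MB each: 4 executors (8192 MB) plus a 1024 MB cache.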
  return Math.min(floor(nmMemoryPerNodeNormalized / memoryPerThread), getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode));
}

----------------------------------------------------------
For User-Provided Queues
----------------------------------------------------------

queueFraction    User provided (from capacity-scheduler.xml)
queueAmFraction  User provided (from capacity-scheduler.xml)

numRequestedLlapNodes = floor(queueFraction * numClusterNodes)
amCapacityAvailable = queueAmFraction * queueFraction * totalClusterCapacity

The rest of the calculations remain unchanged.
----------------------------------------------------------
**********************************************************************************************************************************************
**********************************************************************************************************************************************

Diffs (updated)
-----

  ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java e81568c
  ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/configuration/hive-interactive-env.xml 1fd72eb
  ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/configuration/hive-interactive-site.xml 0207e49
  ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/configuration/tez-interactive-site.xml 9e588e9
  ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/themes/theme.json 452537d
  ambari-server/src/main/resources/stacks/HDP/2.5/services/YARN/configuration/capacity-scheduler.xml 9ff8484
  ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py dfb7b0c
  ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java 14fc20b

Diff: https://reviews.apache.org/r/53801/diff/

Testing
-------

Manual testing on a deployed cluster.

Thanks,

Swapan Shridhar