Repository: incubator-carbondata Updated Branches: refs/heads/master 9dd09659a -> cffcb998a
Problem: Block distribution is wrong when dynamic allocation is enabled (spark.dynamicAllocation.enabled=true). Analysis: When dynamic allocation is enabled and the configured maximum number of executors is greater than the initial number of executors, Carbon does not request the configured maximum number of executors. As a result, resources are underutilized: as the number of blocks increases, block distribution is limited to the number of nodes, and fewer tasks are launched. This underutilization of resources degrades query and data load performance. Fix: When dynamic allocation is enabled, request the configured maximum number of executors. Impact area: Query and data load performance, which was degraded by underutilized resources. Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ff7793be Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ff7793be Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ff7793be Branch: refs/heads/master Commit: ff7793beb079847a57a5d3d5b33a37c1976e53fb Parents: 9dd0965 Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Mon Nov 28 15:37:11 2016 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Sat Dec 3 02:37:28 2016 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 23 +++ .../carbondata/core/util/CarbonProperties.java | 23 +++ .../carbondata/spark/rdd/CarbonMergerRDD.scala | 7 +- .../spark/sql/hive/DistributionUtil.scala | 155 ++++++++++++++++--- .../org/apache/spark/sql/CarbonContext.scala | 2 - 5 files changed, 181 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 1ac2ba1..29dbbd8 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -919,6 +919,29 @@ public final class CarbonCommonConstants { * maximum length of column */ public static final int DEFAULT_COLUMN_LENGTH = 100000; + /** + * Maximum waiting time (in seconds) for a query for requested executors to be started + */ + public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT = + "carbon.max.executor.startup.timeout"; + + /** + * default value for executor start up waiting time out + */ + public static final String CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT = "5"; + + /** + * Max value. If value configured by user is more than this than this value will value will be + * considered + */ + public static final int CARBON_EXECUTOR_WAITING_TIMEOUT_MAX = 60; + + /** + * time for which thread will sleep and check again if the requested number of executors + * have been started + */ + public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index f4ec63d..3657215 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -85,6 +85,7 @@ public final class 
CarbonProperties { validateHighCardinalityThreshold(); validateHighCardinalityInRowCountPercentage(); validateCarbonDataFileVersion(); + validateExecutorStartUpTime(); } private void validateBadRecordsLocation() { @@ -539,4 +540,26 @@ public final class CarbonProperties { return defaultVal; } + /** + * This method will validate and set the value for executor start up waiting time out + */ + private void validateExecutorStartUpTime() { + int executorStartUpTimeOut = 0; + try { + executorStartUpTimeOut = Integer.parseInt(carbonProperties + .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT, + CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT)); + // If value configured by user is more than max value of time out then consider the max value + if (executorStartUpTimeOut > CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX) { + executorStartUpTimeOut = CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX; + } + } catch (NumberFormatException ne) { + executorStartUpTimeOut = + Integer.parseInt(CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT); + } + carbonProperties.setProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT, + String.valueOf(executorStartUpTimeOut)); + LOGGER.info("Executor start up wait time: " + executorStartUpTimeOut); + } + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 93e1590..ccaf9e3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ 
-263,15 +263,16 @@ class CarbonMergerRDD[K, V]( carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala .toList } + + val blocks = carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava // send complete list of blocks to the mapping util. - nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping( - carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1) + nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blocks, -1) val confExecutors = confExecutorsTemp.toInt val requiredExecutors = if (nodeBlockMapping.size > confExecutors) { confExecutors } else { nodeBlockMapping.size() } - DistributionUtil.ensureExecutors(sparkContext, requiredExecutors) + DistributionUtil.ensureExecutors(sparkContext, requiredExecutors, blocks.size) logInfo("No.of Executors required=" + requiredExecutors + " , spark.executor.instances=" + confExecutors + ", no.of.nodes where data present=" + nodeBlockMapping.size()) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala index 7368bff..8b1a2bb 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala @@ -26,6 +26,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.datastore.block.Distributable +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.load.CarbonLoaderUtil 
object DistributionUtil { @@ -99,66 +101,171 @@ object DistributionUtil { * Checking if the existing executors is greater than configured executors, if yes * returning configured executors. * - * @param blockList + * @param blockList total number of blocks in the identified segments * @param sparkContext * @return */ def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable], sparkContext: SparkContext): Seq[String] = { - val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava) - ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext) + val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava) + ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext) } def ensureExecutorsByNumberAndGetNodeList(nodesOfData: Int, sparkContext: SparkContext): Seq[String] = { - var confExecutorsTemp: String = null - if (sparkContext.getConf.contains("spark.executor.instances")) { - confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances") - } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled") - && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim - .equalsIgnoreCase("true")) { - if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) { - confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors") - } + val confExecutors: Int = getConfiguredExecutors(sparkContext) + LOGGER.info(s"Executors configured : $confExecutors") + val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) { + confExecutors + } else { + nodesOfData } + // request for starting the number of required executors + ensureExecutors(sparkContext, requiredExecutors) + getDistinctNodesList(sparkContext, requiredExecutors) + } - val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1 + /** + * This method will ensure that the required/configured number of executors are requested + * for processing the 
identified blocks + * + * @param nodeMapping + * @param blockList + * @param sparkContext + * @return + */ + private def ensureExecutorsByNumberAndGetNodeList( + nodeMapping: java.util.Map[String, java.util.List[Distributable]], + blockList: Seq[Distributable], + sparkContext: SparkContext): Seq[String] = { + val nodesOfData = nodeMapping.size() + val confExecutors: Int = getConfiguredExecutors(sparkContext) + LOGGER.info(s"Executors configured : $confExecutors") val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) { confExecutors + } else if (confExecutors > nodesOfData) { + var totalExecutorsToBeRequested = nodesOfData + // If total number of blocks are greater than the nodes identified then ensure + // that the configured number of max executors can be opened based on the difference of + // block list size and nodes identified + if (blockList.size > nodesOfData) { + // e.g 1. blockList size = 40, confExecutors = 6, then all 6 executors + // need to be opened + // 2. 
blockList size = 4, confExecutors = 6, then + // total 4 executors need to be opened + if (blockList.size > confExecutors) { + totalExecutorsToBeRequested = confExecutors + } else { + totalExecutorsToBeRequested = blockList.size + } + } + LOGGER.info(s"Total executors requested: $totalExecutorsToBeRequested") + totalExecutorsToBeRequested } else { nodesOfData } + // request for starting the number of required executors + ensureExecutors(sparkContext, requiredExecutors, blockList.size) + getDistinctNodesList(sparkContext, requiredExecutors) + } + + /** + * This method will return the configured executors + * + * @param sparkContext + * @return + */ + private def getConfiguredExecutors(sparkContext: SparkContext): Int = { + var confExecutors: Int = 0 + if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) { + // default value for spark.dynamicAllocation.maxExecutors is infinity + confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1) + } else { + // default value for spark.executor.instances is 2 + confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1) + } + confExecutors + } + /** + * This method will return the distinct nodes list + * + * @param sparkContext + * @param requiredExecutors + * @return + */ + private def getDistinctNodesList(sparkContext: SparkContext, + requiredExecutors: Int): Seq[String] = { val startTime = System.currentTimeMillis() - ensureExecutors(sparkContext, requiredExecutors) var nodes = DistributionUtil.getNodeList(sparkContext) - var maxTimes = 30 + // calculate the number of times loop has to run to check for starting + // the requested number of executors + val threadSleepTime = CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME + val loopCounter = calculateCounterBasedOnExecutorStartupTime(threadSleepTime) + var maxTimes = loopCounter while (nodes.length < requiredExecutors && maxTimes > 0) { - Thread.sleep(500) + Thread.sleep(threadSleepTime) 
nodes = DistributionUtil.getNodeList(sparkContext) maxTimes = maxTimes - 1 } val timDiff = System.currentTimeMillis() - startTime - LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff") - LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }") - nodes.distinct + LOGGER.info(s"Total Time taken to ensure the required executors : $timDiff") + LOGGER.info(s"Time elapsed to allocate the required executors: " + + s"${(loopCounter - maxTimes) * threadSleepTime}") + nodes.distinct.toSeq } // Hack for spark2 integration var numExistingExecutors: Int = _ - def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = { + /** + * Requesting the extra executors other than the existing ones. + * + * @param sc sparkContext + * @param requiredExecutors required number of executors to be requested + * @param localityAwareTasks The number of pending tasks which is locality required + * @param hostToLocalTaskCount A map to store hostname with its possible task number running on it + * @return + */ + def ensureExecutors(sc: SparkContext, + requiredExecutors: Int, + localityAwareTasks: Int = 0, + hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean = { sc.schedulerBackend match { case b: CoarseGrainedSchedulerBackend => - val requiredExecutors = numExecutors - numExistingExecutors - LOGGER.info(s"number of executors is = $numExecutors existing executors are = " + - s"$numExistingExecutors") + LOGGER + .info( + s"number of required executors are = $requiredExecutors and existing executors are = " + + s"$numExistingExecutors") if (requiredExecutors > 0) { - b.requestExecutors(requiredExecutors) + LOGGER + .info(s"Requesting total executors: $requiredExecutors") + b.requestTotalExecutors(requiredExecutors, localityAwareTasks, hostToLocalTaskCount) } true case _ => false } } + + /** + * This method will calculate how many times a loop will run with an interval of given sleep + * time to wait for requested executors to 
come up + * + * @param threadSleepTime + * @return + */ + private def calculateCounterBasedOnExecutorStartupTime(threadSleepTime: Int): Int = { + var executorStartUpTimeOut = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT, + CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT).toInt + // convert seconds into milliseconds for loop counter calculation + executorStartUpTimeOut = executorStartUpTimeOut * 1000 + // make executor start up time exactly divisible by thread sleep time + val remainder = executorStartUpTimeOut % threadSleepTime + if (remainder > 0) { + executorStartUpTimeOut = executorStartUpTimeOut + threadSleepTime - remainder + } + executorStartUpTimeOut / threadSleepTime + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala index e5d3d6b..efaa57b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala @@ -22,7 +22,6 @@ import java.io.File import scala.language.implicitConversions import org.apache.spark.SparkContext -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer @@ -183,5 +182,4 @@ object CarbonContext { } cache(sc) = cc } - }