Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 9dd09659a -> cffcb998a


Problem: Block distribution is wrong when dynamic allocation is enabled
(spark.dynamicAllocation.enabled=true).

Analysis: When dynamic allocation is enabled and the configured maximum number
of executors is greater than the initial number of executors, Carbon is not
able to request the configured maximum number of executors. As a result,
resources are under-utilized: when the number of blocks increases, the
distribution of blocks is limited to the number of nodes, and fewer tasks are
launched. This under-utilization of resources impacts query and load
performance.

Fix: Request the configured maximum number of executors when dynamic
allocation is enabled.

Impact area: Query and data load flow performance, which is affected by the
under-utilization of resources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ff7793be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ff7793be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ff7793be

Branch: refs/heads/master
Commit: ff7793beb079847a57a5d3d5b33a37c1976e53fb
Parents: 9dd0965
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Mon Nov 28 15:37:11 2016 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Sat Dec 3 02:37:28 2016 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  23 +++
 .../carbondata/core/util/CarbonProperties.java  |  23 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   7 +-
 .../spark/sql/hive/DistributionUtil.scala       | 155 ++++++++++++++++---
 .../org/apache/spark/sql/CarbonContext.scala    |   2 -
 5 files changed, 181 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1ac2ba1..29dbbd8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -919,6 +919,29 @@ public final class CarbonCommonConstants {
    * maximum length of column
    */
   public static final int DEFAULT_COLUMN_LENGTH = 100000;
+  /**
+   * Maximum waiting time (in seconds) for a query for requested executors to 
be started
+   */
+  public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT =
+      "carbon.max.executor.startup.timeout";
+
+  /**
+   * default value for executor start up waiting time out
+   */
+  public static final String CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT = "5";
+
+  /**
+   * Max value. If value configured by user is more than this than this value 
will value will be
+   * considered
+   */
+  public static final int CARBON_EXECUTOR_WAITING_TIMEOUT_MAX = 60;
+
+  /**
+   * time for which thread will sleep and check again if the requested number 
of executors
+   * have been started
+   */
+  public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250;
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f4ec63d..3657215 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -85,6 +85,7 @@ public final class CarbonProperties {
     validateHighCardinalityThreshold();
     validateHighCardinalityInRowCountPercentage();
     validateCarbonDataFileVersion();
+    validateExecutorStartUpTime();
   }
 
   private void validateBadRecordsLocation() {
@@ -539,4 +540,26 @@ public final class CarbonProperties {
     return defaultVal;
   }
 
+  /**
+   * This method will validate and set the value for executor start up waiting 
time out
+   */
+  private void validateExecutorStartUpTime() {
+    int executorStartUpTimeOut = 0;
+    try {
+      executorStartUpTimeOut = Integer.parseInt(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT,
+              CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT));
+      // If value configured by user is more than max value of time out then 
consider the max value
+      if (executorStartUpTimeOut > 
CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX) {
+        executorStartUpTimeOut = 
CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX;
+      }
+    } catch (NumberFormatException ne) {
+      executorStartUpTimeOut =
+          
Integer.parseInt(CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT);
+    }
+    
carbonProperties.setProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT,
+        String.valueOf(executorStartUpTimeOut));
+    LOGGER.info("Executor start up wait time: " + executorStartUpTimeOut);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 93e1590..ccaf9e3 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -263,15 +263,16 @@ class CarbonMergerRDD[K, V](
       carbonMergerMapping.maxSegmentColumnSchemaList = 
dataFileFooter.getColumnInTable.asScala
         .toList
     }
+
+    val blocks = carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava
     // send complete list of blocks to the mapping util.
-    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
-      carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
+    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blocks, -1)
 
     val confExecutors = confExecutorsTemp.toInt
     val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
       confExecutors
     } else { nodeBlockMapping.size() }
-    DistributionUtil.ensureExecutors(sparkContext, requiredExecutors)
+    DistributionUtil.ensureExecutors(sparkContext, requiredExecutors, 
blocks.size)
     logInfo("No.of Executors required=" + requiredExecutors +
             " , spark.executor.instances=" + confExecutors +
             ", no.of.nodes where data present=" + nodeBlockMapping.size())

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 7368bff..8b1a2bb 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -26,6 +26,8 @@ import 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 object DistributionUtil {
@@ -99,66 +101,171 @@ object DistributionUtil {
    * Checking if the existing executors is greater than configured executors, 
if yes
    * returning configured executors.
    *
-   * @param blockList
+   * @param blockList total number of blocks in the identified segments
    * @param sparkContext
    * @return
    */
   def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
       sparkContext: SparkContext): Seq[String] = {
-    val nodeMapping = 
CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
-    ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
+    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
+    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
   }
 
   def ensureExecutorsByNumberAndGetNodeList(nodesOfData: Int,
       sparkContext: SparkContext): Seq[String] = {
-    var confExecutorsTemp: String = null
-    if (sparkContext.getConf.contains("spark.executor.instances")) {
-      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
-    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
-               && 
sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-                 .equalsIgnoreCase("true")) {
-      if 
(sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
-        confExecutorsTemp = 
sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
-      }
+    val confExecutors: Int = getConfiguredExecutors(sparkContext)
+    LOGGER.info(s"Executors configured : $confExecutors")
+    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > 
confExecutors) {
+      confExecutors
+    } else {
+      nodesOfData
     }
+    // request for starting the number of required executors
+    ensureExecutors(sparkContext, requiredExecutors)
+    getDistinctNodesList(sparkContext, requiredExecutors)
+  }
 
-    val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt 
else 1
+  /**
+   * This method will ensure that the required/configured number of executors 
are requested
+   * for processing the identified blocks
+   *
+   * @param nodeMapping
+   * @param blockList
+   * @param sparkContext
+   * @return
+   */
+  private def ensureExecutorsByNumberAndGetNodeList(
+      nodeMapping: java.util.Map[String, java.util.List[Distributable]],
+      blockList: Seq[Distributable],
+      sparkContext: SparkContext): Seq[String] = {
+    val nodesOfData = nodeMapping.size()
+    val confExecutors: Int = getConfiguredExecutors(sparkContext)
+    LOGGER.info(s"Executors configured : $confExecutors")
     val requiredExecutors = if (nodesOfData < 1 || nodesOfData > 
confExecutors) {
       confExecutors
+    } else if (confExecutors > nodesOfData) {
+      var totalExecutorsToBeRequested = nodesOfData
+      // If total number of blocks are greater than the nodes identified then 
ensure
+      // that the configured number of max executors can be opened based on 
the difference of
+      // block list size and nodes identified
+      if (blockList.size > nodesOfData) {
+        // e.g 1. blockList size = 40, confExecutors = 6, then all 6 executors
+        // need to be opened
+        // 2. blockList size = 4, confExecutors = 6, then
+        // total 4 executors need to be opened
+        if (blockList.size > confExecutors) {
+          totalExecutorsToBeRequested = confExecutors
+        } else {
+          totalExecutorsToBeRequested = blockList.size
+        }
+      }
+      LOGGER.info(s"Total executors requested: $totalExecutorsToBeRequested")
+      totalExecutorsToBeRequested
     } else {
       nodesOfData
     }
+    // request for starting the number of required executors
+    ensureExecutors(sparkContext, requiredExecutors, blockList.size)
+    getDistinctNodesList(sparkContext, requiredExecutors)
+  }
+
+  /**
+   * This method will return the configured executors
+   *
+   * @param sparkContext
+   * @return
+   */
+  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
+    var confExecutors: Int = 0
+    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", 
false)) {
+      // default value for spark.dynamicAllocation.maxExecutors is infinity
+      confExecutors = 
sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
+    } else {
+      // default value for spark.executor.instances is 2
+      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 
1)
+    }
+    confExecutors
+  }
 
+  /**
+   * This method will return the distinct nodes list
+   *
+   * @param sparkContext
+   * @param requiredExecutors
+   * @return
+   */
+  private def getDistinctNodesList(sparkContext: SparkContext,
+      requiredExecutors: Int): Seq[String] = {
     val startTime = System.currentTimeMillis()
-    ensureExecutors(sparkContext, requiredExecutors)
     var nodes = DistributionUtil.getNodeList(sparkContext)
-    var maxTimes = 30
+    // calculate the number of times loop has to run to check for starting
+    // the requested number of executors
+    val threadSleepTime = 
CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME
+    val loopCounter = 
calculateCounterBasedOnExecutorStartupTime(threadSleepTime)
+    var maxTimes = loopCounter
     while (nodes.length < requiredExecutors && maxTimes > 0) {
-      Thread.sleep(500)
+      Thread.sleep(threadSleepTime)
       nodes = DistributionUtil.getNodeList(sparkContext)
       maxTimes = maxTimes - 1
     }
     val timDiff = System.currentTimeMillis() - startTime
-    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
-    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - 
maxTimes) * 500 }")
-    nodes.distinct
+    LOGGER.info(s"Total Time taken to ensure the required executors : 
$timDiff")
+    LOGGER.info(s"Time elapsed to allocate the required executors: " +
+      s"${(loopCounter - maxTimes) * threadSleepTime}")
+    nodes.distinct.toSeq
   }
 
   // Hack for spark2 integration
   var numExistingExecutors: Int = _
 
-  def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
+  /**
+   * Requesting the extra executors other than the existing ones.
+   *
+   * @param sc                   sparkContext
+   * @param requiredExecutors         required number of executors to be 
requested
+   * @param localityAwareTasks   The number of pending tasks which is locality 
required
+   * @param hostToLocalTaskCount A map to store hostname with its possible 
task number running on it
+   * @return
+   */
+  def ensureExecutors(sc: SparkContext,
+      requiredExecutors: Int,
+      localityAwareTasks: Int = 0,
+      hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean = {
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        val requiredExecutors = numExecutors - numExistingExecutors
-        LOGGER.info(s"number of executors is = $numExecutors existing 
executors are = " +
-                    s"$numExistingExecutors")
+        LOGGER
+          .info(
+            s"number of required executors are = $requiredExecutors and 
existing executors are = " +
+            s"$numExistingExecutors")
         if (requiredExecutors > 0) {
-          b.requestExecutors(requiredExecutors)
+          LOGGER
+            .info(s"Requesting total executors: $requiredExecutors")
+          b.requestTotalExecutors(requiredExecutors, localityAwareTasks, 
hostToLocalTaskCount)
         }
         true
       case _ =>
         false
     }
   }
+
+  /**
+   * This method will calculate how many times a loop will run with an 
interval of given sleep
+   * time to wait for requested executors to come up
+   *
+   * @param threadSleepTime
+   * @return
+   */
+  private def calculateCounterBasedOnExecutorStartupTime(threadSleepTime: 
Int): Int = {
+    var executorStartUpTimeOut = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT,
+        CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT).toInt
+    // convert seconds into milliseconds for loop counter calculation
+    executorStartUpTimeOut = executorStartUpTimeOut * 1000
+    // make executor start up time exactly divisible by thread sleep time
+    val remainder = executorStartUpTimeOut % threadSleepTime
+    if (remainder > 0) {
+      executorStartUpTimeOut = executorStartUpTimeOut + threadSleepTime - 
remainder
+    }
+    executorStartUpTimeOut / threadSleepTime
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ff7793be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index e5d3d6b..efaa57b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -22,7 +22,6 @@ import java.io.File
 import scala.language.implicitConversions
 
 import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
@@ -183,5 +182,4 @@ object CarbonContext {
     }
     cache(sc) = cc
   }
-
 }

Reply via email to