[CARBONDATA-2043] Configurable wait time for requesting executors and minimum registered executors ratio to continue the block distribution - carbon.dynamicAllocation.schedulerTimeout : to configure the wait time. Default 5 sec, min 5 sec and max 15 sec. - carbon.scheduler.minRegisteredResourcesRatio : to configure the minimum registered executors ratio. Default 0.8, min 0.1 and max 1.0.
This closes #1822 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/473bd319 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/473bd319 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/473bd319 Branch: refs/heads/branch-1.3 Commit: 473bd3197a69e3c0574f8c07f04c29e43f7a023d Parents: 54a381c Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Fri Dec 22 17:30:31 2017 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Fri Feb 2 11:10:23 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 71 ++++++++++----- .../carbondata/core/util/CarbonProperties.java | 90 +++++++++++++++----- .../core/CarbonPropertiesValidationTest.java | 42 +++++++++ .../spark/sql/hive/DistributionUtil.scala | 67 ++++++++++----- 4 files changed, 205 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 7ae3034..87eec8a 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1149,29 +1149,6 @@ public final class CarbonCommonConstants { */ public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS = 20000; - /** - * Maximum waiting time (in seconds) for a query for requested executors to be started - */ - @CarbonProperty - public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT = - 
"carbon.max.executor.startup.timeout"; - - /** - * default value for executor start up waiting time out - */ - public static final String CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT = "5"; - - /** - * Max value. If value configured by user is more than this than this value will value will be - * considered - */ - public static final int CARBON_EXECUTOR_WAITING_TIMEOUT_MAX = 60; - - /** - * time for which thread will sleep and check again if the requested number of executors - * have been started - */ - public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250; /** * to enable unsafe column page in write step @@ -1537,6 +1514,54 @@ public final class CarbonCommonConstants { public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024; /** + * minimum required registered resource for starting block distribution + */ + @CarbonProperty + public static final String CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO = + "carbon.scheduler.minregisteredresourcesratio"; + /** + * default minimum required registered resource for starting block distribution + */ + public static final String CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT = "0.8d"; + /** + * minimum required registered resource for starting block distribution + */ + public static final double CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN = 0.1d; + /** + * max minimum required registered resource for starting block distribution + */ + public static final double CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX = 1.0d; + + /** + * To define how much time scheduler should wait for the + * resource in dynamic allocation. 
+ */ + public static final String CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT = + "carbon.dynamicallocation.schedulertimeout"; + + /** + * default scheduler wait time + */ + public static final String CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT = "5"; + + /** + * default value for executor start up waiting time out + */ + public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MIN = 5; + + /** + * Max value. If value configured by user is more than this than this value will value will be + * considered + */ + public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MAX = 15; + + /** + * time for which thread will sleep and check again if the requested number of executors + * have been started + */ + public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME = 250; + + /** * It allows queries on hive metastore directly along with filter information, otherwise first * fetches all partitions from hive and apply filters on it. */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index fd78efc..39a0b80 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -35,7 +35,33 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.*; +import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.BLOCKLET_SIZE; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATA_FILE_VERSION; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATE_FORMAT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE; +import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_VECTOR_READER; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.HANDOFF_SIZE; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.LOCK_TYPE; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_BLOCK_SORT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_SIZE; import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB; import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO; @@ -106,8 +132,8 @@ public final class CarbonProperties { case CARBON_DATA_FILE_VERSION: validateCarbonDataFileVersion(); break; - case CARBON_EXECUTOR_STARTUP_TIMEOUT: - validateExecutorStartUpTime(); + case CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT: + validateDynamicSchedulerTimeOut(); break; case CARBON_PREFETCH_BUFFERSIZE: validatePrefetchBufferSize(); @@ -156,6 +182,9 @@ public final class CarbonProperties { case ENABLE_AUTO_HANDOFF: validateHandoffSize(); break; + case CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO: + validateSchedulerMinRegisteredRatio(); + break; // TODO : Validation for carbon.lock.type should be handled for addProperty flow default: // none @@ -171,7 +200,7 @@ public final class CarbonProperties { validateNumCoresBlockSort(); validateSortSize(); validateCarbonDataFileVersion(); - validateExecutorStartUpTime(); + validateDynamicSchedulerTimeOut(); validatePrefetchBufferSize(); validateBlockletGroupSizeInMB(); 
validateNumberOfColumnPerIORead(); @@ -193,6 +222,7 @@ public final class CarbonProperties { validateSortFileWriteBufferSize(); validateSortIntermediateFilesLimit(); validateEnableAutoHandoff(); + validateSchedulerMinRegisteredRatio(); } /** @@ -253,6 +283,36 @@ public final class CarbonProperties { } /** + * minimum required registered resource for starting block distribution + */ + private void validateSchedulerMinRegisteredRatio() { + String value = carbonProperties + .getProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT); + try { + double minRegisteredResourceRatio = java.lang.Double.parseDouble(value); + if (minRegisteredResourceRatio < CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN + || minRegisteredResourceRatio > CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX) { + LOGGER.warn("The value \"" + value + + "\" configured for key " + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO + + "\" is not in range. Valid range is (byte) \"" + + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN + " to \"" + + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX + ". Using the default value \"" + + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT); + carbonProperties.setProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT); + } + } catch (NumberFormatException e) { + LOGGER.warn("The value \"" + value + + "\" configured for key " + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO + + "\" is invalid. 
Using the default value \"" + + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT); + carbonProperties.setProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT); + } + } + + /** * The method validate the validity of configured carbon.date.format value * and reset to default value if validation fail */ @@ -984,23 +1044,11 @@ public final class CarbonProperties { /** * This method will validate and set the value for executor start up waiting time out */ - private void validateExecutorStartUpTime() { - int executorStartUpTimeOut = 0; - try { - executorStartUpTimeOut = Integer.parseInt(carbonProperties - .getProperty(CARBON_EXECUTOR_STARTUP_TIMEOUT, - CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT)); - // If value configured by user is more than max value of time out then consider the max value - if (executorStartUpTimeOut > CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX) { - executorStartUpTimeOut = CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX; - } - } catch (NumberFormatException ne) { - executorStartUpTimeOut = - Integer.parseInt(CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT); - } - carbonProperties.setProperty(CARBON_EXECUTOR_STARTUP_TIMEOUT, - String.valueOf(executorStartUpTimeOut)); - LOGGER.info("Executor start up wait time: " + executorStartUpTimeOut); + private void validateDynamicSchedulerTimeOut() { + validateRange(CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, + CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT, + CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MIN, + CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MAX); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java index daf6db0..bbfe26c 100644 --- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java +++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java @@ -205,4 +205,46 @@ public class CarbonPropertiesValidationTest extends TestCase { assertTrue(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE .equalsIgnoreCase(valueAfterValidation)); } + + @Test public void testValidateDynamicSchedulerTimeOut() { + carbonProperties + .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "2"); + String valueAfterValidation = carbonProperties + .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT); + assertTrue(valueAfterValidation + .equals(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT)); + carbonProperties + .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "16"); + valueAfterValidation = carbonProperties + .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT); + assertTrue(valueAfterValidation + .equals(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT)); + carbonProperties + .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "15"); + valueAfterValidation = carbonProperties + .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT); + assertTrue(valueAfterValidation + .equals("15")); + + } + @Test public void testValidateSchedulerMinRegisteredRatio() { + carbonProperties + .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "0.0"); + String valueAfterValidation = carbonProperties + .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO); + assertTrue(valueAfterValidation + 
.equals(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT)); + carbonProperties + .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "-0.1"); + valueAfterValidation = carbonProperties + .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO); + assertTrue(valueAfterValidation + .equals(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT)); + carbonProperties + .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "0.1"); + valueAfterValidation = carbonProperties + .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO); + assertTrue(valueAfterValidation.equals("0.1")); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala index 37b722f..1958d61 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import java.net.{InetAddress, InterfaceAddress, NetworkInterface} import scala.collection.JavaConverters._ +import scala.util.control.Breaks._ import org.apache.spark.SparkContext import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -33,6 +34,26 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil object DistributionUtil { @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + /* + * minimum required registered resource for starting block distribution + */ + lazy val 
minRegisteredResourceRatio: Double = { + val value: String = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, + CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT) + java.lang.Double.parseDouble(value) + } + + /* + * node registration wait time + */ + lazy val dynamicAllocationSchTimeOut: Integer = { + val value: String = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, + CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT) + // milli second + java.lang.Integer.parseInt(value) * 1000 + } /* * This method will return the list of executers in the cluster. @@ -202,18 +223,25 @@ object DistributionUtil { var nodes = DistributionUtil.getNodeList(sparkContext) // calculate the number of times loop has to run to check for starting // the requested number of executors - val threadSleepTime = CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME - val loopCounter = calculateCounterBasedOnExecutorStartupTime(threadSleepTime) - var maxTimes = loopCounter - while (nodes.length < requiredExecutors && maxTimes > 0) { - Thread.sleep(threadSleepTime) - nodes = DistributionUtil.getNodeList(sparkContext) - maxTimes = maxTimes - 1 + val threadSleepTime = + CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME + val maxRetryCount = calculateMaxRetry + var maxTimes = maxRetryCount + breakable { + while (nodes.length < requiredExecutors && maxTimes > 0) { + Thread.sleep(threadSleepTime); + nodes = DistributionUtil.getNodeList(sparkContext) + maxTimes = maxTimes - 1; + val resourceRatio = (nodes.length.toDouble / requiredExecutors) + if (resourceRatio.compareTo(minRegisteredResourceRatio) >= 0) { + break + } + } } val timDiff = System.currentTimeMillis() - startTime LOGGER.info(s"Total Time taken to ensure the required executors : $timDiff") LOGGER.info(s"Time elapsed to 
allocate the required executors: " + - s"${(loopCounter - maxTimes) * threadSleepTime}") + s"${(maxRetryCount - maxTimes) * threadSleepTime}") nodes.distinct.toSeq } @@ -245,21 +273,18 @@ object DistributionUtil { /** * This method will calculate how many times a loop will run with an interval of given sleep * time to wait for requested executors to come up - * - * @param threadSleepTime - * @return + * + * @return The max retry count */ - private def calculateCounterBasedOnExecutorStartupTime(threadSleepTime: Int): Int = { - var executorStartUpTimeOut = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT, - CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT).toInt - // convert seconds into milliseconds for loop counter calculation - executorStartUpTimeOut = executorStartUpTimeOut * 1000 - // make executor start up time exactly divisible by thread sleep time - val remainder = executorStartUpTimeOut % threadSleepTime + def calculateMaxRetry(): Int = { + val remainder = dynamicAllocationSchTimeOut % CarbonCommonConstants + .CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME + val retryCount: Int = dynamicAllocationSchTimeOut / CarbonCommonConstants + .CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME if (remainder > 0) { - executorStartUpTimeOut = executorStartUpTimeOut + threadSleepTime - remainder + retryCount + 1 + } else { + retryCount } - executorStartUpTimeOut / threadSleepTime } }