[CARBONDATA-1624] Set the default value of 'carbon.number.of.cores.while.loading' as per the Spark conf 'spark.executor.cores'
1.Use 'spark.executor.cores' as the default value for 'carbon.number.of.cores.while.loading' 2.Use 'CarbonProperties.getNumberOfCores()' to get 'carbon.number.of.cores.while.loading' uniformly This closes #1455 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e9d6898 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e9d6898 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e9d6898 Branch: refs/heads/pre-aggregate Commit: 9e9d68988e29a9c3a2520189d822835562f4a34d Parents: f3b507c Author: Zhang Zhichao <441586...@qq.com> Authored: Tue Oct 31 10:55:38 2017 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Thu Nov 9 22:44:38 2017 +0800 ---------------------------------------------------------------------- .../dictionary/AbstractDictionaryCache.java | 8 +------- .../generator/TableDictionaryGenerator.java | 10 +--------- .../reader/CarbonDeleteFilesDataReader.java | 8 +------- .../carbondata/core/util/CarbonProperties.java | 6 ++++-- .../testsuite/datamap/DataMapWriterSuite.scala | 7 ++++--- .../command/management/LoadTableCommand.scala | 20 ++++++++++++++++++++ .../sort/sortdata/SortParameters.java | 20 ++------------------ .../store/CarbonFactDataHandlerColumnar.java | 11 +---------- 8 files changed, 34 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java index 4046364..e145cb8 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java +++ 
b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java @@ -72,13 +72,7 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId * max number of threads for a job */ private void initThreadPoolSize() { - try { - thread_pool_size = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - } catch (NumberFormatException e) { - thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + thread_pool_size = CarbonProperties.getInstance().getNumberOfCores(); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java index f08ba1f..905d2fa 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.devapi.BiDictionary; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.devapi.DictionaryGenerator; @@ -72,14 +71,7 @@ public class TableDictionaryGenerator } @Override public void writeDictionaryData() { - int numOfCores = 1; - try { - numOfCores = Integer.parseInt(CarbonProperties.getInstance() - 
.getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - } catch (NumberFormatException e) { - numOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + int numOfCores = CarbonProperties.getInstance().getNumberOfCores(); long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(numOfCores); for (final DictionaryGenerator generator : columnMap.values()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java index 6739b41..cc6e53f 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java @@ -65,13 +65,7 @@ public class CarbonDeleteFilesDataReader { * max number of threads for a job */ private void initThreadPoolSize() { - try { - thread_pool_size = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - } catch (NumberFormatException e) { - thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + thread_pool_size = CarbonProperties.getInstance().getNumberOfCores(); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 
cdd6183..678a6f7 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -669,9 +669,11 @@ public final class CarbonProperties { int numberOfCores; try { numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); + .getProperty(CarbonCommonConstants.NUM_CORES_LOADING)); } catch (NumberFormatException exc) { + LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING + + " is wrong. Falling back to the default value " + + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); } return numberOfCores; http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index ff900ce..888c97d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -18,13 +18,11 @@ package org.apache.carbondata.spark.testsuite.datamap import java.util - import scala.collection.JavaConverters._ - import org.apache.spark.sql.{DataFrame, SaveMode} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll - +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.dev.{DataMap, 
DataMapFactory, DataMapWriter} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage @@ -117,6 +115,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty("carbon.blockletgroup.size.in.mb", "1") + CarbonProperties.getInstance() + .addProperty("carbon.number.of.cores.while.loading", + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL) val df = buildTestData(300000) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala index 630ee27..bda6829 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala @@ -84,6 +84,26 @@ case class LoadTableCommand( val carbonProperty: CarbonProperties = CarbonProperties.getInstance() carbonProperty.addProperty("zookeeper.enable.lock", "false") + + // get the value of 'spark.executor.cores' from spark conf, default value is 1 + val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1") + // get the value of 'carbon.number.of.cores.while.loading' from carbon properties, + // default value is the value of 'spark.executor.cores' + val numCoresLoading = + try { + CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, sparkExecutorCores) + } catch { + case exc: NumberFormatException => + LOGGER.error("Configured value for property " + 
CarbonCommonConstants.NUM_CORES_LOADING + + " is wrong. Falling back to the default value " + + sparkExecutorCores) + sparkExecutorCores + } + + // update the property with new value + carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading) + val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) val tableProperties = relation.tableMeta.carbonTable.getTableInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index 39e1049..4da4c84 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -418,15 +418,7 @@ public class SortParameters implements Serializable { parameters.setTempFileLocation(sortTempDirs); LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ",")); - int numberOfCores; - try { - numberOfCores = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - numberOfCores = numberOfCores / 2; - } catch (NumberFormatException exc) { - numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + int numberOfCores = carbonProperties.getNumberOfCores() / 2; parameters.setNumberOfCores(numberOfCores > 0 ? 
numberOfCores : 1); parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties @@ -539,15 +531,7 @@ public class SortParameters implements Serializable { parameters.setTempFileLocation(sortTempDirs); LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ",")); - int numberOfCores; - try { - numberOfCores = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - numberOfCores = numberOfCores / 2; - } catch (NumberFormatException exc) { - numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + int numberOfCores = carbonProperties.getNumberOfCores() / 2; parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1); parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 7882cd4..504e7ec 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -223,16 +223,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); } } else { - try { - numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - } catch (NumberFormatException exc) { - LOGGER.error("Configured value for property " + 
CarbonCommonConstants.NUM_CORES_LOADING - + "is wrong.Falling back to the default value " - + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); } if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {