Repository: carbondata
Updated Branches:
  refs/heads/master 9b88a0652 -> 685087ed4


[CARBONDATA-2309][DataLoad] Add strategy to generate bigger carbondata files in case of small amount of data

This closes #2314
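
The new behavior is driven by two knobs added in this patch: the property
carbon.load.min.size.enabled and the per-load option load_min_size_inmb. A minimal
usage sketch in the style of the new tests (the table name and CSV path are
illustrative, not part of this patch):

    // enable the node-min-size strategy, then request at least 256 MB per node
    CarbonProperties.getInstance()
      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
    sql(
      s"""
         LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE my_table
         OPTIONS('load_min_size_inmb'='256')
       """)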


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/685087ed
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/685087ed
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/685087ed

Branch: refs/heads/master
Commit: 685087ed4de7ecc181de6ee43c9e5865eb26b650
Parents: 9b88a06
Author: ndwangsen <luffy.w...@huawei.com>
Authored: Fri May 25 21:45:58 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Jun 13 01:56:03 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  12 ++
 .../constants/CarbonLoadOptionConstants.java    |  10 ++
 .../carbondata/core/util/CarbonProperties.java  |  12 ++
 docs/useful-tips-on-carbondata.md               |   3 +-
 .../dataload/TestTableLoadMinSize.scala         | 149 +++++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   9 +-
 .../management/CarbonLoadDataCommand.scala      |   4 +-
 .../loading/model/CarbonLoadModel.java          |  15 ++
 .../loading/model/CarbonLoadModelBuilder.java   |   1 +
 .../processing/loading/model/LoadOption.java    |   2 +
 .../processing/util/CarbonLoaderUtil.java       | 105 +++++++++++--
 .../processing/util/CarbonLoaderUtilTest.java   |  51 ++++++-
 .../scala/org/apache/spark/rpc/Master.scala     |   3 +-
 14 files changed, 355 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 08aa704..c7281dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1761,6 +1761,18 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_LUCENE_INDEX_STOP_WORDS_DEFAULT = "false";
 
+  /**
+   * the minimum amount of data (in MB) each node should load, specified via load option
+   */
+  @CarbonProperty
+  public static final String CARBON_LOAD_MIN_SIZE_INMB = "load_min_size_inmb";
+  public static final String CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT = "256";
+
+  /**
+   * default value for the minimum amount of data each node should load
+   */
+  public static final int CARBON_LOAD_MIN_SIZE_DEFAULT = 256;
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a2213d5..6b8281c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -151,4 +151,14 @@ public final class CarbonLoadOptionConstants {
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE
       = "carbon.load.sortMemory.spill.percentage";
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT = "0";
+
+  /**
+   * If the amount of data to be loaded is small, the default loading strategy will
+   * produce many small carbondata files. Enabling this option enforces a minimum
+   * amount of input data per node, which avoids producing many small files. It is
+   * especially useful when you frequently load small amounts of data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE
+      = "carbon.load.min.size.enabled";
+  public static final String ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 4ee5199..6eb7de6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1310,6 +1310,18 @@ public final class CarbonProperties {
         CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT);
     return skewedEnabled.equalsIgnoreCase("true");
   }
+
+  /**
+   * whether the optimization that enforces a minimum amount of input data per node is enabled
+   * @return true if enabled, false otherwise
+   */
+  public boolean isLoadMinSizeOptimizationEnabled() {
+    String loadMinSize = getProperty(
+            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE,
+            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT);
+    return loadMinSize.equalsIgnoreCase("true");
+  }
+
   /**
    * returns true if carbon property
    * @param key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md b/docs/useful-tips-on-carbondata.md
index ff339d0..732d38f 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -170,5 +170,6 @@
  | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
  | carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB~1GB. |
-
+  | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable the node minimum input data size allocation strategy for data loading. | When loading, carbondata will use the node minimum input data size allocation strategy for task distribution. It will make sure that each node loads at least the specified minimum amount of data -- It's useful if your input data files are very small, say 1MB~256MB, as it avoids generating a large number of small files. |
+
   Note: If your CarbonData instance is provided only for query, you may specify the property 'spark.speculation=true' which is in conf directory of spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
new file mode 100644
index 0000000..ebb4e32
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLoadMinSize.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+  * Test class for the load_min_size option
+  */
+
+class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
+  val testData1 = s"$resourcesPath/source.csv"
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS table_loadminsize1")
+    sql("DROP TABLE IF EXISTS table_loadminsize2")
+    sql("DROP TABLE IF EXISTS table_loadminsize3")
+  }
+
+  test("Value test: set table load min size in not int value") {
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS table_loadminsize1
+        (ID Int, date Timestamp, country String,
+        name String, phonetype String, serialname String, salary Int)
+        STORED BY 'org.apache.carbondata.format'
+        TBLPROPERTIES('table_blocksize'='128 MB')
+      """)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
+
+    sql(s"""
+           LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize1 OPTIONS('load_min_size_inmb'='256 MB')
+           """)
+
+    checkAnswer(
+      sql("""
+           SELECT country, count(salary) AS amount
+           FROM table_loadminsize1
+           WHERE country IN ('china','france')
+           GROUP BY country
+          """),
+      Seq(Row("china", 96), Row("france", 1))
+    )
+  }
+
+  test("Function test:: set table load min size in int value") {
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS table_loadminsize2
+        (ID Int, date Timestamp, country String,
+        name String, phonetype String, serialname String, salary Int)
+        STORED BY 'org.apache.carbondata.format'
+        TBLPROPERTIES('table_blocksize'='128 MB')
+      """)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
+
+    sql(s"""
+           LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize2 OPTIONS('load_min_size_inmb'='256')
+           """)
+
+    checkAnswer(
+      sql("""
+           SELECT country, count(salary) AS amount
+           FROM table_loadminsize2
+           WHERE country IN ('china','france')
+           GROUP BY country
+          """),
+      Seq(Row("china", 96), Row("france", 1))
+    )
+
+  }
+
+  test("Function test:: not set table load min size property") {
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS table_loadminsize3
+        (ID Int, date Timestamp, country String,
+        name String, phonetype String, serialname String, salary Int)
+        STORED BY 'org.apache.carbondata.format'
+        TBLPROPERTIES('table_blocksize'='128 MB')
+      """)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")
+
+    sql(s"""
+           LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize3
+           """)
+
+    checkAnswer(
+      sql("""
+           SELECT country, count(salary) AS amount
+           FROM table_loadminsize3
+           WHERE country IN ('china','france')
+           GROUP BY country
+          """),
+      Seq(Row("china", 96), Row("france", 1))
+    )
+
+  }
+
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS table_loadminsize1")
+    sql("DROP TABLE IF EXISTS table_loadminsize2")
+    sql("DROP TABLE IF EXISTS table_loadminsize3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE,
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9bc5597..1f04fa4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -885,7 +885,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
       "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
       "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE",
-      "SORT_COLUMN_BOUNDS"
+      "SORT_COLUMN_BOUNDS", "LOAD_MIN_SIZE_INMB"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index bdbaef5..21a8641 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1070,14 +1070,21 @@ object CarbonDataRDDFactory {
       .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
     val skewedDataOptimization = CarbonProperties.getInstance()
       .isLoadSkewedDataOptimizationEnabled()
+    val loadMinSizeOptimization = CarbonProperties.getInstance()
+      .isLoadMinSizeOptimizationEnabled()
+    // the minimum amount of data each node should load, taken from the user's load DDL
+    val expectedMinSizePerNode = carbonLoadModel.getLoadMinSize()
     val blockAssignStrategy = if (skewedDataOptimization) {
       CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
+    } else if (loadMinSizeOptimization) {
+      CarbonLoaderUtil.BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST
     } else {
       CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
     }
     LOGGER.info(s"Allocating block to nodes using strategy: $blockAssignStrategy")
+
     val nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1,
-      activeNodes.toList.asJava, blockAssignStrategy).asScala.toSeq
+      activeNodes.toList.asJava, blockAssignStrategy, expectedMinSizePerNode).asScala.toSeq
     val timeElapsed: Long = System.currentTimeMillis - startTime
     LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
     LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index ba062c0..4703b23 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,6 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
@@ -45,7 +44,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
-
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
@@ -76,7 +74,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServicePr
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index f82de83..f267fa7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -210,6 +210,11 @@ public class CarbonLoadModel implements Serializable {
    * Folder path to where data should be written for this load.
    */
   private String dataWritePath;
+
+  /**
+   * the minimum amount of data (in MB) each node should load, from load option load_min_size_inmb
+   */
+  private String loadMinSize;
 
   private List<String> mergedSegmentIds;
 
@@ -388,6 +393,14 @@ public class CarbonLoadModel implements Serializable {
     this.sortColumnsBoundsStr = sortColumnsBoundsStr;
   }
 
+  public String getLoadMinSize() {
+    return loadMinSize;
+  }
+
+  public void setLoadMinSize(String loadMinSize) {
+    this.loadMinSize = loadMinSize;
+  }
+
   /**
    * Get copy with taskNo.
    * Broadcast value is shared in process, so we need to copy it to make sure the value in each
@@ -439,6 +452,7 @@ public class CarbonLoadModel implements Serializable {
     copy.badRecordsLocation = badRecordsLocation;
     copy.isLoadWithoutConverterStep = isLoadWithoutConverterStep;
     copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
+    copy.loadMinSize = loadMinSize;
     return copy;
   }
 
@@ -492,6 +506,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.badRecordsLocation = badRecordsLocation;
     copyObj.isAggLoadRequest = isAggLoadRequest;
     copyObj.sortColumnsBoundsStr = sortColumnsBoundsStr;
+    copyObj.loadMinSize = loadMinSize;
     return copyObj;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 9a9d09e..4ad1984 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -270,6 +270,7 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
     carbonLoadModel.readAndSetLoadMetadataDetails();
     carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
+    carbonLoadModel.setLoadMinSize(optionsFinal.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB));
   }
 
   private int validateMaxColumns(String[] csvHeaders, String maxColumns)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 4ff1cce..1a65937 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -196,6 +196,8 @@ public class LoadOption {
     optionsFinal.put("single_pass", String.valueOf(singlePass));
     optionsFinal.put("sort_scope", "local_sort");
     optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, 
"sort_column_bounds", ""));
+    optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, 
Maps.getOrDefault(options,CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, 
CarbonCommonConstants
+            .CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT));
     return optionsFinal;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 6d938e1..d5a0b78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -53,6 +53,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
@@ -77,7 +78,8 @@ public final class CarbonLoaderUtil {
    */
   public enum BlockAssignmentStrategy {
     BLOCK_NUM_FIRST("Assign blocks to node base on number of blocks"),
-    BLOCK_SIZE_FIRST("Assign blocks to node base on data size of blocks");
+    BLOCK_SIZE_FIRST("Assign blocks to node base on data size of blocks"),
+    NODE_MIN_SIZE_FIRST("Assign blocks to node base on minimum size of inputs");
     private String name;
     BlockAssignmentStrategy(String name) {
       this.name = name;
@@ -537,7 +539,7 @@ public final class CarbonLoaderUtil {
       List<String> activeNode) {
     Map<String, List<Distributable>> mapOfNodes =
         CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode,
-            BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+            BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
     int taskPerNode = parallelism / mapOfNodes.size();
     //assigning non zero value to noOfTasksPerNode
     int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
@@ -554,7 +556,7 @@ public final class CarbonLoaderUtil {
   public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
       int noOfNodesInput) {
     return nodeBlockMapping(blockInfos, noOfNodesInput, null,
-        BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+        BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
   }
 
   /**
@@ -575,11 +577,12 @@ public final class CarbonLoaderUtil {
    * @param noOfNodesInput -1 if number of nodes has to be decided
    *                       based on block location information
    * @param blockAssignmentStrategy strategy used to assign blocks
+   * @param expectedMinSizePerNode the property load_min_size_inmb specified by the user
    * @return a map that maps node to blocks
    */
   public static Map<String, List<Distributable>> nodeBlockMapping(
       List<Distributable> blockInfos, int noOfNodesInput, List<String> activeNodes,
-      BlockAssignmentStrategy blockAssignmentStrategy) {
+      BlockAssignmentStrategy blockAssignmentStrategy, String expectedMinSizePerNode) {
     ArrayList<NodeMultiBlockRelation> rtnNode2Blocks = new ArrayList<>();
 
     Set<Distributable> uniqueBlocks = new HashSet<>(blockInfos);
@@ -596,20 +599,52 @@ public final class CarbonLoaderUtil {
 
     // calculate the average expected size for each node
     long sizePerNode = 0;
+    long totalFileSize = 0;
     if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
       sizePerNode = blockInfos.size() / noofNodes;
       sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
-    } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
-      long totalFileSize = 0;
+    } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy
+        || BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
       for (Distributable blockInfo : uniqueBlocks) {
         totalFileSize += ((TableBlockInfo) blockInfo).getBlockLength();
       }
       sizePerNode = totalFileSize / noofNodes;
     }
 
-    // assign blocks to each node
-    assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, originNode2Blocks,
-        activeNodes, blockAssignmentStrategy);
+    // if control over the minimum amount of input data per node is enabled
+    if (BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
+      long iexpectedMinSizePerNode = 0;
+      // validate the property load_min_size_inmb specified by the user
+      if (CarbonUtil.validateValidIntType(expectedMinSizePerNode)) {
+        iexpectedMinSizePerNode = Integer.parseInt(expectedMinSizePerNode);
+      } else {
+        LOGGER.warn("Invalid load_min_size_inmb value found: " + expectedMinSizePerNode
+            + ", only int value greater than 0 is supported.");
+        iexpectedMinSizePerNode = CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_DEFAULT;
+      }
+      // if the average expected size for each node is greater than the load min size,
+      // fall back to the default strategy
+      if (iexpectedMinSizePerNode * 1024 * 1024 < sizePerNode) {
+        if (CarbonProperties.getInstance().isLoadSkewedDataOptimizationEnabled()) {
+          blockAssignmentStrategy = BlockAssignmentStrategy.BLOCK_SIZE_FIRST;
+        } else {
+          blockAssignmentStrategy = BlockAssignmentStrategy.BLOCK_NUM_FIRST;
+        }
+        LOGGER.info("Specified minimum data size to load is less than the average size "
+            + "for each node, fallback to default strategy: " + blockAssignmentStrategy);
+      } else {
+        sizePerNode = iexpectedMinSizePerNode;
+      }
+    }
+
+    if (BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
+      // assign blocks to each node, ignoring data locality
+      assignBlocksIgnoreDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, activeNodes);
+    } else {
+      // assign blocks to each node based on data locality
+      assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, originNode2Blocks,
+          activeNodes, blockAssignmentStrategy);
+    }
 
     // if any blocks remain then assign them to nodes in round robin.
     assignLeftOverBlocks(rtnNode2Blocks, uniqueBlocks, sizePerNode, activeNodes,
@@ -623,7 +658,7 @@ public final class CarbonLoaderUtil {
     }
     return rtnNodeBlocksMap;
   }
-
+  
   /**
    * Assigning the blocks of a node to tasks.
    *
@@ -757,6 +792,7 @@ public final class CarbonLoaderUtil {
         populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
         break;
       case BLOCK_SIZE_FIRST:
+      case NODE_MIN_SIZE_FIRST:
         populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
         break;
       default:
@@ -836,6 +872,7 @@ public final class CarbonLoaderUtil {
         roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
         break;
       case BLOCK_SIZE_FIRST:
+      case NODE_MIN_SIZE_FIRST:
         roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
         break;
       default:
@@ -983,6 +1020,54 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * allocate distributable blocks to nodes, ignoring data locality
+   */
+  private static void assignBlocksIgnoreDataLocality(
+      ArrayList<NodeMultiBlockRelation> outputNode2Blocks,
+      long expectedSizePerNode, Set<Distributable> remainingBlocks,
+      List<String> activeNodes) {
+    // get all blocks
+    Set<Distributable> uniqueBlocks = new HashSet<>(remainingBlocks);
+    // shuffle activeNodes to ignore data locality
+    List<String> shuffleNodes = new ArrayList<>(activeNodes);
+    Collections.shuffle(shuffleNodes);
+
+    for (String activeNode : shuffleNodes) {
+      long nodeCapacity = 0;
+      NodeMultiBlockRelation nodeBlock = new NodeMultiBlockRelation(activeNode,
+          new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
+      // loop through the blocks for each node
+      for (Distributable block : uniqueBlocks) {
+        if (!remainingBlocks.contains(block)) {
+          // this block has been added before
+          continue;
+        }
+
+        long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+        if (nodeCapacity == 0
+            || nodeCapacity + thisBlockSize <= expectedSizePerNode * 1024 * 1024) {
+          nodeBlock.getBlocks().add(block);
+          nodeCapacity += thisBlockSize;
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(
+                "First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
+                    + ((TableBlockInfo) block).getBlockLength() + "-->" + activeNode);
+          }
+          remainingBlocks.remove(block);
+          // this block is too big for the current node, but there is still capacity left
+          // for small files, so continue to allocate blocks on this node in the next iteration
+        } else {
+          // no need to continue the loop as the node is full
+          break;
+        }
+      }
+      if (nodeBlock.getBlocks().size() != 0) {
+        outputNode2Blocks.add(nodeBlock);
+      }
+    }
+  }
+
+  /**
    * method validates whether the node is active or not.
    *
    * @param activeNode
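
For reference, the strategy-selection logic above can be summarized as follows. This
is a simplified Scala sketch, not code from the patch itself; names like minSizeMb,
avgBytesPerNode and chooseStrategy are invented for illustration (the real logic
lives inline in nodeBlockMapping):

    // fall back when the requested per-node minimum is below the natural average,
    // since every node would receive at least that much data anyway
    def chooseStrategy(minSizeMb: Long, avgBytesPerNode: Long,
        skewedOptimizationEnabled: Boolean): CarbonLoaderUtil.BlockAssignmentStrategy = {
      if (minSizeMb * 1024 * 1024 < avgBytesPerNode) {
        if (skewedOptimizationEnabled) {
          CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
        } else {
          CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
        }
      } else {
        CarbonLoaderUtil.BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST
      }
    }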

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
index 9c66ada..94f8b84 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
@@ -72,6 +72,29 @@ public class CarbonLoaderUtilTest {
     return blockInfos;
   }
 
+  private List<Distributable> generateBlocks2() {
+    List<Distributable> blockInfos = new ArrayList<>();
+    String filePath = "/fakepath";
+    String blockId = "1";
+
+    String[] locations = new String[] { "host2", "host3" };
+    ColumnarFormatVersion version = ColumnarFormatVersion.V1;
+
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo(filePath + "_a", 0,
+            blockId, locations, 30 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo1);
+
+    TableBlockInfo tableBlockInfo2 = new TableBlockInfo(filePath + "_b", 0,
+            blockId, locations, 30 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo2);
+
+    TableBlockInfo tableBlockInfo3 = new TableBlockInfo(filePath + "_c", 0,
+            blockId, locations, 30 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo3);
+
+    return blockInfos;
+  }
+
   private List<String> generateExecutors() {
     List<String> activeNodes = new ArrayList<>();
     activeNodes.add("host1");
@@ -86,9 +109,9 @@ public class CarbonLoaderUtilTest {
     List<String> activeNodes = generateExecutors();
 
     // the blocks are assigned by size, so the number of blocks for each node differs
-    Map<String, List<Distributable>> nodeMappingBySize =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
-            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST);
+    Map<String, List<Distributable>> nodeMappingBySize = CarbonLoaderUtil
+        .nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST, null);
     LOGGER.info(convertMapListAsString(nodeMappingBySize));
     Assert.assertEquals(3, nodeMappingBySize.size());
     for (Map.Entry<String, List<Distributable>> entry : nodeMappingBySize.entrySet()) {
@@ -102,9 +125,9 @@ public class CarbonLoaderUtilTest {
     }
 
     // the blocks are assigned by number, so the number of blocks for each node is nearly the same
-    Map<String, List<Distributable>> nodeMappingByNum =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
-            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+    Map<String, List<Distributable>> nodeMappingByNum = CarbonLoaderUtil
+        .nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
     LOGGER.info(convertMapListAsString(nodeMappingByNum));
     Assert.assertEquals(3, nodeMappingByNum.size());
     for (Map.Entry<String, List<Distributable>> entry : nodeMappingByNum.entrySet()) {
@@ -113,6 +136,22 @@ public class CarbonLoaderUtilTest {
     }
   }
 
+  @Test
+  public void testNodeBlockMappingByNodeRandom() throws Exception {
+    List<Distributable> blockInfos = generateBlocks2();
+    List<String> activeNodes = generateExecutors();
+
+    // blocks are assigned to nodes at random; each node loads at least the
+    // minimum amount of data specified by the user
+    Map<String, List<Distributable>> nodeMappingByRandom = CarbonLoaderUtil
+        .nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST, "90");
+    LOGGER.info(convertMapListAsString(nodeMappingByRandom));
+    Assert.assertEquals(1, nodeMappingByRandom.size());
+    for (Map.Entry<String, List<Distributable>> entry : nodeMappingByRandom.entrySet()) {
+      Assert.assertTrue(entry.getValue().size() == blockInfos.size());
+    }
+  }
+
   private <K, T> String convertMapListAsString(Map<K, List<T>> mapList) {
     StringBuffer sb = new StringBuffer();
     for (Map.Entry<K, List<T>> entry : mapList.entrySet()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/685087ed/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index f48f5e4..b7630fb 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -279,7 +279,8 @@ class Master(sparkConf: SparkConf) {
       distributables.asJava,
       -1,
       getWorkers.asJava,
-      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST)
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
+      null)
   }
 
   /** return hostname of all workers */
