[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data 
loading

Enhance data loading performance by specifying sort column bounds
1. Add row range number during convert-process-step
2. Dispatch rows to each sorter by range number
3. Sort/Write process step can be done concurrently in each range
4. Since all sorttemp files will be written in one folders, we add range
number to the file name to distingush them

Tests added and docs updated

This closes #1953


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ab9b4cf8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ab9b4cf8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ab9b4cf8

Branch: refs/heads/datamap
Commit: ab9b4cf89c8d277887f1f838be65724612f5874b
Parents: 88c0527
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Tue Feb 13 10:58:06 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sun Feb 25 20:42:39 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  10 +
 .../core/datastore/row/CarbonRow.java           |  10 +-
 .../ThriftWrapperSchemaConverterImpl.java       |   2 +-
 .../core/metadata/schema/BucketingInfo.java     |  24 +-
 .../core/metadata/schema/ColumnRangeInfo.java   |  29 ++
 .../metadata/schema/SortColumnRangeInfo.java    |  83 +++++
 docs/data-management-on-carbondata.md           |  11 +
 .../TestLoadDataWithSortColumnBounds.scala      | 348 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala           |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../loading/CarbonDataLoadConfiguration.java    |  11 +
 .../loading/DataLoadProcessBuilder.java         |  78 ++++-
 .../loading/converter/RowConverter.java         |   2 +-
 .../converter/impl/RowConverterImpl.java        |   5 +
 .../loading/model/CarbonLoadModel.java          |  14 +
 .../loading/model/CarbonLoadModelBuilder.java   |   1 +
 .../processing/loading/model/LoadOption.java    |   1 +
 .../partition/impl/HashPartitionerImpl.java     |  10 +-
 .../partition/impl/RangePartitionerImpl.java    |  71 ++++
 .../partition/impl/RawRowComparator.java        |  63 ++++
 .../processing/loading/sort/SorterFactory.java  |  16 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 272 ---------------
 ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++++++++++++++
 ...arallelReadMergeSorterWithBucketingImpl.java | 263 --------------
 ...allelReadMergeSorterWithColumnRangeImpl.java | 293 ++++++++++++++++
 .../loading/sort/unsafe/UnsafeSortDataRows.java |   6 +-
 .../unsafe/merger/UnsafeIntermediateMerger.java |   6 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  11 +-
 .../steps/DataConverterProcessorStepImpl.java   | 102 +++++-
 ...ConverterProcessorWithBucketingStepImpl.java | 161 ---------
 .../steps/DataWriterProcessorStepImpl.java      |  70 +++-
 .../SingleThreadFinalSortFilesMerger.java       |   3 +-
 .../processing/sort/sortdata/SortDataRows.java  |  11 +-
 .../sortdata/SortIntermediateFileMerger.java    |   6 +-
 .../sort/sortdata/SortParameters.java           |  10 +
 .../store/CarbonFactDataHandlerColumnar.java    |   6 +-
 39 files changed, 1559 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..8ff8dc4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants {
   public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
       = "carbon.load.skewedDataOptimization.enabled";
   public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
+
+  /**
+   * field delimiter for each field in one bound
+   */
+  public static final String SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ",";
+
+  /**
+   * row delimiter for each sort column bounds
+   */
+  public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index 8702421..bb624af 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -29,7 +29,7 @@ public class CarbonRow implements Serializable {
 
   private Object[] rawData;
 
-  public short bucketNumber;
+  private short rangeId;
 
   public CarbonRow(Object[] data) {
     this.data = data;
@@ -83,4 +83,12 @@ public class CarbonRow implements Serializable {
   public void setRawData(Object[] rawData) {
     this.rawData = rawData;
   }
+
+  public short getRangeId() {
+    return rangeId;
+  }
+
+  public void setRangeId(short rangeId) {
+    this.rangeId = rangeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index e9c5505..897b42d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -293,7 +293,7 @@ public class ThriftWrapperSchemaConverterImpl implements 
SchemaConverter {
       
thriftColumnSchema.add(fromWrapperToExternalColumnSchema(wrapperColumnSchema));
     }
     return new org.apache.carbondata.format.BucketingInfo(thriftColumnSchema,
-        bucketingInfo.getNumberOfBuckets());
+        bucketingInfo.getNumOfRanges());
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
index 569241d..e24f0f8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/BucketingInfo.java
@@ -24,40 +24,41 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * Bucketing information
  */
-public class BucketingInfo implements Serializable, Writable {
-
+@InterfaceAudience.Internal
+public class BucketingInfo implements ColumnRangeInfo, Serializable, Writable {
   private static final long serialVersionUID = -0L;
-
   private List<ColumnSchema> listOfColumns;
-
-  private int numberOfBuckets;
+  // number of value ranges
+  private int numOfRanges;
 
   public BucketingInfo() {
 
   }
 
-  public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfBuckets) {
+  public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfRanges) {
     this.listOfColumns = listOfColumns;
-    this.numberOfBuckets = numberOfBuckets;
+    this.numOfRanges = numberOfRanges;
   }
 
   public List<ColumnSchema> getListOfColumns() {
     return listOfColumns;
   }
 
-  public int getNumberOfBuckets() {
-    return numberOfBuckets;
+  @Override
+  public int getNumOfRanges() {
+    return numOfRanges;
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
-    output.writeInt(numberOfBuckets);
+    output.writeInt(numOfRanges);
     output.writeInt(listOfColumns.size());
     for (ColumnSchema aColSchema : listOfColumns) {
       aColSchema.write(output);
@@ -66,7 +67,7 @@ public class BucketingInfo implements Serializable, Writable {
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    this.numberOfBuckets = input.readInt();
+    this.numOfRanges = input.readInt();
     int colSchemaSize = input.readInt();
     this.listOfColumns = new ArrayList<>(colSchemaSize);
     for (int i = 0; i < colSchemaSize; i++) {
@@ -75,5 +76,4 @@ public class BucketingInfo implements Serializable, Writable {
       this.listOfColumns.add(aSchema);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
new file mode 100644
index 0000000..c5454b2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/ColumnRangeInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * interface for column range information. Currently we treat bucket and 
sort_column_range as
+ * value ranges for a column.
+ */
+@InterfaceAudience.Internal
+public interface ColumnRangeInfo {
+  int getNumOfRanges();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
new file mode 100644
index 0000000..9d2460a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SortColumnRangeInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * column ranges specified by sort column bounds
+ */
+@InterfaceAudience.Internal
+public class SortColumnRangeInfo implements ColumnRangeInfo, Serializable {
+  private static final long serialVersionUID = 1L;
+  // indices for the sort columns in the raw row
+  private int[] sortColumnIndex;
+  // is the sort column no dictionary encoded
+  private boolean[] isSortColumnNoDict;
+  // each literal sort column bounds specified by user
+  private String[] userSpecifiedRanges;
+  // separator for the field values in each bound
+  private String separator;
+  // number of value ranges for the columns
+  private int numOfRanges;
+
+  public SortColumnRangeInfo(int[] sortColumnIndex, boolean[] 
isSortColumnNoDict,
+      String[] userSpecifiedRanges, String separator) {
+    this.sortColumnIndex = sortColumnIndex;
+    this.isSortColumnNoDict = isSortColumnNoDict;
+    this.userSpecifiedRanges = userSpecifiedRanges;
+    this.separator = separator;
+    this.numOfRanges = userSpecifiedRanges.length + 1;
+  }
+
+  public int[] getSortColumnIndex() {
+    return sortColumnIndex;
+  }
+
+  public boolean[] getIsSortColumnNoDict() {
+    return isSortColumnNoDict;
+  }
+
+  public String[] getUserSpecifiedRanges() {
+    return userSpecifiedRanges;
+  }
+
+  public String getSeparator() {
+    return separator;
+  }
+
+  @Override
+  public int getNumOfRanges() {
+    return numOfRanges;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("SortColumnRangeInfo{");
+    sb.append("sortColumnIndex=").append(Arrays.toString(sortColumnIndex));
+    sb.append(", 
isSortColumnNoDict=").append(Arrays.toString(isSortColumnNoDict));
+    sb.append(", 
userSpecifiedRanges=").append(Arrays.toString(userSpecifiedRanges));
+    sb.append(", separator='").append(separator).append('\'');
+    sb.append(", numOfRanges=").append(numOfRanges);
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md 
b/docs/data-management-on-carbondata.md
index d7954e1..d89f6b0 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -370,6 +370,17 @@ This tutorial is going to introduce all commands and data 
operations on CarbonDa
     ```
     NOTE: Date formats are specified by date pattern strings. The date pattern 
letters in CarbonData are same as in JAVA. Refer to 
[SimpleDateFormat](http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html).
 
+  - **SORT COLUMN BOUNDS:** Range bounds for sort columns.
+
+    ```
+    OPTIONS('SORT_COLUMN_BOUNDS'='v11,v21,v31;v12,v22,v32;v13,v23,v33')
+    ```
+    NOTE:
+    * SORT_COLUMN_BOUNDS will be used only when the SORT_SCOPE is 'local_sort'.
+    * Each bound is separated by ';' and each field value in bound is 
separated by ','.
+    * Carbondata will use these bounds as ranges to process data concurrently.
+    * Since the actual order and literal order of the dictionary column are 
not necessarily the same, we do not recommend you to use this feature if the 
first sort column is 'dictionary_include'.
+
   - **SINGLE_PASS:** Single Pass Loading enables single job to finish data 
loading with dictionary generation on the fly. It enhances performance in the 
scenarios where the subsequent data loading after initial load involves fewer 
incremental updates on the dictionary.
 
   This option specifies whether to use single pass for loading data or not. By 
default this option is set to FALSE.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
new file mode 100644
index 0000000..1f171b8
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSortColumnBounds.scala
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.{File, FileOutputStream, OutputStreamWriter, Serializable}
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+case class SortColumnBoundRow (id: Int, date: String, country: String, name: 
String,
+    phoneType: String, serialName: String, salary: Int) extends Serializable
+
+object TestLoadDataWithSortColumnBounds {
+  def generateOneRow(id : Int): SortColumnBoundRow = {
+    SortColumnBoundRow(id,
+      "2015/7/23",
+      s"country$id",
+      s"name$id",
+      s"phone${new Random().nextInt(10000)}",
+      s"ASD${new Random().nextInt(10000)}",
+      10000 + id)
+  }
+}
+
+class TestLoadDataWithSortColumnBounds extends QueryTest with 
BeforeAndAfterAll {
+  private val tableName: String = "test_table_with_sort_column_bounds"
+  private val filePath: String = 
s"$resourcesPath/source_for_sort_column_bounds.csv"
+  private var df: DataFrame = _
+
+  private val dateFormatStr: String = "yyyy/MM/dd"
+  private val totalLineNum = 2000
+
+  private val originDateStatus: String = 
CarbonProperties.getInstance().getProperty(
+    CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, dateFormatStr)
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    prepareDataFile()
+    prepareDataFrame()
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, originDateStatus)
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    new File(filePath).delete()
+    df = null
+  }
+
+  /**
+   * generate loading files based on source.csv but can have more lines
+   */
+  private def prepareDataFile(): Unit = {
+    val file = new File(filePath)
+
+    val sb: StringBuilder = new StringBuilder
+    def generateLine(id : Int): String = {
+      sb.clear()
+      val row = TestLoadDataWithSortColumnBounds.generateOneRow(id)
+      sb.append(row.id).append(',')
+        .append(row.date).append(',')
+        .append(row.country).append(',')
+        .append(row.name).append(',')
+        .append(row.phoneType).append(',')
+        .append(row.serialName).append(',')
+        .append(row.salary)
+        .append(System.lineSeparator())
+        .toString()
+    }
+
+    val outputStream = new FileOutputStream(file)
+    val writer = new OutputStreamWriter(outputStream)
+    for (i <- 1 to totalLineNum) {
+      writer.write(generateLine(i))
+    }
+
+    writer.flush()
+    writer.close()
+    outputStream.flush()
+    outputStream.close()
+  }
+
+  /**
+   * prepare data frame
+   */
+  private def prepareDataFrame(): Unit = {
+    import sqlContext.implicits._
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to totalLineNum)
+      .map(id => {
+        val row = TestLoadDataWithSortColumnBounds.generateOneRow(id)
+        (row.id, row.date, row.country, row.name, row.phoneType, 
row.serialName, row.salary)
+      })
+      .toDF("ID", "date", "country", "name", "phoneType", "serialName", 
"salary")
+  }
+
+  test("load data with sort column bounds: safe mode") {
+    val originStatus = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, 
CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
 "false")
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    // load with 4 bounds
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,aab1;800,aab1;1200,aab1;1600,aab1')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, originStatus)
+  }
+
+  test("load data with sort column bounds: unsafe mode") {
+    val originStatus = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, 
CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
 "true")
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    // load with 4 bounds
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,aab1;800,aab1;1200,aab1;1600,aab1')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT, originStatus)
+  }
+
+  test("load data with sort column bounds: empty column value in bounds is 
treated as null") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    // bounds have empty value
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='200,aab1;,aab1')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column bounds will be ignored 
if it is empty.") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: number of column value in bounds 
should match that of sort column") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name')")
+    val e = intercept[Exception] {
+      // number of column value does not match that of sort columns
+      sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+          s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+          s" 'sort_column_bounds'='400,aab1;800')")
+    }
+
+    assert(e.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." 
+
+      " Expected 2, actual 1." +
+      " The illegal bound is '800'"))
+
+    val e2 = intercept[Exception] {
+      // number of column value does not match that of sort columns
+      sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+          s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+          s" 'sort_column_bounds'='400,aab1;800,aab1,def')")
+    }
+
+    assert(e2.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." 
+
+      " Expected 2, actual 3." +
+      " The illegal bound is '800,aab1,def'"))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column bounds will be ignored 
if not using local_sort") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata'" +
+        "tblproperties('sort_columns'='ID,name','sort_scope'='global_sort')")
+    // since the sort_scope is 'global_sort', we will ignore the sort column 
bounds,
+    // so the error in sort_column bounds will not be thrown
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 'sort_column_bounds'='400,aab,extra_field')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: no sort columns explicitly 
specified" +
+       " means all dimension columns will be sort columns, so bounds should be 
set correctly") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata'")
+    // the sort_columns will have 5 columns if we don't specify it explicitly
+    val e = intercept[Exception] {
+      sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+          s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+          s" 'sort_column_bounds'='400,aab')")
+    }
+    assert(e.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." 
+
+      " Expected 5, actual 2." +
+      " The illegal bound is '400,aab'"))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column is global dictionary 
encoded") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='ID,name','dictionary_include'='ID')")
+    // ID is sort column and dictionary column. Since the actual order and 
literal order of
+    // this column are not necessarily the same, this will not cause error but 
will cause data skewed.
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 
'sort_column_bounds'='400,name400;800,name800;1200,name1200;1600,name1600')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data with sort column bounds: sort column is global dictionary 
encoded" +
+       " but bounds are not in dictionary") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    sql(s"CREATE TABLE $tableName (ID Int, date Timestamp, country String, 
name String, phonetype String," +
+        "serialname String, salary Int) STORED BY 'carbondata' " +
+        "tblproperties('sort_columns'='name,ID','dictionary_include'='name')")
+    // 'name' is sort column and dictionary column, but value for 'name' in 
bounds does not exists
+    // in dictionary. It will not cause error but will cause data skewed.
+    sql(s"LOAD DATA INPATH '$filePath' INTO TABLE $tableName " +
+        s" 
OPTIONS('fileheader'='ID,date,country,name,phonetype,serialname,salary'," +
+        s" 
'sort_column_bounds'='nmm400,400;nme800,800;nme1200,1200;nme1600,1600')")
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data frame with sort column bounds") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .option("sort_columns", "ID,name")
+      .option("sort_column_bounds", "600,aab1;1200,aab1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    sql(s"select count(*) from $tableName").show()
+    sql(s"select count(*) from $tableName where ID > 1001").show()
+
+    checkAnswer(sql(s"select count(*) from $tableName"), Row(totalLineNum))
+    checkAnswer(sql(s"select count(*) from $tableName where ID > 1001"), 
Row(totalLineNum - 1001))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  test("load data frame with sort column bounds: number of column value in 
bounds should match that of sort column") {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+
+    val e = intercept[Exception] {
+      df.write
+        .format("carbondata")
+        .option("tableName", tableName)
+        .option("tempCSV", "false")
+        .option("sort_columns", "ID,name")
+        .option("sort_column_bounds", "600,aab1;1200,aab1,def")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+    assert(e.getMessage.contains(
+      "The number of field in bounds should be equal to that in sort columns." 
+
+      " Expected 2, actual 3." +
+      " The illegal bound is '1200,aab1,def'"))
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 1b68458..d78840b 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -158,7 +158,7 @@ class CarbonScanRDD(
         var i = 0
         val bucketed =
           splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => 
f.getBucketId)
-        (0 until bucketedTable.getNumberOfBuckets).map { bucketId =>
+        (0 until bucketedTable.getNumOfRanges).map { bucketId =>
           val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
           val multiBlockSplit =
             new CarbonMultiBlockSplit(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index 2aa5610..82aeb14 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -50,7 +50,7 @@ object PartitionDropper {
     val bucketInfo = carbonTable.getBucketingInfo(tableName)
     val bucketNumber = bucketInfo match {
       case null => 1
-      case _ => bucketInfo.getNumberOfBuckets
+      case _ => bucketInfo.getNumOfRanges
     }
     val partitionIndex = oldPartitionIds.indexOf(Integer.valueOf(partitionId))
     val targetPartitionId = partitionInfo.getPartitionType match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index 9106cca..0d437f6 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -46,7 +46,7 @@ object PartitionSplitter {
      var finalSplitStatus = false
      val bucketNumber = bucketInfo match {
        case null => 1
-       case _ => bucketInfo.getNumberOfBuckets
+       case _ => bucketInfo.getNumOfRanges
      }
      val partitionInfo = carbonTable.getPartitionInfo(tableName)
      val partitioner = PartitionFactory.getPartitioner(partitionInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 2dcff81..6b2e230 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -880,7 +880,8 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", 
"BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", 
"BAD_RECORD_PATH",
       "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
-      "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", 
"SKIP_EMPTY_LINE"
+      "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", 
"SKIP_EMPTY_LINE",
+      "SORT_COLUMN_BOUNDS"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b1d11b..46e24dd 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -433,7 +433,7 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
       val cols = info.getListOfColumns.asScala
       val sortColumn = carbonTable.
         getDimensionByTableName(carbonTable.getTableName).get(0).getColName
-      val numBuckets = info.getNumberOfBuckets
+      val numBuckets = info.getNumOfRanges
       val bucketColumns = cols.flatMap { n =>
         val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
         attrRef match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index e291f41..f28f4d1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -29,6 +29,7 @@ import 
org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import 
org.apache.carbondata.processing.loading.converter.DictionaryCardinalityFinder;
 
@@ -107,6 +108,8 @@ public class CarbonDataLoadConfiguration {
    */
   private short writingCoresCount;
 
+  private SortColumnRangeInfo sortColumnRangeInfo;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -353,4 +356,12 @@ public class CarbonDataLoadConfiguration {
   public void setWritingCoresCount(short writingCoresCount) {
     this.writingCoresCount = writingCoresCount;
   }
+
+  public SortColumnRangeInfo getSortColumnRangeInfo() {
+    return sortColumnRangeInfo;
+  }
+
+  public void setSortColumnRangeInfo(SortColumnRangeInfo sortColumnRangeInfo) {
+    this.sortColumnRangeInfo = sortColumnRangeInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index cf045a4..b8e9062 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -22,23 +22,26 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 import 
org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcessorStepImpl;
 import 
org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
-import 
org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl;
 import 
org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
 import 
org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
@@ -51,6 +54,8 @@ import org.apache.commons.lang3.StringUtils;
  * It builds the pipe line of steps for loading data to carbon.
  */
 public final class DataLoadProcessBuilder {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
 
   public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, 
String[] storeLocation,
       CarbonIterator[] inputIterators) throws Exception {
@@ -120,7 +125,7 @@ public final class DataLoadProcessBuilder {
     // 2. Converts the data like dictionary or non dictionary or complex 
objects depends on
     // data types and configurations.
     AbstractDataLoadProcessorStep converterProcessorStep =
-        new DataConverterProcessorWithBucketingStepImpl(configuration, 
inputProcessorStep);
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
     // 3. Sorts the data by SortColumn or not
     AbstractDataLoadProcessorStep sortProcessorStep =
         new SortProcessorStepImpl(configuration, converterProcessorStep);
@@ -221,6 +226,8 @@ public final class DataLoadProcessBuilder {
     configuration.setPreFetch(loadModel.isPreFetch());
     configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     
configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+
+    setSortColumnInfo(carbonTable, loadModel, configuration);
     // For partition loading always use single core as it already runs in 
multiple
     // threads per partition
     if (carbonTable.isHivePartitionTable()) {
@@ -231,4 +238,71 @@ public final class DataLoadProcessBuilder {
     return configuration;
   }
 
+  /**
+   * set sort column info in configuration
+   * @param carbonTable carbon table
+   * @param loadModel load model
+   * @param configuration configuration
+   */
+  private static void setSortColumnInfo(CarbonTable carbonTable, 
CarbonLoadModel loadModel,
+      CarbonDataLoadConfiguration configuration) {
+    List<String> sortCols = 
carbonTable.getSortColumns(carbonTable.getTableName());
+    SortScopeOptions.SortScope sortScope = 
SortScopeOptions.getSortScope(loadModel.getSortScope());
+    if (!SortScopeOptions.SortScope.LOCAL_SORT.equals(sortScope)
+        || sortCols.size() == 0
+        || StringUtils.isBlank(loadModel.getSortColumnsBoundsStr())) {
+      if (!StringUtils.isBlank(loadModel.getSortColumnsBoundsStr())) {
+        LOGGER.warn("sort column bounds will be ignored");
+      }
+
+      configuration.setSortColumnRangeInfo(null);
+      return;
+    }
+    // column index for sort columns
+    int[] sortColIndex = new int[sortCols.size()];
+    boolean[] isSortColNoDict = new boolean[sortCols.size()];
+
+    DataField[] outFields = configuration.getDataFields();
+    int j = 0;
+    boolean columnExist;
+    for (String sortCol : sortCols) {
+      columnExist = false;
+
+      for (int i = 0; !columnExist && i < outFields.length; i++) {
+        if (outFields[i].getColumn().getColName().equalsIgnoreCase(sortCol)) {
+          columnExist = true;
+
+          sortColIndex[j] = i;
+          isSortColNoDict[j] = !outFields[i].hasDictionaryEncoding();
+          j++;
+        }
+      }
+
+      if (!columnExist) {
+        throw new CarbonDataLoadingException("Field " + sortCol + " does not 
exist.");
+      }
+    }
+
+    String[] sortColumnBounds = StringUtils.splitPreserveAllTokens(
+        loadModel.getSortColumnsBoundsStr(),
+        CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_ROW_DELIMITER, -1);
+    for (String bound : sortColumnBounds) {
+      String[] fieldInBounds = StringUtils.splitPreserveAllTokens(bound,
+          CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_FIELD_DELIMITER, -1);
+      if (fieldInBounds.length != sortCols.size()) {
+        String msg = new StringBuilder(
+            "The number of field in bounds should be equal to that in sort 
columns.")
+            .append(" Expected ").append(sortCols.size())
+            .append(", actual 
").append(String.valueOf(fieldInBounds.length)).append(".")
+            .append(" The illegal bound is 
'").append(bound).append("'.").toString();
+        throw new CarbonDataLoadingException(msg);
+      }
+    }
+
+    SortColumnRangeInfo sortColumnRangeInfo = new 
SortColumnRangeInfo(sortColIndex,
+        isSortColNoDict,
+        sortColumnBounds,
+        CarbonLoadOptionConstants.SORT_COLUMN_BOUNDS_FIELD_DELIMITER);
+    configuration.setSortColumnRangeInfo(sortColumnRangeInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
index fd3a650..016ff3f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
@@ -31,6 +31,6 @@ public interface RowConverter extends 
DictionaryCardinalityFinder {
   CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
 
   RowConverter createCopyForNewThread();
-
+  FieldConverter[] getFieldConverters();
   void finish();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index c5313cb..208d42f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -250,4 +250,9 @@ public class RowConverterImpl implements RowConverter {
     }
     return cardinality;
   }
+
+  @Override
+  public FieldConverter[] getFieldConverters() {
+    return fieldConverters;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 31c5b27..ffc62a1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -187,6 +187,10 @@ public class CarbonLoadModel implements Serializable {
   private String globalSortPartitions;
 
   private boolean isAggLoadRequest;
+  /**
+   * sort columns bounds
+   */
+  private String sortColumnsBoundsStr;
 
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
@@ -355,6 +359,14 @@ public class CarbonLoadModel implements Serializable {
     this.dictionaryServiceProvider = dictionaryServiceProvider;
   }
 
+  public String getSortColumnsBoundsStr() {
+    return sortColumnsBoundsStr;
+  }
+
+  public void setSortColumnsBoundsStr(String sortColumnsBoundsStr) {
+    this.sortColumnsBoundsStr = sortColumnsBoundsStr;
+  }
+
   /**
    * Get copy with taskNo.
    * Broadcast value is shared in process, so we need to copy it to make sure 
the value in each
@@ -404,6 +416,7 @@ public class CarbonLoadModel implements Serializable {
     copy.batchSortSizeInMb = batchSortSizeInMb;
     copy.isAggLoadRequest = isAggLoadRequest;
     copy.badRecordsLocation = badRecordsLocation;
+    copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     return copy;
   }
 
@@ -456,6 +469,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.batchSortSizeInMb = batchSortSizeInMb;
     copyObj.badRecordsLocation = badRecordsLocation;
     copyObj.isAggLoadRequest = isAggLoadRequest;
+    copyObj.sortColumnsBoundsStr = sortColumnsBoundsStr;
     return copyObj;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 99684ad..17e8dbe 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -223,6 +223,7 @@ public class CarbonLoadModelBuilder {
 
     carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
     carbonLoadModel.readAndSetLoadMetadataDetails();
+    
carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
   }
 
   private int validateMaxColumns(String[] csvHeaders, String maxColumns)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index bd942ca..5af4859 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -191,6 +191,7 @@ public class LoadOption {
     }
 
     optionsFinal.put("single_pass", String.valueOf(singlePass));
+    optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, 
"sort_column_bounds", ""));
     return optionsFinal;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
index f24d24f..e10faf6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -19,6 +19,8 @@ package 
org.apache.carbondata.processing.loading.partition.impl;
 
 import java.util.List;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -27,7 +29,8 @@ import 
org.apache.carbondata.processing.loading.partition.Partitioner;
 /**
  * Hash partitioner implementation
  */
-public class HashPartitionerImpl implements Partitioner<Object[]> {
+@InterfaceAudience.Internal
+public class HashPartitionerImpl implements Partitioner<CarbonRow> {
 
   private int numberOfBuckets;
 
@@ -50,10 +53,11 @@ public class HashPartitionerImpl implements 
Partitioner<Object[]> {
     }
   }
 
-  @Override public int getPartition(Object[] objects) {
+  @Override
+  public int getPartition(CarbonRow key) {
     int hashCode = 0;
     for (Hash hash : hashes) {
-      hashCode += hash.getHash(objects);
+      hashCode += hash.getHash(key.getData());
     }
     return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
new file mode 100644
index 0000000..d59ad02
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RangePartitionerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition.impl;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+
+@InterfaceAudience.Internal
+public class RangePartitionerImpl implements Partitioner<CarbonRow> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RangePartitionerImpl.class.getName());
+  private CarbonRow[] rangeBounds;
+  private Comparator<CarbonRow> comparator;
+
+  public RangePartitionerImpl(CarbonRow[] rangeBounds, Comparator<CarbonRow> 
comparator) {
+    this.rangeBounds = rangeBounds;
+    LOGGER.info("Use range partitioner to distribute data to "
+        + (rangeBounds.length + 1) + " ranges.");
+    this.comparator = comparator;
+  }
+
+  /**
+   * learned from spark org.apache.spark.RangePartitioner
+   *
+   * @param key key
+   * @return partitionId
+   */
+  @Override
+  public int getPartition(CarbonRow key) {
+    int partition = 0;
+    if (rangeBounds.length <= 128) {
+      // If we have less than 128 partitions naive search
+      while (partition < rangeBounds.length
+          && comparator.compare(key, rangeBounds[partition]) > 0) {
+        partition += 1;
+      }
+    } else {
+      // binary search. binarySearch either returns the match location or 
-[insertion point]-1
+      partition = Arrays.binarySearch(rangeBounds, 0, rangeBounds.length, key, 
comparator);
+      if (partition < 0) {
+        partition = -partition - 1;
+      }
+      if (partition > rangeBounds.length) {
+        partition = rangeBounds.length;
+      }
+    }
+
+    return partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
new file mode 100644
index 0000000..64b64f5
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition.impl;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * comparator for the converted row. The row has not been rearranged as 
3-parted yet.
+ */
+@InterfaceAudience.Internal
+public class RawRowComparator implements Comparator<CarbonRow> {
+  private int[] sortColumnIndices;
+  private boolean[] isSortColumnNoDict;
+
+  public RawRowComparator(int[] sortColumnIndices, boolean[] 
isSortColumnNoDict) {
+    this.sortColumnIndices = sortColumnIndices;
+    this.isSortColumnNoDict = isSortColumnNoDict;
+  }
+
+  @Override
+  public int compare(CarbonRow o1, CarbonRow o2) {
+    int diff = 0;
+    int i = 0;
+    for (int colIdx : sortColumnIndices) {
+      if (isSortColumnNoDict[i]) {
+        byte[] colA = (byte[]) o1.getObject(colIdx);
+        byte[] colB = (byte[]) o2.getObject(colIdx);
+        diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
+        if (diff != 0) {
+          return diff;
+        }
+      } else {
+        int colA = (int) o1.getObject(colIdx);
+        int colB = (int) o2.getObject(colIdx);
+        diff = colA - colB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+      i++;
+    }
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
index a8f0282..b74b393 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SorterFactory.java
@@ -25,10 +25,10 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import 
org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterImpl;
-import 
org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
+import 
org.apache.carbondata.processing.loading.sort.impl.ParallelReadMergeSorterWithColumnRangeImpl;
 import 
org.apache.carbondata.processing.loading.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
 import 
org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterImpl;
-import 
org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+import 
org.apache.carbondata.processing.loading.sort.impl.UnsafeParallelReadMergeSorterWithColumnRangeImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class SorterFactory {
@@ -44,15 +44,21 @@ public class SorterFactory {
     Sorter sorter;
     if (offheapsort) {
       if (configuration.getBucketingInfo() != null) {
-        sorter = new 
UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
+        sorter = new UnsafeParallelReadMergeSorterWithColumnRangeImpl(counter,
             configuration.getBucketingInfo());
+      } else if (configuration.getSortColumnRangeInfo() != null) {
+        sorter = new UnsafeParallelReadMergeSorterWithColumnRangeImpl(counter,
+            configuration.getSortColumnRangeInfo());
       } else {
         sorter = new UnsafeParallelReadMergeSorterImpl(counter);
       }
     } else {
       if (configuration.getBucketingInfo() != null) {
-        sorter =
-            new ParallelReadMergeSorterWithBucketingImpl(counter, 
configuration.getBucketingInfo());
+        sorter = new ParallelReadMergeSorterWithColumnRangeImpl(counter,
+            configuration.getBucketingInfo());
+      } else if (configuration.getSortColumnRangeInfo() != null) {
+        sorter = new ParallelReadMergeSorterWithColumnRangeImpl(counter,
+            configuration.getSortColumnRangeInfo());
       } else {
         sorter = new ParallelReadMergeSorterImpl(counter);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index b7452a7..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import 
org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
-import 
org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be 
merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data 
separately and write to
- * temp files.
- */
-public class ParallelReadMergeSorterWithBucketingImpl extends 
AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      
LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private SortIntermediateFileMerger[] intermediateFileMergers;
-
-  private BucketingInfo bucketingInfo;
-
-  private int sortBufferSize;
-
-  private AtomicLong rowCounter;
-
-  public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
-      BucketingInfo bucketingInfo) {
-    this.rowCounter = rowCounter;
-    this.bucketingInfo = bucketingInfo;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-    int buffer = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.SORT_SIZE, 
CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
-    sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
-    if (sortBufferSize < 100) {
-      sortBufferSize = 100;
-    }
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] 
iterators)
-      throws CarbonDataLoadingException {
-    SortDataRows[] sortDataRows = new 
SortDataRows[bucketingInfo.getNumberOfBuckets()];
-    intermediateFileMergers =
-        new SortIntermediateFileMerger[sortDataRows.length];
-    try {
-      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-        SortParameters parameters = sortParameters.getCopy();
-        parameters.setPartitionID(i + "");
-        setTempLocation(parameters);
-        parameters.setBufferSize(sortBufferSize);
-        intermediateFileMergers[i] = new 
SortIntermediateFileMerger(parameters);
-        sortDataRows[i] = new SortDataRows(parameters, 
intermediateFileMergers[i]);
-        sortDataRows[i].initialize();
-      }
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = 
Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(new SortIteratorThread(iterators[i], 
sortDataRows, rowCounter,
-            this.threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRows, sortParameters);
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server 
", e);
-    }
-    checkError();
-    try {
-      for (int i = 0; i < intermediateFileMergers.length; i++) {
-        intermediateFileMergers[i].finish();
-      }
-    } catch (CarbonDataWriterException e) {
-      throw new CarbonDataLoadingException(e);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    Iterator<CarbonRowBatch>[] batchIterator = new 
Iterator[bucketingInfo.getNumberOfBuckets()];
-    for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-      batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
-    }
-
-    return batchIterator;
-  }
-
-  private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = 
CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        sortParameters.getDatabaseName(), sortParameters.getTableName(),
-        String.valueOf(sortParameters.getTaskNo()), 
sortParameters.getSegmentId(),
-        false, false);
-    // Set the data file location
-    String[] dataFolderLocation = 
CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    return new SingleThreadFinalSortFilesMerger(dataFolderLocation, 
sortParameters.getTableName(),
-            sortParameters);
-  }
-
-  @Override public void close() {
-    for (int i = 0; i < intermediateFileMergers.length; i++) {
-      intermediateFileMergers[i].close();
-    }
-  }
-
-  /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(SortDataRows[] sortDataRows, 
SortParameters parameters)
-      throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
-    try {
-      for (int i = 0; i < sortDataRows.length; i++) {
-        // start sorting
-        sortDataRows[i].startSorting();
-      }
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), 
System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), 
System.currentTimeMillis());
-      return false;
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = 
CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        parameters.getDatabaseName(), parameters.getTableName(), 
parameters.getTaskNo(),
-        parameters.getSegmentId(), false, false);
-    String[] tmpLocs = 
CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    parameters.setTempFileLocation(tmpLocs);
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows to @{@link 
SortDataRows}
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private SortDataRows[] sortDataRows;
-
-    private AtomicLong rowCounter;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
SortDataRows[] sortDataRows,
-        AtomicLong rowCounter, ThreadStatusObserver observer) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.rowCounter = rowCounter;
-      this.threadStatusObserver = observer;
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
-              synchronized (sortDataRow) {
-                sortDataRow.addRow(row.getData());
-                rowCounter.getAndAdd(1);
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      }
-    }
-
-  }
-
-  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private String partitionId;
-
-    private int batchSize;
-
-    private boolean firstRow = true;
-
-    public MergedDataIterator(String partitionId, int batchSize) {
-      this.partitionId = partitionId;
-      this.batchSize = batchSize;
-    }
-
-    private SingleThreadFinalSortFilesMerger finalMerger;
-
-    @Override public boolean hasNext() {
-      if (firstRow) {
-        firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
-        finalMerger.startFinalMerge();
-      }
-      return finalMerger.hasNext();
-    }
-
-    @Override public CarbonRowBatch next() {
-      int counter = 0;
-      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-      while (finalMerger.hasNext() && counter < batchSize) {
-        rowBatch.addRow(new CarbonRow(finalMerger.next()));
-        counter++;
-      }
-      return rowBatch;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab9b4cf8/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
new file mode 100644
index 0000000..808952b
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import 
org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import 
org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It parallely reads data from array of iterates and do merge sort.
+ * First it sorts the data and write to temp files. These temp files will be 
merge sorted to get
+ * final merge sort result.
+ * This step is specifically for the data loading with specifying column value 
range, such as
+ * bucketing,sort_column_bounds, it sorts each range of data separately and 
write to temp files.
+ */
+public class ParallelReadMergeSorterWithColumnRangeImpl extends 
AbstractMergeSorter {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      ParallelReadMergeSorterWithColumnRangeImpl.class.getName());
+
+  private SortParameters originSortParameters;
+
+  private SortIntermediateFileMerger[] intermediateFileMergers;
+
+  private ColumnRangeInfo columnRangeInfo;
+
+  private int sortBufferSize;
+
+  private AtomicLong rowCounter;
+  /**
+   * counters to collect information about rows processed by each range
+   */
+  private List<AtomicLong> insideRowCounterList;
+
+  public ParallelReadMergeSorterWithColumnRangeImpl(AtomicLong rowCounter,
+      ColumnRangeInfo columnRangeInfo) {
+    this.rowCounter = rowCounter;
+    this.columnRangeInfo = columnRangeInfo;
+  }
+
+  @Override
+  public void initialize(SortParameters sortParameters) {
+    this.originSortParameters = sortParameters;
+    int buffer = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.SORT_SIZE, 
CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+    sortBufferSize = buffer / columnRangeInfo.getNumOfRanges();
+    if (sortBufferSize < 100) {
+      sortBufferSize = 100;
+    }
+    this.insideRowCounterList = new 
ArrayList<>(columnRangeInfo.getNumOfRanges());
+    for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+      insideRowCounterList.add(new AtomicLong(0));
+    }
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] 
iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows[] sortDataRows = new 
SortDataRows[columnRangeInfo.getNumOfRanges()];
+    intermediateFileMergers = new 
SortIntermediateFileMerger[columnRangeInfo.getNumOfRanges()];
+    SortParameters[] sortParameterArray = new 
SortParameters[columnRangeInfo.getNumOfRanges()];
+    try {
+      for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+        SortParameters parameters = originSortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        parameters.setRangeId(i);
+        sortParameterArray[i] = parameters;
+        setTempLocation(parameters);
+        parameters.setBufferSize(sortBufferSize);
+        intermediateFileMergers[i] = new 
SortIntermediateFileMerger(parameters);
+        sortDataRows[i] = new SortDataRows(parameters, 
intermediateFileMergers[i]);
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    ExecutorService executorService = 
Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      // dispatch rows to sortDataRows by range id
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.execute(new SortIteratorThread(iterators[i], 
sortDataRows, rowCounter,
+            this.insideRowCounterList, this.threadStatusObserver));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, originSortParameters);
+    } catch (Exception e) {
+      checkError();
+      throw new CarbonDataLoadingException("Problem while shutdown the server 
", e);
+    }
+    checkError();
+    try {
+      for (int i = 0; i < intermediateFileMergers.length; i++) {
+        intermediateFileMergers[i].finish();
+      }
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new 
Iterator[columnRangeInfo.getNumOfRanges()];
+    for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+      batchIterator[i] = new MergedDataIterator(sortParameterArray[i], 
batchSize);
+    }
+
+    return batchIterator;
+  }
+
+  private SingleThreadFinalSortFilesMerger getFinalMerger(SortParameters 
sortParameters) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), 
sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()),
+            sortParameters.getSegmentId() + "", false, false);
+    // Set the data file location
+    String[] dataFolderLocation = 
CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    return new SingleThreadFinalSortFilesMerger(dataFolderLocation, 
sortParameters.getTableName(),
+        sortParameters);
+  }
+
+  @Override public void close() {
+    for (int i = 0; i < intermediateFileMergers.length; i++) {
+      intermediateFileMergers[i].close();
+    }
+  }
+
+  /**
+   * Below method will be used to process data to next step
+   */
+  private boolean processRowToNextStep(SortDataRows[] sortDataRows, 
SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), 
System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), 
System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(),
+            parameters.getSegmentId(), false, false);
+    String[] tmpLocs = 
CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    parameters.setTempFileLocation(tmpLocs);
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows to @{@link 
SortDataRows}
+   */
+  private static class SortIteratorThread implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows[] sortDataRows;
+
+    private AtomicLong rowCounter;
+    private List<AtomicLong> insideCounterList;
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
SortDataRows[] sortDataRows,
+        AtomicLong rowCounter, List<AtomicLong> insideCounterList,
+        ThreadStatusObserver observer) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.rowCounter = rowCounter;
+      this.insideCounterList = insideCounterList;
+      this.threadStatusObserver = observer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          while (batch.hasNext()) {
+            CarbonRow row = batch.next();
+            if (row != null) {
+              SortDataRows sortDataRow = sortDataRows[row.getRangeId()];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+                insideCounterList.get(row.getRangeId()).getAndIncrement();
+                rowCounter.getAndAdd(1);
+              }
+            }
+          }
+        }
+        LOGGER.info("Rows processed by each range: " + insideCounterList);
+      } catch (Exception e) {
+        LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
+      }
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private SortParameters sortParameters;
+
+    private int batchSize;
+
+    private boolean firstRow = true;
+
+    public MergedDataIterator(SortParameters sortParameters, int batchSize) {
+      this.sortParameters = sortParameters;
+      this.batchSize = batchSize;
+    }
+
+    private SingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(sortParameters);
+        finalMerger.startFinalMerge();
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}

Reply via email to