Repository: incubator-carbondata
Updated Branches:
  refs/heads/12-dev 3b62d25cf -> 5763f8c60


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 3346743..30e03ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -52,6 +52,7 @@ case class TableModel(
     tableProperties: Map[String, String],
     dimCols: Seq[Field],
     msrCols: Seq[Field],
+    sortKeyDims: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
@@ -357,6 +358,7 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setPrecision(precision)
     columnSchema.setScale(scale)
     columnSchema.setSchemaOrdinal(schemaOrdinal)
+    columnSchema.setSortColumn(false)
     // TODO: Need to fill RowGroupID, converted type
     // & Number of Children after DDL finalization
     columnSchema
@@ -367,7 +369,11 @@ class TableNewProcessor(cm: TableModel) {
     val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
     var allColumns = Seq[ColumnSchema]()
     var index = 0
-    cm.dimCols.foreach(field => {
+    var measureCount = 0
+
+    // Sort columns should be at the beginning of all columns
+    cm.sortKeyDims.get.foreach { keyDim =>
+      val field = cm.dimCols.find(keyDim equals _.column).get
       val encoders = new java.util.ArrayList[Encoding]()
       encoders.add(Encoding.DICTIONARY)
       val columnSchema: ColumnSchema = getColumnSchema(
@@ -381,11 +387,33 @@ class TableNewProcessor(cm: TableModel) {
         field.precision,
         field.scale,
         field.schemaOrdinal)
-      allColumns ++= Seq(columnSchema)
+      columnSchema.setSortColumn(true)
+      allColumns :+= columnSchema
       index = index + 1
-      if (field.children.isDefined && field.children.get != null) {
-        columnSchema.setNumberOfChild(field.children.get.size)
-        allColumns ++= getAllChildren(field.children)
+    }
+
+    cm.dimCols.foreach(field => {
+      val sortField = cm.sortKeyDims.get.find(field.column equals _)
+      if (sortField.isEmpty) {
+        val encoders = new java.util.ArrayList[Encoding]()
+        encoders.add(Encoding.DICTIONARY)
+        val columnSchema: ColumnSchema = getColumnSchema(
+          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+          field.name.getOrElse(field.column),
+          index,
+          isCol = true,
+          encoders,
+          isDimensionCol = true,
+          -1,
+          field.precision,
+          field.scale,
+          field.schemaOrdinal)
+        allColumns :+= columnSchema
+        index = index + 1
+        if (field.children.isDefined && field.children.get != null) {
+          columnSchema.setNumberOfChild(field.children.get.size)
+          allColumns ++= getAllChildren(field.children)
+        }
       }
     })
 
@@ -402,10 +430,9 @@ class TableNewProcessor(cm: TableModel) {
         field.precision,
         field.scale,
         field.schemaOrdinal)
-      val measureCol = columnSchema
-
-      allColumns ++= Seq(measureCol)
+      allColumns :+= columnSchema
       index = index + 1
+      measureCount += 1
     })
 
     // Check if there is any duplicate measures or dimensions.
@@ -426,22 +453,6 @@ class TableNewProcessor(cm: TableModel) {
 
     updateColumnGroupsInFields(cm.columnGroups, allColumns)
 
-    var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    for (column <- allColumns) {
-      if (highCardinalityDims.contains(column.getColumnName)) {
-        newOrderedDims += column
-      } else if (column.isComplex) {
-        complexDims += column
-      } else if (column.isDimensionColumn) {
-        newOrderedDims += column
-      } else {
-        measures += column
-      }
-
-    }
-
     // Setting the boolean value of useInvertedIndex in column schema
     val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
     for (column <- allColumns) {
@@ -456,7 +467,7 @@ class TableNewProcessor(cm: TableModel) {
     }
 
     // Adding dummy measure if no measure is provided
-    if (measures.size < 1) {
+    if (measureCount == 0) {
       val encoders = new java.util.ArrayList[Encoding]()
       val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
         CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
@@ -466,13 +477,10 @@ class TableNewProcessor(cm: TableModel) {
         false,
         -1, 0, 0, schemaOrdinal = -1)
       columnSchema.setInvisible(true)
-      val measureColumn = columnSchema
-      measures += measureColumn
-      allColumns = allColumns ++ measures
+      allColumns :+= columnSchema
     }
     val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
     columnValidator.validateColumns(allColumns)
-    newOrderedDims = newOrderedDims ++ complexDims ++ measures
 
     val tableInfo = new TableInfo()
     val tableSchema = new TableSchema()
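
In short, the schema builder now emits columns in a fixed order: the SORT_COLUMNS dimensions first (each flagged with setSortColumn(true)), the remaining dimensions next, and the measures last, with an invisible dummy measure appended only when the table has no measures. The following is a minimal, self-contained Scala sketch of that ordering using simplified placeholder types, not the real ColumnSchema API:

    // Illustration only: simplified stand-in for ColumnSchema.
    case class Col(name: String, isDimension: Boolean, isSortColumn: Boolean = false)

    def orderColumns(dims: Seq[String], msrs: Seq[String], sortColumns: Seq[String]): Seq[Col] = {
      // 1. SORT_COLUMNS first, in the order the user declared them
      val sortCols = sortColumns.map(c => Col(c, isDimension = true, isSortColumn = true))
      // 2. the remaining dimensions, in CREATE TABLE order
      val otherDims = dims
        .filterNot(d => sortColumns.exists(_.equalsIgnoreCase(d)))
        .map(c => Col(c, isDimension = true))
      // 3. measures last; an invisible dummy measure stands in when none exist
      val measures =
        if (msrs.isEmpty) Seq(Col("default_dummy_measure", isDimension = false))
        else msrs.map(c => Col(c, isDimension = false))
      sortCols ++ otherDims ++ measures
    }

For example, orderColumns(Seq("a", "b", "c"), Seq("m"), Seq("c", "a")) yields c, a, b, m, with c and a marked as sort columns.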

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
index 8588868..b8f0a7c 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
@@ -32,23 +32,15 @@ private class TestCarbonSqlParserStub extends CarbonSqlParser {
 
   def updateColumnGroupsInFieldTest(fields: Seq[Field], tableProperties: Map[String, String]): Seq[String] = {
 
-     var (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
-      fields, tableProperties)
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+     var (dims, msrs, noDictionaryDims, sortkey) = extractDimAndMsrFields(fields, tableProperties)
 
     updateColumnGroupsInField(tableProperties,
         noDictionaryDims, msrs, dims)
   }
 
-  def extractDimColsAndNoDictionaryFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field],
-    Seq[String]) = {
-
-    extractDimColsAndNoDictionaryFields(fields, tableProperties)
-  }
-
-  def extractMsrColsFromFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field]) = {
-
-    extractMsrColsFromFields(fields, tableProperties)
+  def extractDimAndMsrFieldsTest(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
+    extractDimAndMsrFields(fields, tableProperties)
   }
 
 
@@ -199,7 +191,7 @@ class TestCarbonSqlParser extends QueryTest {
     val fields: Seq[Field] = loadAllFields
 
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub.extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
+    val (dimCols, _, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     // testing col
 
@@ -219,9 +211,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -242,9 +232,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -264,9 +252,8 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields,
+      tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 8)
@@ -287,9 +274,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col3", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -310,9 +295,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -333,9 +316,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -358,9 +339,7 @@ class TestCarbonSqlParser extends QueryTest {
     )
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 8)
@@ -382,9 +361,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE-> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col3")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -402,10 +379,11 @@ class TestCarbonSqlParser extends QueryTest {
 
   // Testing the extracting of measures
   test("Test-extractMsrColsFromFields") {
-    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2",
+      CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (_, msrCols, _, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     // testing col
     assert(msrCols.lift(0).get.column.equalsIgnoreCase("col4"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index b848543..b4b462a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -28,18 +28,25 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> {
 
+  boolean[] isMeasure;
+
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
-    super.initialize(carbonColumns, absoluteTableIdentifier);
     //can initialize and generate schema here.
+    isMeasure = new boolean[carbonColumns.length];
+    dataTypes = new DataType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      isMeasure[i] = !carbonColumns[i].isDimesion();
+      dataTypes[i] = carbonColumns[i].getDataType();
+    }
   }
 
   @Override public InternalRow readRow(Object[] data) {
-    for (int i = 0; i < dictionaries.length; i++) {
+    for (int i = 0; i < isMeasure.length; i++) {
       if (data[i] == null) {
         continue;
       }
-      if (dictionaries[i] == null) {
+      if (isMeasure[i]) {
         if (dataTypes[i].equals(DataType.INT)) {
           data[i] = ((Long)(data[i])).intValue();
         } else if (dataTypes[i].equals(DataType.SHORT)) {
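
The read support above no longer consults per-column dictionaries in readRow; it precomputes which projected columns are measures and narrows the Long values coming from the scanner to the declared measure type. A rough Scala sketch of the same per-row conversion, using a simplified column descriptor instead of the real CarbonColumn API:

    // Illustration only: simplified stand-in for CarbonColumn.
    case class ColumnMeta(isDimension: Boolean, dataType: String)

    def narrowMeasures(data: Array[Any], columns: Array[ColumnMeta]): Array[Any] = {
      for (i <- data.indices if data(i) != null && !columns(i).isDimension) {
        columns(i).dataType match {
          case "INT"   => data(i) = data(i).asInstanceOf[Long].toInt   // scanner emits Long
          case "SHORT" => data(i) = data(i).asInstanceOf[Long].toShort
          case _       => // other measure types pass through unchanged
        }
      }
      data
    }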

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 2a9c701..27bbc2a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.vectorreader;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
@@ -30,6 +31,14 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     this.columnVector = columnVector;
   }
 
+  @Override public void putBoolean(int rowId, boolean value) {
+    columnVector.putBoolean(rowId, value);
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    columnVector.putFloat(rowId, value);
+  }
+
   @Override public void putShort(int rowId, short value) {
     columnVector.putShort(rowId, value);
   }
@@ -112,4 +121,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   @Override public void reset() {
 //    columnVector.reset();
   }
+
+  @Override public DataType getType() {
+    return columnVector.dataType();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index f8bdcf8..9321706 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -51,18 +51,49 @@ object TableCreator {
     dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
   }
 
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-                                                    tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
+  protected def extractDimAndMsrFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
 
+    // All columns in SORT_COLUMNS must exist among the CREATE TABLE columns
+    val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    var sortKeyDimsTmp: Seq[String] = Seq[String]()
+    if (sortKeyOption.isDefined) {
+      var sortKey = sortKeyOption.get.split(',').map(_.trim)
+      sortKey.foreach { column =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
+          val errormsg = "sort_columns: " + column +
+            " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        } else {
+          val dataType = fields.find(x =>
+            x.column.equalsIgnoreCase(column)).get.dataType.get
+          if (isComplexDimDictionaryExclude(dataType)) {
+            val errormsg = "sort_columns is unsupported for complex datatype column: " + column
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+      }
+
+      sortKey.foreach { dimension =>
+        if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase(_))) {
+          fields.foreach { field =>
+            if (field.column.equalsIgnoreCase(dimension)) {
+              sortKeyDimsTmp :+= field.column
+            }
+          }
+        }
+      }
+    }
+
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
       dictExcludeCols
         .foreach { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
@@ -97,7 +128,7 @@ object TableCreator {
       }
     }
 
-    // include cols should contain exclude cols
+    // include cols should not contain exclude cols
     dictExcludeCols.foreach { dicExcludeCol =>
       if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
         val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
@@ -108,11 +139,10 @@ object TableCreator {
 
     // by default consider all String cols as dims and if any dictionary exclude is present then
     // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
-    fields.foreach(field => {
-
+    fields.foreach { field =>
       if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP &&
-            DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.DATE) {
+        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE) {
           noDictionaryDims :+= field.column
         }
         dimFields += field
@@ -120,49 +150,30 @@ object TableCreator {
         dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += field
+      } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column))) {
+        noDictionaryDims :+= field.column
+        dimFields += field
+      } else {
+        msrFields :+= field
       }
     }
-    )
-
-    (dimFields.toSeq, noDictionaryDims)
-  }
-
-  /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-                                         tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
-    }
 
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
-    }
-
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-          !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
+    var sortKeyDims = sortKeyDimsTmp
+    if (sortKeyOption.isEmpty) {
+      // if SORT_COLUMNS was not defined, add all dimensions to SORT_COLUMNS.
+      dimFields.foreach { field =>
+        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+          sortKeyDims :+= field.column
         }
       }
-    })
-
-    msrFields
+    }
+    if (sortKeyDims.isEmpty) {
+      // no SORT_COLUMNS
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, "")
+    } else {
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
+    }
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
   }
 
   def getKey(parentColumnName: Option[String],
@@ -440,27 +451,24 @@ object TableCreator {
   }
 
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-                        , tableName: String, fields: Seq[Field],
-                        partitionCols: Seq[PartitionerField],
-                        bucketFields: Option[BucketFields],
-                        tableProperties: Map[String, String]): TableModel
+      , tableName: String, fields: Seq[Field],
+      partitionCols: Seq[PartitionerField],
+      bucketFields: Option[BucketFields],
+      tableProperties: Map[String, String]): TableModel
   = {
 
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+    fields.zipWithIndex.foreach { x =>
+      x._1.schemaOrdinal = x._2
+    }
+    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
       fields, tableProperties)
     if (dims.isEmpty) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-        +
-        " can not be created without key columns. Please " +
-        "use DICTIONARY_INCLUDE or " +
-        "DICTIONARY_EXCLUDE to set at least one key " +
-        "column " +
+      throw new MalformedCarbonCommandException(
+        s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
+        "can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
+        "DICTIONARY_EXCLUDE to set at least one key column " +
         "if all specified columns are numeric types")
     }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
@@ -474,18 +482,20 @@ object TableCreator {
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
-    TableModel(ifNotExistPresent,
+    TableModel(
+      ifNotExistPresent,
       dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
       dbName,
       tableName,
       tableProperties,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
+      Option(sortKeyDims),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields)
+      bucketFields: Option[BucketFields])
   }
 
 }
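
Condensed, the new extractDimAndMsrFields does three extra things for sort columns: every SORT_COLUMNS entry must name an existing, non-complex column; when the property is absent, every non-complex dimension becomes a sort column; and the resolved list is written back into the table properties. A compact sketch of that resolution under simplified assumptions (plain strings for data types and IllegalArgumentException in place of MalformedCarbonCommandException):

    // Illustration only: simplified field model.
    case class SimpleField(column: String, dataType: String)

    def resolveSortColumns(fields: Seq[SimpleField],
        sortColumnsProp: Option[String],
        dimensions: Seq[SimpleField]): Seq[String] = {
      val complexTypes = Set("array", "struct")
      val requested = sortColumnsProp.toSeq.flatMap(_.split(',')).map(_.trim).filter(_.nonEmpty)
      requested.foreach { col =>
        val field = fields.find(_.column.equalsIgnoreCase(col)).getOrElse(
          throw new IllegalArgumentException(
            s"sort_columns: $col does not exist in table. Please check create table statement."))
        if (complexTypes.contains(field.dataType.toLowerCase)) {
          throw new IllegalArgumentException(
            s"sort_columns is unsupported for complex datatype column: $col")
        }
      }
      if (sortColumnsProp.isDefined) {
        requested.distinct                // keep the user-given order, drop duplicates
      } else {
        // default: every non-complex dimension participates in the sort
        dimensions.filterNot(f => complexTypes.contains(f.dataType.toLowerCase)).map(_.column)
      }
    }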

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 0bd3e45..407ac2f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -76,6 +76,10 @@ public class CarbonDataLoadConfiguration {
 
   private DictionaryCardinalityFinder cardinalityFinder;
 
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -121,6 +125,22 @@ public class CarbonDataLoadConfiguration {
     return dimCount;
   }
 
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  public int getNumberOfSortColumns() {
+    return this.numberOfSortColumns;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return this.numberOfNoDictSortColumns;
+  }
+
   public int getComplexDimensionCount() {
     int dimCount = 0;
     for (int i = 0; i < dataFields.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 4ebb2fb..1932888 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -195,6 +195,8 @@ public final class DataLoadProcessBuilder {
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
     configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
     configuration.setPreFetch(loadModel.isPreFetch());
+    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+    configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
 
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index 9e4b50d..3accb0b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.processing.newflow.converter.impl;
 
-import java.nio.charset.Charset;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -48,23 +46,24 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
     this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String dimensionValue = row.getString(index);
     if (dimensionValue == null || dimensionValue.equals(nullformat)) {
-      dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-    }
-    if (dataType != DataType.STRING) {
-      if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
-        if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
+    } else {
+      try {
+        row.update(DataTypeUtil
+            .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType), index);
+      } catch (Throwable ex) {
+        if (dimensionValue.length() != 0 || isEmptyBadRecord) {
           logHolder.setReason(
               "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
                   .getColName() + " and column data type " + dataType + " is not a valid "
                   + dataType + " type.");
+        } else {
+          row.update(new byte[0], index);
         }
       }
     }
-    row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
-        index);
   }
 }
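
The converter now encodes a no-dictionary value into type-specific bytes in a single step and treats a failed conversion as a bad record, or stores an empty value when empty strings are tolerated. A hedged Scala sketch of that control flow; encode below is a stand-in for DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn, not the real implementation:

    import java.nio.charset.StandardCharsets

    // Stand-in encoder covering only the types needed for the illustration.
    def encode(value: String, dataType: String): Array[Byte] = dataType match {
      case "INT"  => BigInt(value.toInt).toByteArray
      case "LONG" => BigInt(value.toLong).toByteArray
      case _      => value.getBytes(StandardCharsets.UTF_8)
    }

    def convertNoDictionary(value: String, dataType: String, isEmptyBadRecord: Boolean,
        reportBadRecord: String => Unit): Array[Byte] = {
      try {
        encode(value, dataType)
      } catch {
        case _: Throwable =>
          if (value.nonEmpty || isEmptyBadRecord) {
            // invalid value: record the reason so the bad-record handler can act on it
            reportBadRecord(s"value $value is not a valid $dataType")
          }
          Array.emptyByteArray           // keep the row loadable with an empty byte value
      }
    }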

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index ad96578..a0e4ef1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -85,7 +85,8 @@ public class ParallelReadMergeSorterImpl implements Sorter {
             sortParameters.getDimColCount(),
             sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
             sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
-            sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.getNoDictionarySortColumn());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e3049d2..430bf1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -139,7 +139,8 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
         new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
             sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            this.sortParameters.getNoDictionarySortColumn());
     return finalMerger;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index e468028..61c6cca 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -34,6 +34,8 @@ public class UnsafeCarbonRowPage {
 
   private boolean[] noDictionaryDimensionMapping;
 
+  private boolean[] noDictionarySortColumnMapping;
+
   private int dimensionSize;
 
   private int measureSize;
@@ -52,9 +54,11 @@ public class UnsafeCarbonRowPage {
 
   private boolean saveToDisk;
 
-  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, int dimensionSize,
-      int measureSize, char[] aggType, MemoryBlock memoryBlock, boolean saveToDisk) {
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType,
+      MemoryBlock memoryBlock, boolean saveToDisk) {
     this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
     this.dimensionSize = dimensionSize;
     this.measureSize = measureSize;
     this.aggType = aggType;
@@ -324,4 +328,7 @@ public class UnsafeCarbonRowPage {
     return noDictionaryDimensionMapping;
   }
 
+  public boolean[] getNoDictionarySortColumnMapping() {
+    return noDictionarySortColumnMapping;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 9907509..1dc980f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -108,6 +108,7 @@ public class UnsafeSortDataRows {
   public void initialize() throws CarbonSortKeyAndGroupByException {
     MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024);
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+        parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
         parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
         !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
@@ -171,6 +172,7 @@ public class UnsafeSortDataRows {
             MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024);
             boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
             rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+                parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount(),
                 parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
             bytesAdded += rowPage.addRow(rowBatch[i]);
@@ -198,12 +200,12 @@ public class UnsafeSortDataRows {
     if (this.rowPage.getUsedSize() > 0) {
       TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
           new UnsafeIntSortDataFormat(rowPage));
-      if (parameters.getNoDictionaryCount() > 0) {
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
             new UnsafeRowComparator(rowPage));
       } else {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), rowPage));
+            new UnsafeRowComparatorForNormalDIms(rowPage));
       }
       unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
     } else {
@@ -288,12 +290,13 @@ public class UnsafeSortDataRows {
         long startTime = System.currentTimeMillis();
         TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
             new UnsafeIntSortDataFormat(page));
-        if (parameters.getNoDictionaryCount() > 0) {
+        // if sort_columns is not empty, sort by sort_columns
+        if (parameters.getNumberOfNoDictSortColumns() > 0) {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
               new UnsafeRowComparator(page));
         } else {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), page));
+              new UnsafeRowComparatorForNormalDIms(page));
         }
         if (rowPage.isSaveToDisk()) {
           // create a new file every time
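
The sort step now keys off the configured sort columns rather than all dimensions: when at least one sort column is stored without a dictionary, the byte-aware UnsafeRowComparator is required; otherwise comparing the leading dictionary surrogates is enough. A small Scala sketch of that selection, with stand-in comparators rather than the Carbon classes:

    import java.util.Comparator

    // Stand-in for UnsafeRowComparatorForNormalDIms: compare only the leading sort columns.
    def dictOnlyComparator(numberOfSortColumns: Int): Comparator[Array[Int]] =
      (a: Array[Int], b: Array[Int]) =>
        (0 until numberOfSortColumns).iterator
          .map(i => Integer.compare(a(i), b(i)))
          .find(_ != 0)
          .getOrElse(0)

    def chooseComparator(numberOfNoDictSortColumns: Int, numberOfSortColumns: Int,
        byteAwareComparator: Comparator[Array[Int]]): Comparator[Array[Int]] = {
      // no-dictionary sort columns carry raw bytes, so the byte-aware comparator is required
      if (numberOfNoDictSortColumns > 0) byteAwareComparator
      else dictOnlyComparator(numberOfSortColumns)
    }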

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
index e61a284..476b8ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -27,14 +27,14 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonR
 public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
 
   /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+   * mapping of dictionary and no-dictionary flags for the sort_columns.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMaping;
 
   private Object baseObject;
 
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
-    this.noDictionaryColMaping = rowPage.getNoDictionaryDimensionMapping();
+    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
     this.baseObject = rowPage.getDataBlock().getBaseObject();
   }
 
@@ -47,7 +47,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObject, rowA + sizeA);
         sizeA += 2;
@@ -89,7 +89,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObjectL, rowA + sizeA);
         sizeA += 2;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
index 7448aee..4fd245f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -27,11 +27,11 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon
 
   private Object baseObject;
 
-  private int dimCount;
+  private int numberOfSortColumns;
 
-  public UnsafeRowComparatorForNormalDIms(int dimCount, UnsafeCarbonRowPage rowPage) {
+  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
     this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.dimCount = dimCount;
+    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
   }
 
   /**
@@ -43,7 +43,7 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (int i = 0; i < dimCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
       int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA);
       sizeA += 4;
       int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + sizeB);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index ed9e0a6..397de63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -45,13 +45,13 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
   private int columnSize;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
-      boolean[] noDictMapping, int columnSize) {
+      boolean[] noDictSortColumnMapping, int columnSize) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(noDictMapping);
+    this.comparator = new NewRowComparator(noDictSortColumnMapping);
     this.columnSize = columnSize;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
index f491623..048f4f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -41,11 +41,12 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
   private int columnSize;
 
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize) {
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+      int numberOfSortColumns) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(rowPage.getNoDictionaryDimensionMapping());
+    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
     this.columnSize = columnSize;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 60f259e..6d04ebf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -153,7 +153,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     this.aggType = parameters.getAggType();
     this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
-    comparator = new NewRowComparator(isNoDictionaryDimensionColumn);
+    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index b98a072..ab59395 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -126,7 +126,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
 
         SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
             parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                .getMeasureColCount());
+                .getMeasureColCount(), parameters.getNumberOfSortColumns());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -137,7 +137,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(),
+            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
                     .getMeasureColCount());
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 0ac2d5c..dbddc1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> {
           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn());
+              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+              mergerParameters.getNoDictionarySortColumn());
 
       // initialize
       sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
index dd9358c..247251e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
@@ -24,15 +24,15 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 public class NewRowComparator implements Comparator<Object[]> {
 
   /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+   * mapping of dictionary and no-dictionary flags for the sort_columns.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMaping;
 
   /**
-   * @param noDictionaryColMaping
+   * @param noDictionarySortColumnMaping
    */
-  public NewRowComparator(boolean[] noDictionaryColMaping) {
-    this.noDictionaryColMaping = noDictionaryColMaping;
+  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
   }
 
   /**
@@ -43,7 +43,7 @@ public class NewRowComparator implements Comparator<Object[]> {
 
     int index = 0;
 
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
 
       if (isNoDictionary) {
         byte[] byteArr1 = (byte[]) rowA[index];
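
NewRowComparator therefore walks only as many leading fields as there are sort columns, comparing raw bytes for no-dictionary entries and int surrogate keys for dictionary entries. An equivalent Scala loop, assuming each row is an Array[AnyRef] holding Array[Byte] or java.lang.Integer values in sort-column order, and using java.util.Arrays.compare (Java 9+) where the Carbon code uses its own UnsafeComparer:

    import java.util.{Arrays, Comparator}

    // Sketch of the per-field loop; the flag array marks the no-dictionary sort columns.
    class SortColumnRowComparator(noDictSortColumnMapping: Array[Boolean])
        extends Comparator[Array[AnyRef]] {

      override def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int = {
        var index = 0
        var diff = 0
        while (index < noDictSortColumnMapping.length && diff == 0) {
          diff = if (noDictSortColumnMapping(index)) {
            // no-dictionary sort column: lexicographic byte comparison
            Arrays.compare(rowA(index).asInstanceOf[Array[Byte]],
              rowB(index).asInstanceOf[Array[Byte]])
          } else {
            // dictionary sort column: compare the int surrogate key
            Integer.compare(rowA(index).asInstanceOf[Integer].intValue(),
              rowB(index).asInstanceOf[Integer].intValue())
          }
          index += 1
        }
        diff
      }
    }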

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
index d913b32..241882e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
@@ -26,15 +26,15 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
   /**
    * dimension count
    */
-  private int dimensionCount;
+  private int numberOfSortColumns;
 
   /**
    * RowComparatorForNormalDims Constructor
    *
-   * @param dimensionCount
+   * @param numberOfSortColumns
    */
-  public NewRowComparatorForNormalDims(int dimensionCount) {
-    this.dimensionCount = dimensionCount;
+  public NewRowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
   }
 
   /**
@@ -45,7 +45,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
 
-    for (int i = 0; i < dimensionCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
 
       int dimFieldA = (int)rowA[i];
       int dimFieldB = (int)rowB[i];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
index c282f52..2584048 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
@@ -33,15 +33,15 @@ public class RowComparator implements Comparator<Object[]> {
   /**
    * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMaping;
 
   /**
-   * @param noDictionaryColMaping
+   * @param noDictionarySortColumnMaping
    * @param noDictionaryCount
    */
-  public RowComparator(boolean[] noDictionaryColMaping, int noDictionaryCount) 
{
+  public RowComparator(boolean[] noDictionarySortColumnMaping, int 
noDictionaryCount) {
     this.noDictionaryCount = noDictionaryCount;
-    this.noDictionaryColMaping = noDictionaryColMaping;
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
   }
 
   /**
@@ -53,7 +53,7 @@ public class RowComparator implements Comparator<Object[]> {
     int normalIndex = 0;
     int noDictionaryindex = 0;
 
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
 
       if (isNoDictionary) {
         byte[] byteArr1 = (byte[]) 
rowA[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
index ceaf5c6..8d914ea 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
@@ -28,15 +28,15 @@ public class RowComparatorForNormalDims implements 
Comparator<Object[]> {
   /**
    * dimension count
    */
-  private int dimensionCount;
+  private int numberOfSortColumns;
 
   /**
    * RowComparatorForNormalDims Constructor
    *
-   * @param dimensionCount
+   * @param numberOfSortColumns
    */
-  public RowComparatorForNormalDims(int dimensionCount) {
-    this.dimensionCount = dimensionCount;
+  public RowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
   }
 
   /**
@@ -47,7 +47,7 @@ public class RowComparatorForNormalDims implements 
Comparator<Object[]> {
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
 
-    for (int i = 0; i < dimensionCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
 
       int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
       int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 9b5a850..949d8f9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -191,11 +191,10 @@ public class SortDataRows {
       Object[][] toSort;
       toSort = new Object[entryCount][];
       System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
-
-      if (parameters.getNoDictionaryCount() > 0) {
-        Arrays.sort(toSort, new 
NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(toSort, new 
NewRowComparator(parameters.getNoDictionarySortColumn()));
       } else {
-        Arrays.sort(toSort, new 
NewRowComparatorForNormalDims(parameters.getDimColCount()));
+        Arrays.sort(toSort, new 
NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
       }
       recordHolderList = toSort;
 
@@ -385,12 +384,12 @@ public class SortDataRows {
     @Override public Void call() throws Exception {
       try {
         long startTime = System.currentTimeMillis();
-        if (parameters.getNoDictionaryCount() > 0) {
+        if (parameters.getNumberOfNoDictSortColumns() > 0) {
           Arrays.sort(recordHolderArray,
-              new 
NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
+              new NewRowComparator(parameters.getNoDictionarySortColumn()));
         } else {
           Arrays.sort(recordHolderArray,
-              new NewRowComparatorForNormalDims(parameters.getDimColCount()));
+              new 
NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
         }
 
         // create a new file every time
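
The effect of this hunk is that the in-memory batch sort now keys off the sort-column counts instead of the full dimension counts: if any sort column is no-dictionary, the mixed byte/int comparator is used with the per-sort-column mapping; otherwise only the first getNumberOfSortColumns() int fields are compared. A rough sketch of that selection, assuming the accessor names from this diff and reusing the SortColumnRowComparatorSketch shown earlier (the wrapper class is illustrative):

import java.util.Arrays;
import java.util.Comparator;

final class BatchSorterSketch {
  // Illustrative: choose a comparator for one in-memory batch of rows.
  static void sortBatch(Object[][] batch, int numberOfNoDictSortColumns,
      boolean[] noDictionarySortColumn, int numberOfSortColumns) {
    Comparator<Object[]> comparator;
    if (numberOfNoDictSortColumns > 0) {
      // some sort columns are no-dictionary: compare byte[] and int fields
      comparator = new SortColumnRowComparatorSketch(noDictionarySortColumn);
    } else {
      // all sort columns are dictionary encoded: compare the leading int fields only
      comparator = (rowA, rowB) -> {
        for (int i = 0; i < numberOfSortColumns; i++) {
          int diff = Integer.compare((int) rowA[i], (int) rowB[i]);
          if (diff != 0) {
            return diff;
          }
        }
        return 0;
      };
    }
    Arrays.sort(batch, comparator);
  }
}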

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index d42dc32..40e933d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -112,6 +112,12 @@ public class SortParameters {
    */
   private boolean[] noDictionaryDimnesionColumn;
 
+  private boolean[] noDictionarySortColumn;
+
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
   private int numberOfCores;
 
   public SortParameters getCopy() {
@@ -137,6 +143,9 @@ public class SortParameters {
     parameters.segmentId = segmentId;
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.noDictionarySortColumn = noDictionarySortColumn;
+    parameters.numberOfSortColumns = numberOfSortColumns;
+    parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
     parameters.numberOfCores = numberOfCores;
     return parameters;
   }
@@ -317,6 +326,30 @@ public class SortParameters {
     this.numberOfCores = numberOfCores;
   }
 
+  public int getNumberOfSortColumns() {
+    return numberOfSortColumns;
+  }
+
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount);
+  }
+
+  public boolean[] getNoDictionarySortColumn() {
+    return noDictionarySortColumn;
+  }
+
+  public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) {
+    this.noDictionarySortColumn = noDictionarySortColumn;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return numberOfNoDictSortColumns;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, 
noDictionaryCount);
+  }
+
   public static SortParameters 
createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
@@ -334,6 +367,16 @@ public class SortParameters {
     parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
     parameters.setNoDictionaryDimnesionColumn(
         
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
+    
parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
+    if (parameters.getNumberOfSortColumns() == 
parameters.getNoDictionaryDimnesionColumn().length) {
+      
parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
+    } else {
+      boolean[] noDictionarySortColumnTemp = new 
boolean[parameters.getNumberOfSortColumns()];
+      System.arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0,
+          noDictionarySortColumnTemp, 0, parameters.getNumberOfSortColumns());
+      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
+    }
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties
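
The new noDictionarySortColumn mapping leans on the convention established elsewhere in this patch that sort columns occupy the leading positions of the dimension order, so it is simply a prefix of the existing per-dimension no-dictionary mapping. A minimal sketch of that derivation (the helper class is illustrative; only the prefix-copy logic mirrors the diff):

import java.util.Arrays;

final class SortColumnMappingSketch {
  // Sort columns are assumed to be the first numberOfSortColumns dimensions.
  static boolean[] deriveNoDictionarySortColumn(boolean[] noDictionaryDimensionColumn,
      int numberOfSortColumns) {
    if (numberOfSortColumns == noDictionaryDimensionColumn.length) {
      return noDictionaryDimensionColumn;
    }
    return Arrays.copyOf(noDictionaryDimensionColumn, numberOfSortColumns);
  }

  public static void main(String[] args) {
    // dimensions: [dictCol, noDictCol, dictCol]; only the first two are sort columns
    boolean[] fullMapping = {false, true, false};
    System.out.println(Arrays.toString(deriveNoDictionarySortColumn(fullMapping, 2)));
    // prints: [false, true]
  }
}

Note that the new setters also clamp the counts (Math.min against dimColCount and noDictionaryCount), so a sort_columns list longer than the available dimensions cannot push this prefix past the end of the mapping.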

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index ae01404..2a4346d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -133,6 +133,11 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
   private boolean[] isNoDictionaryDimensionColumn;
 
   /**
+   * to store whether each sort column is a no-dictionary column or not
+   */
+  private boolean[] isNoDictionarySortColumn;
+
+  /**
    * Constructor to initialize
    *
    * @param tempFile
@@ -146,7 +151,7 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
    */
   public SortTempFileChunkHolder(File tempFile, int dimensionCount, int 
complexDimensionCount,
       int measureCount, int fileBufferSize, int noDictionaryCount, char[] 
aggType,
-      boolean[] isNoDictionaryDimensionColumn) {
+      boolean[] isNoDictionaryDimensionColumn, boolean[] 
isNoDictionarySortColumn) {
     // set temp file
     this.tempFile = tempFile;
 
@@ -160,7 +165,9 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
     this.fileBufferSize = fileBufferSize;
     this.executorService = Executors.newFixedThreadPool(1);
     this.aggType = aggType;
+
     this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
   }
 
   /**
@@ -409,7 +416,7 @@ public class SortTempFileChunkHolder implements 
Comparable<SortTempFileChunkHold
     int[] rightMdkArray = (int[]) other.returnRow[0];
     byte[][] leftNonDictArray = (byte[][]) returnRow[1];
     byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
-    for (boolean isNoDictionary : isNoDictionaryDimensionColumn) {
+    for (boolean isNoDictionary : isNoDictionarySortColumn) {
       if (isNoDictionary) {
         diff = UnsafeComparer.INSTANCE
             .compareTo(leftNonDictArray[noDictionaryIndex], 
rightNonDictArray[noDictionaryIndex]);
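
During the final merge each held-out row is an Object[] whose element 0 is the int[] of dictionary surrogates and element 1 the byte[][] of no-dictionary values, so the comparison now walks isNoDictionarySortColumn while advancing separate indexes into those two arrays; dimensions that are not sort columns are never reached. A simplified standalone sketch of that loop (row layout as just described; compareBytes is the unsigned byte comparison from the first sketch above):

// Illustrative merge-time comparison restricted to the sort columns.
static int compareOnSortColumns(Object[] leftRow, Object[] rightRow,
    boolean[] isNoDictionarySortColumn) {
  int[] leftMdk = (int[]) leftRow[0];
  int[] rightMdk = (int[]) rightRow[0];
  byte[][] leftNonDict = (byte[][]) leftRow[1];
  byte[][] rightNonDict = (byte[][]) rightRow[1];

  int dictIndex = 0;
  int noDictIndex = 0;
  for (boolean isNoDictionary : isNoDictionarySortColumn) {
    int diff;
    if (isNoDictionary) {
      diff = SortColumnRowComparatorSketch.compareBytes(
          leftNonDict[noDictIndex], rightNonDict[noDictIndex]);
      noDictIndex++;
    } else {
      diff = Integer.compare(leftMdk[dictIndex], rightMdk[dictIndex]);
      dictIndex++;
    }
    if (diff != 0) {
      return diff;
    }
  }
  return 0;
}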

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2affa03..fe3579b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -709,13 +709,15 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
     int i = 0;
     int dictionaryColumnCount = -1;
     int noDictionaryColumnCount = -1;
+    boolean isSortColumn = false;
     for (i = 0; i < dimensionType.length; i++) {
+      isSortColumn = i < segmentProperties.getNumberOfSortColumns();
       if (dimensionType[i]) {
         dictionaryColumnCount++;
         if (colGrpModel.isColumnar(dictionaryColumnCount)) {
           submit.add(executorService.submit(
-              new BlockSortThread(i, 
dataHolders[dictionaryColumnCount].getData(), true,
-                  isUseInvertedIndex[i])));
+              new BlockSortThread(i, 
dataHolders[dictionaryColumnCount].getData(), isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn)));
         } else {
           submit.add(
               executorService.submit(new 
ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -723,7 +725,7 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
       } else {
         submit.add(executorService.submit(
             new BlockSortThread(i, 
noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
-                true, isUseInvertedIndex[i])));
+                isSortColumn, isUseInvertedIndex[i] & isSortColumn)));
       }
     }
     for (int k = 0; k < complexColCount; k++) {
@@ -747,7 +749,42 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
     }
     byte[] composedNonDictStartKey = null;
     byte[] composedNonDictEndKey = null;
-    if (noDictionaryStartKey != null) {
+
+    int numberOfDictSortColumns = 
segmentProperties.getNumberOfDictSortColumns();
+    // generate the start/end keys from the sort_columns only
+    if (numberOfDictSortColumns > 0) {
+      // if sort_columns contain dictionary columns
+      int[] keySize = columnarSplitter.getBlockKeySize();
+      if (keySize.length > numberOfDictSortColumns) {
+        int newMdkLength = 0;
+        for (int index = 0; index < numberOfDictSortColumns; index++) {
+          newMdkLength += keySize[index];
+        }
+        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+        System.arraycopy(startkeyLocal, 0, newStartKeyOfSortKey, 0, 
newMdkLength);
+        System.arraycopy(endKeyLocal, 0, newEndKeyOfSortKey, 0, newMdkLength);
+        startkeyLocal = newStartKeyOfSortKey;
+        endKeyLocal = newEndKeyOfSortKey;
+      }
+    } else {
+      startkeyLocal = new byte[0];
+      endKeyLocal = new byte[0];
+    }
+
+    int numberOfNoDictSortColumns = 
segmentProperties.getNumberOfNoDictSortColumns();
+    if (numberOfNoDictSortColumns > 0) {
+      // if sort_columns contain no-dictionary columns
+      if (noDictionaryStartKey.length > numberOfNoDictSortColumns) {
+        byte[][] newNoDictionaryStartKey = new 
byte[numberOfNoDictSortColumns][];
+        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+        System.arraycopy(noDictionaryStartKey, 0, newNoDictionaryStartKey, 0,
+            numberOfNoDictSortColumns);
+        System
+            .arraycopy(noDictionaryEndKey, 0, newNoDictionaryEndKey, 0, 
numberOfNoDictSortColumns);
+        noDictionaryStartKey = newNoDictionaryStartKey;
+        noDictionaryEndKey = newNoDictionaryEndKey;
+      }
       composedNonDictStartKey =
           
NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey);
       composedNonDictEndKey =
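
The start/end key handling above roughly boils down to truncating both keys to the prefix covered by the sort columns: the dictionary MDK is cut after the key bytes of the first getNumberOfDictSortColumns() columns (or emptied when there are none), and the no-dictionary byte[][] keys are cut to the first getNumberOfNoDictSortColumns() entries. A hedged sketch of the two truncations with the sizes passed in explicitly (the helper class and method names are illustrative, not the handler's API):

import java.util.Arrays;

final class SortColumnKeyTruncationSketch {
  // blockKeySize[i] is the byte length of dictionary column i inside the MDK.
  static byte[] truncateDictKey(byte[] mdkKey, int[] blockKeySize, int numberOfDictSortColumns) {
    if (numberOfDictSortColumns <= 0) {
      return new byte[0];
    }
    if (blockKeySize.length <= numberOfDictSortColumns) {
      return mdkKey;                       // every dictionary column is a sort column
    }
    int newLength = 0;
    for (int i = 0; i < numberOfDictSortColumns; i++) {
      newLength += blockKeySize[i];
    }
    return Arrays.copyOf(mdkKey, newLength);
  }

  // Keep only the no-dictionary key parts that belong to sort columns.
  static byte[][] truncateNoDictKey(byte[][] noDictKey, int numberOfNoDictSortColumns) {
    if (numberOfNoDictSortColumns <= 0) {
      return new byte[0][];
    }
    if (noDictKey.length <= numberOfNoDictSortColumns) {
      return noDictKey;
    }
    return Arrays.copyOf(noDictKey, numberOfNoDictSortColumns);
  }
}

This keeps the stored start/end keys consistent with the order in which the data was actually sorted.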

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 68f9bd5..f8454f1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -101,9 +101,11 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
    */
   private boolean[] isNoDictionaryColumn;
 
+  private boolean[] isNoDictionarySortColumn;
+
   public SingleThreadFinalSortFilesMerger(String tempFileLocation, String 
tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int 
noDictionaryCount,
-      char[] aggType, boolean[] isNoDictionaryColumn) {
+      char[] aggType, boolean[] isNoDictionaryColumn, boolean[] 
isNoDictionarySortColumn) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.dimensionCount = dimensionCount;
@@ -112,6 +114,7 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
     this.aggType = aggType;
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
   }
 
   /**
@@ -180,7 +183,8 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
           // create chunk holder
           SortTempFileChunkHolder sortTempFileChunkHolder =
               new SortTempFileChunkHolder(tempFile, dimensionCount, 
complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, aggType, 
isNoDictionaryColumn);
+                  measureCount, fileBufferSize, noDictionaryCount, aggType, 
isNoDictionaryColumn,
+                  isNoDictionarySortColumn);
 
           // initialize
           sortTempFileChunkHolder.initialize();
