[CARBONDATA-2606] Fix complex array pushdown and block auto-merge compaction

1. Check whether a complex column contains an ArrayType at any nesting level
and, if it does, add the parent column to the projection.
2. Block auto-merge compaction for tables containing complex datatype columns
(see the sketch after this list).
3. Fix decimal datatype scale and precision for two-level struct types.
4. Fix DICTIONARY_INCLUDE for complex datatypes: previously, insertion failed
when any complex column other than the first was specified in
DICTIONARY_INCLUDE.
5. Fix bad-record handling and the date format for the complex primitive
type DATE.
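
A minimal sketch of the blocked auto-merge behavior (Scala, assuming a Spark
session `spark` with CarbonData configured; the table name is illustrative):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Enable auto load merge; loads into a table with complex columns
    // should now skip compaction instead of merging segments.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")

    spark.sql(
      "create table t (roll int, person struct<age:string,height:double>) " +
      "stored by 'carbondata'")
    // After several loads, "show segments for table t" should contain no
    // "Compacted" entries, since compaction is blocked for complex types.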

This closes #2535


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/beaf20b5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/beaf20b5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/beaf20b5

Branch: refs/heads/branch-1.4
Commit: beaf20b5bf101bd9cf86badeaa9bac94ab062650
Parents: 0076957
Author: Indhumathi27 <indhumathi...@gmail.com>
Authored: Sat Jul 21 16:16:21 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../chunk/store/ColumnPageWrapper.java          |   2 +
 .../complexType/TestComplexDataType.scala       | 114 +++++++++++++++++++
 .../apache/spark/util/SparkTypeConverter.scala  |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  20 ++--
 .../sql/CarbonDatasourceHadoopRelation.scala    |   2 +-
 .../management/CarbonLoadDataCommand.scala      |  24 ++--
 .../processing/datatypes/ArrayDataType.java     |   6 +-
 .../processing/datatypes/PrimitiveDataType.java |  30 ++++-
 .../processing/datatypes/StructDataType.java    |  14 ++-
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +-
 .../store/CarbonFactDataHandlerModel.java       |   4 +-
 11 files changed, 199 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index f95ff11..180b3a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -125,6 +125,8 @@ public class ColumnPageWrapper implements DimensionColumnPage {
         return out;
       } else if (srcDataType == DataTypes.BYTE_ARRAY) {
         return columnPage.getBytes(rowId);
+      }  else if (srcDataType == DataTypes.DOUBLE) {
+        return ByteUtil.toBytes(columnPage.getDouble(rowId));
       } else {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
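
The new DOUBLE branch materializes the page value as a byte array. A rough
sketch of the kind of conversion ByteUtil.toBytes performs (a plain
big-endian IEEE-754 encoding; the actual CarbonData utility may additionally
adjust bits for sort order, so treat this as an approximation):

    import java.nio.ByteBuffer

    // Encode a double as a fixed-width, 8-byte big-endian array.
    def doubleToBytes(v: Double): Array[Byte] =
      ByteBuffer.allocate(java.lang.Double.BYTES).putDouble(v).array()

    // e.g. doubleToBytes(2.9e9) yields the 8 bytes backing 2.9E9.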

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index 6470648..e2884d8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -19,6 +19,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 
 class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
 
+  val badRecordAction = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)
+
   override def beforeAll(): Unit = {
     sql("DROP TABLE IF EXISTS table1")
     sql("DROP TABLE IF EXISTS test")
@@ -27,6 +30,13 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
   override def afterAll(): Unit = {
     sql("DROP TABLE IF EXISTS table1")
     sql("DROP TABLE IF EXISTS test")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction)
   }
 
   test("test Projection PushDown for Struct - Integer type") {
@@ -883,6 +893,110 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "create table table1 (person struct<height:double>) stored by 'carbondata'")
     sql("insert into table1 values('1000000000')")
     checkExistence(sql("select * from table1"),true,"1.0E9")
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (person struct<height:double>) stored by 
'carbondata'")
+    sql("insert into table1 values('12345678912')")
+    checkExistence(sql("select * from table1"),true,"1.2345678912E10")
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (person struct<b:array<double>>) stored by 
'carbondata'")
+    sql("insert into table1 values('10000000:2000000000:2900000000')")
+    checkExistence(sql("select * from table1"),true,"2.9E9")
   }
 
+  test("test block compaction - auto merge") {
+    sql("DROP TABLE IF EXISTS table1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(
+      "create table table1 (roll int,person 
Struct<detail:int,age:string,height:double>) stored " +
+      "by 'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    checkExistence(sql("show segments for table table1"),false, "Compacted")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+  test("decimal with two level struct type") {
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      "create table test(id int,a struct<c:struct<d:decimal(20,10)>>) stored 
by 'carbondata' " +
+      "tblproperties('dictionary_include'='a')")
+    checkExistence(sql("desc test"),true,"struct<c:struct<d:decimal(20,10)>>")
+    checkExistence(sql("describe formatted 
test"),true,"struct<c:struct<d:decimal(20,10)>>")
+    sql("insert into test values(1,'3999.999')")
+    checkExistence(sql("select * from test"),true,"3999.9990000000")
+  }
+
+  test("test dictionary include for second struct and array column") {
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      "create table test(id int,a struct<b:int,c:int>, d struct<e:int,f:int>, 
d1 struct<e1:int," +
+      "f1:int>) stored by 'carbondata' 
tblproperties('dictionary_include'='d1')")
+    sql("insert into test values(1,'2$3','4$5','6$7')")
+    checkAnswer(sql("select * from 
test"),Seq(Row(1,Row(2,3),Row(4,5),Row(6,7))))
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      "create table test(a array<int>, b array<int>) stored by 'carbondata' 
tblproperties" +
+      "('dictionary_include'='b')")
+    sql("insert into test values(1,2) ")
+    checkAnswer(sql("select b[0] from test"),Seq(Row(2)))
+  }
+
+  test("date with struct and array") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(a struct<b:date>) stored by 'carbondata'")
+    val exception1 = intercept[Exception] {
+      sql("insert into test select 'a' ")
+    }
+    assert(exception1.getMessage
+      .contains(
+        "Data load failed due to bad record: The value with column name a.b 
and column data type " +
+        "DATE is not a valid DATE type.Please enable bad record logger to know 
the detail reason."))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(a array<date>) stored by 'carbondata'")
+    val exception2 = intercept[Exception] {
+      sql("insert into test select 'a' ")
+    }
+    assert(exception2.getMessage
+      .contains(
+        "Data load failed due to bad record: The value with column name a.val 
and column data type " +
+        "DATE is not a valid DATE type.Please enable bad record logger to know 
the detail reason."))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        "MM-dd-yyyy")
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(a struct<d1:date,d2:date>) stored by 'carbondata'")
+    sql("insert into test values ('02-18-2012$12-9-2016')")
+    checkAnswer(sql("select * from test "), 
Row(Row(java.sql.Date.valueOf("2012-02-18"),java.sql.Date.valueOf("2016-12-09"))))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
index 8a9277c..f806cda 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -128,7 +128,10 @@ private[spark] object SparkTypeConverter {
       case "struct" => s"${
         childDim.getColName.substring(dimName.length + 1)
       }:struct<${ getStructChildren(table, childDim.getColName) }>"
-      case dType => s"${ childDim.getColName.substring(dimName.length + 1) 
}:${ dType }"
+      case dType => s"${
+        childDim.getColName
+          .substring(dimName.length + 1)
+      }:${ addDecimalScaleAndPrecision(childDim, dType) }"
     }
   }
 }
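
The struct-children path now appends scale and precision for decimal
children. A sketch of the intended effect, using a simplified stand-in for
addDecimalScaleAndPrecision (the real helper in SparkTypeConverter takes the
child dimension itself; this signature is illustrative only):

    // Hypothetical stand-in: render decimal children with their
    // (precision,scale); other primitive type names pass through.
    def addDecimalScaleAndPrecision(dType: String, precision: Int, scale: Int): String =
      if (dType.equalsIgnoreCase("decimal")) s"decimal($precision,$scale)"
      else dType

    // A child d of type decimal(20,10) now renders as "d:decimal(20,10)"
    // rather than the lossy "d:decimal".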

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b723968..6e322b3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -583,13 +583,19 @@ object CarbonDataRDDFactory {
         if (carbonTable.isHivePartitionTable) {
           carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
         }
-        val compactedSegments = new util.ArrayList[String]()
-        handleSegmentMerging(sqlContext,
-          carbonLoadModel,
-          carbonTable,
-          compactedSegments,
-          operationContext)
-        carbonLoadModel.setMergedSegmentIds(compactedSegments)
+        // Block compaction for table containing complex datatype
+        if (carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+          .exists(m => m.getDataType.isComplexType)) {
+          LOGGER.warn("Compaction is skipped as table contains complex 
columns")
+        } else {
+          val compactedSegments = new util.ArrayList[String]()
+          handleSegmentMerging(sqlContext,
+            carbonLoadModel,
+            carbonTable,
+            compactedSegments,
+            operationContext)
+          carbonLoadModel.setMergedSegmentIds(compactedSegments)
+        }
       } catch {
         case e: Exception =>
           throw new Exception(
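
The guard above reduces to one predicate over the fact table's columns. A
minimal standalone sketch (same CarbonData calls as in the patch; the helper
name is illustrative):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // True when any fact-table column is a complex type (array/struct),
    // in which case auto-merge compaction is skipped after the load.
    def hasComplexColumn(table: CarbonTable): Boolean =
      table.getTableInfo.getFactTable.getListOfColumns.asScala
        .exists(_.getDataType.isComplexType)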

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 8b6567f..3ce8c8c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -97,7 +97,7 @@ case class CarbonDatasourceHadoopRelation(
           breakable({
             while (ifGetArrayItemExists.containsChild != null) {
               if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
-                arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+                arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
                 break
               }
               if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3bf9595..d50aea1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -823,15 +823,21 @@ case class CarbonLoadDataCommand(
     }
     try {
       carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-      val compactedSegments = new util.ArrayList[String]()
-      // Trigger auto compaction
-      CarbonDataRDDFactory.handleSegmentMerging(
-        sparkSession.sqlContext,
-        carbonLoadModel,
-        table,
-        compactedSegments,
-        operationContext)
-      carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      // Block compaction for table containing complex datatype
+      if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+        .exists(m => m.getDataType.isComplexType)) {
+        LOGGER.warn("Compaction is skipped as table contains complex columns")
+      } else {
+        val compactedSegments = new util.ArrayList[String]()
+        // Trigger auto compaction
+        CarbonDataRDDFactory.handleSegmentMerging(
+          sparkSession.sqlContext,
+          carbonLoadModel,
+          table,
+          compactedSegments,
+          operationContext)
+        carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      }
     } catch {
       case e: Exception =>
         throw new Exception(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 05754eb..60972e8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -172,8 +172,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
 
   @Override
   public void fillCardinality(List<Integer> dimCardWithComplex) {
-    dimCardWithComplex.add(0);
-    children.fillCardinality(dimCardWithComplex);
+    if (children.getIsColumnDictionary()) {
+      dimCardWithComplex.add(0);
+      children.fillCardinality(dimCardWithComplex);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index c738bac..a6d8e9c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -21,6 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -173,7 +175,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
           || carbonColumn.getDataType() == DataTypes.DATE) {
         dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(carbonDimension.getDataType()));
+            .getDirectDictionaryGenerator(carbonDimension.getDataType(),
+                getDateFormat(carbonDimension)));
         isDirectDictionary = true;
       } else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
         CacheProvider cacheProvider = CacheProvider.getInstance();
@@ -204,6 +207,25 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     }
   }
 
+  /**
+   * get dateformat
+   * @param carbonDimension
+   * @return
+   */
+  private String getDateFormat(CarbonDimension carbonDimension) {
+    String format;
+    String dateFormat = null;
+    if (this.carbonDimension.getDataType() == DataTypes.DATE) {
+      dateFormat = carbonDimension.getDateFormat();
+    }
+    if (dateFormat != null && !dateFormat.trim().isEmpty()) {
+      format = dateFormat;
+    } else {
+      format = CarbonUtil.getFormatFromProperty(dataType);
+    }
+    return format;
+  }
+
   private boolean isDictionaryDimension(CarbonDimension carbonDimension) {
     if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
       return true;
@@ -326,6 +348,10 @@ public class PrimitiveDataType implements GenericDataType<Object> {
             byte[] value = null;
             if (isDirectDictionary) {
               int surrogateKey;
+              if (!(input instanceof Long)) {
+                SimpleDateFormat parser = new SimpleDateFormat(getDateFormat(carbonDimension));
+                parser.parse(parsedValue);
+              }
               // If the input is a long value then this means that logical type was provided by
               // the user using AvroCarbonWriter. In this case directly generate surrogate key
               // using dictionaryGenerator.
@@ -389,6 +415,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
           updateNullValue(dataOutputStream, logHolder);
         } catch (CarbonDataLoadingException e) {
           throw e;
+        } catch (ParseException ex) {
+          updateNullValue(dataOutputStream, logHolder);
         } catch (Throwable ex) {
           // TODO have to implemented the Bad Records LogHolder.
           // Same like NonDictionaryFieldConverterImpl.
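
Non-long inputs are now pre-parsed with the resolved date format, so an
unparseable DATE value raises ParseException and flows into updateNullValue
as a bad record instead of aborting the load. A small sketch of that
parse-or-fallback flow (helper name illustrative; default format assumed to
be yyyy-MM-dd):

    import java.text.{ParseException, SimpleDateFormat}

    // Validate a DATE string against the configured format; a parse
    // failure becomes a bad record (None) rather than a load failure.
    def parseDateOrBadRecord(value: String,
        format: String = "yyyy-MM-dd"): Option[java.util.Date] =
      try Some(new SimpleDateFormat(format).parse(value))
      catch { case _: ParseException => None }

    // parseDateOrBadRecord("2012-02-18") => Some(2012-02-18)
    // parseDateOrBadRecord("a")          => None (bad record)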

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 29acf95..af95de6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -178,9 +178,17 @@ public class StructDataType implements GenericDataType<StructObject> {
 
   @Override
   public void fillCardinality(List<Integer> dimCardWithComplex) {
-    dimCardWithComplex.add(0);
-    for (int i = 0; i < children.size(); i++) {
-      children.get(i).fillCardinality(dimCardWithComplex);
+    boolean isDictionaryColumn = false;
+    for (GenericDataType child : children) {
+      if (child.getIsColumnDictionary()) {
+        isDictionaryColumn = true;
+      }
+    }
+    if (isDictionaryColumn) {
+      dimCardWithComplex.add(0);
+      for (int i = 0; i < children.size(); i++) {
+        children.get(i).fillCardinality(dimCardWithComplex);
+      }
     }
   }
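
For reference, the Java loop above just asks "does any child use a
dictionary". The same predicate written functionally (a sketch, not part of
the patch; assumes the GenericDataType interface from this package):

    import org.apache.carbondata.processing.datatypes.GenericDataType

    // A struct contributes to the cardinality list only when at least
    // one of its children is a dictionary column.
    def anyDictionaryChild(children: Seq[GenericDataType[_]]): Boolean =
      children.exists(_.getIsColumnDictionary)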
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 94511cf..7151b47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -165,7 +165,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
         complexDataType.getAllPrimitiveChildren(primitiveTypes);
         for (GenericDataType eachPrimitive : primitiveTypes) {
-          eachPrimitive.setSurrogateIndex(surrIndex++);
+          if (eachPrimitive.getIsColumnDictionary()) {
+            eachPrimitive.setSurrogateIndex(surrIndex++);
+          }
         }
       } else {
         surrIndex++;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/beaf20b5/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 767b409..7201305 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -248,7 +248,9 @@ public class CarbonFactDataHandlerModel {
       List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
       complexDataType.getValue().getAllPrimitiveChildren(primitiveTypes);
       for (GenericDataType eachPrimitive : primitiveTypes) {
-        eachPrimitive.setSurrogateIndex(surrIndex++);
+        if (eachPrimitive.getIsColumnDictionary()) {
+          eachPrimitive.setSurrogateIndex(surrIndex++);
+        }
       }
     }
 
