Support adding local dictionary configuration in create table statement and 
show the configs in describe formatted table

What changes were proposed in this pull request?
In this PR, in order to support local dictionary:

- create table changes are made to support local dictionary configurations as 
  table properties
- local dictionary properties are shown in the describe formatted command, based 
  on whether local dictionary is enabled or disabled.
Highlights:
basically we will have four properties

LOCAL_DICTIONARY_ENABLE => whether to enable or disable local dictionary
LOCAL_DICTIONARY_THRESHOLD => threshold property for the column to generate 
local dictionary
LOCAL_DICTIONARY_INCLUDE => columns for which local dictionary needs to be generated
LOCAL_DICTIONARY_EXCLUDE => columns for which local dictionary should not be generated

This closes #2375


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/be20fefb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/be20fefb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/be20fefb

Branch: refs/heads/carbonstore
Commit: be20fefbe0a01a501a567683ecc872e08f3fc001
Parents: ca466d9
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Wed Jun 6 20:33:39 2018 +0530
Committer: kumarvishal09 <kumarvishal1...@gmail.com>
Committed: Tue Jun 19 21:08:27 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  31 +++++
 .../ThriftWrapperSchemaConverterImpl.java       |   5 +
 .../core/metadata/schema/table/CarbonTable.java |  68 +++++++++
 .../schema/table/column/ColumnSchema.java       |  19 +++
 .../apache/carbondata/core/util/CarbonUtil.java | 108 ++++++++++++++
 .../describeTable/TestDescribeTable.scala       |   4 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 139 ++++++++++++++++++-
 .../command/carbonTableSchemaCommon.scala       |  13 +-
 .../table/CarbonDescribeFormattedCommand.scala  |  38 ++++-
 10 files changed, 417 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 355bcb6..5f06d08 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -910,6 +910,37 @@ public final class CarbonCommonConstants {
   public static final String COLUMN_GROUPS = "column_groups";
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
+
+  /**
+   * Table property to enable or disable local dictionary generation
+   */
+  public static final String LOCAL_DICTIONARY_ENABLE = 
"local_dictionary_enable";
+
+  /**
+   * default value for local dictionary generation
+   */
+  public static final String LOCAL_DICTIONARY_ENABLE_DEFAULT = "true";
+
+  /**
+   * Threshold value for local dictionary
+   */
+  public static final String LOCAL_DICTIONARY_THRESHOLD = 
"local_dictionary_threshold";
+
+  /**
+   * default value for local dictionary
+   */
+  public static final String LOCAL_DICTIONARY_THRESHOLD_DEFAULT = "1000";
+
+  /**
+   * Table property to specify the columns for which local dictionary needs to 
be generated.
+   */
+  public static final String LOCAL_DICTIONARY_INCLUDE = 
"local_dictionary_include";
+
+  /**
+   * Table property to specify the columns for which local dictionary should 
not be to be generated.
+   */
+  public static final String LOCAL_DICTIONARY_EXCLUDE = 
"local_dictionary_exclude";
+
   /**
    * key for dictionary path
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index f03b997..12f5fc3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -37,6 +37,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import 
org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Thrift schema to carbon schema converter and vice versa
@@ -594,6 +595,10 @@ public class ThriftWrapperSchemaConverterImpl implements 
SchemaConverter {
         .getTable_columns()) {
       
listOfColumns.add(fromExternalToWrapperColumnSchema(externalColumnSchema));
     }
+    if (null != externalTableSchema.tableProperties) {
+      CarbonUtil
+          .setLocalDictColumnsToWrapperSchema(listOfColumns, 
externalTableSchema.tableProperties);
+    }
     wrapperTableSchema.setListOfColumns(listOfColumns);
     wrapperTableSchema.setSchemaEvolution(
         
fromExternalToWrapperSchemaEvolution(externalTableSchema.getSchema_evolution()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f48ada0..b7bef28 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -151,6 +151,16 @@ public class CarbonTable implements Serializable {
   private boolean hasDataMapSchema;
 
   /**
+   * is local dictionary generation enabled for the table
+   */
+  private boolean isLocalDictionaryEnabled;
+
+  /**
+   * local dictionary generation threshold
+   */
+  private int localDictionaryThreshold;
+
+  /**
    * The boolean field which points if the data written for Non Transactional 
Table
    * or Transactional Table.
    * transactional table means carbon will provide transactional support when 
user doing data
@@ -468,6 +478,37 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * is local dictionary enabled for the table
+   * @return
+   */
+  public boolean isLocalDictionaryEnabled() {
+    return isLocalDictionaryEnabled;
+  }
+
+  /**
+   * set whether local dictionary enabled or not
+   * @param localDictionaryEnabled
+   */
+  public void setLocalDictionaryEnabled(boolean localDictionaryEnabled) {
+    isLocalDictionaryEnabled = localDictionaryEnabled;
+  }
+
+  /**
+   * @return local dictionary generation threshold
+   */
+  public int getLocalDictionaryThreshold() {
+    return localDictionaryThreshold;
+  }
+
+  /**
+   * set the local dictionary generation threshold
+   * @param localDictionaryThreshold
+   */
+  public void setLocalDictionaryThreshold(int localDictionaryThreshold) {
+    this.localDictionaryThreshold = localDictionaryThreshold;
+  }
+
+  /**
    * build table unique name
    * all should call this method to build table unique name
    * @param databaseName
@@ -1045,5 +1086,32 @@ public class CarbonTable implements Serializable {
     }
     table.hasDataMapSchema =
         null != tableInfo.getDataMapSchemaList() && 
tableInfo.getDataMapSchemaList().size() > 0;
+    setLocalDictInfo(table, tableInfo);
+  }
+
+  /**
+   * This method sets whether the local dictionary is enabled or not, and the 
local dictionary
+   * threshold, if not defined default value are considered.
+   * @param table
+   * @param tableInfo
+   */
+  private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) 
{
+    String isLocalDictionaryEnabled = 
tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
+    String localDictionaryThreshold = 
tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD);
+    if (null != isLocalDictionaryEnabled) {
+      
table.setLocalDictionaryEnabled(Boolean.parseBoolean(isLocalDictionaryEnabled));
+      if (null != localDictionaryThreshold) {
+        
table.setLocalDictionaryThreshold(Integer.parseInt(localDictionaryThreshold));
+      } else {
+        table.setLocalDictionaryThreshold(
+            
Integer.parseInt(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT));
+      }
+    } else {
+      // in case of old tables, local dictionary enable property will not be 
present in
+      // tableProperties, so disable the local dictionary generation
+      table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index fb4d8e3..786e873 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -138,6 +138,25 @@ public class ColumnSchema implements Serializable, 
Writable {
   private String timeSeriesFunction = "";
 
   /**
+   * set whether the column is local dictionary column or not.
+   */
+  private boolean isLocalDictColumn = false;
+
+  /**
+   * @return isLocalDictColumn
+   */
+  public boolean isLocalDictColumn() {
+    return isLocalDictColumn;
+  }
+
+  /**
+   * @param localDictColumn whether column is local dictionary column
+   */
+  public void setLocalDictColumn(boolean localDictColumn) {
+    isLocalDictColumn = localDictColumn;
+  }
+
+  /**
    * @return the columnName
    */
   public String getColumnName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 836b193..2f34163 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3004,5 +3004,113 @@ public final class CarbonUtil {
     }
     return blockId;
   }
+
+  /**
+   * sets the local dictionary columns to wrapper schema, if the table property
+   * local_dictionary_include is defined, then those columns will be set as 
local dictionary
+   * columns, if not, all the no dictionary string datatype columns are set as 
local dictionary
+   * columns.
+   * Handling for complexTypes::
+   *    Since the column structure will be flat
+   *    if the parent column is configured as local Dictionary column, then it 
gets the child column
+   *    count and then sets the primitive child column as local dictionary 
column if it is string
+   *    datatype column
+   * Handling for both localDictionary Include and exclude columns:
+   * There will be basically four scenarios which are
+   * -------------------------------------------------------
+   * | Local_Dictionary_include | Local_Dictionary_Exclude |
+   * -------------------------------------------------------
+   * |   Not Defined            |     Not Defined          |
+   * |   Not Defined            |      Defined             |
+   * |   Defined                |     Not Defined          |
+   * |   Defined                |      Defined             |
+   * -------------------------------------------------------
+   * 1. when the both local dictionary include and exclude is not defined, 
then set all the no
+   * dictionary string datatype columns as local dictionary generate columns
+   * 2. set all the no dictionary string datatype columns as local dictionary 
columns except the
+   * columns present in local dictionary exclude
+   * 3. & 4. when local dictionary include is defined, no need to check 
dictionary exclude columns
+   * configured or not, we just need to set only the columns present in local 
dictionary include as
+   * local dictionary columns
+   *
+   * @param columns
+   * @param mainTableProperties
+   */
+  public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> 
columns,
+      Map<String, String> mainTableProperties) {
+    String isLocalDictEnabledForMainTable =
+        mainTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
+    String localDictIncludeColumnsOfMainTable =
+        
mainTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE);
+    String localDictExcludeColumnsOfMainTable =
+        
mainTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE);
+    String[] listOfDictionaryIncludeColumns = null;
+    String[] listOfDictionaryExcludeColumns = null;
+    if (null != isLocalDictEnabledForMainTable && Boolean
+        .parseBoolean(isLocalDictEnabledForMainTable)) {
+      int childColumnCount = 0;
+      for (ColumnSchema column : columns) {
+        // for complex type columns, user gives the parent column as local 
dictionary column and
+        // only the string primitive type child column will be set as local 
dictionary column in the
+        // schema
+        if (childColumnCount > 0) {
+          if (column.getDataType().equals(DataTypes.STRING)) {
+            column.setLocalDictColumn(true);
+            childColumnCount -= 1;
+          } else {
+            childColumnCount -= 1;
+          }
+        }
+        // if complex column is defined in local dictionary include column, 
then get the child
+        // columns and set the string datatype child type as local dictionary 
column
+        if (column.getNumberOfChild() > 0 && null != 
localDictIncludeColumnsOfMainTable) {
+          listOfDictionaryIncludeColumns = 
localDictIncludeColumnsOfMainTable.split(",");
+          for (String dictColumn : listOfDictionaryIncludeColumns) {
+            if (dictColumn.trim().equalsIgnoreCase(column.getColumnName())) {
+              childColumnCount = column.getNumberOfChild();
+            }
+          }
+        }
+        if (null == localDictIncludeColumnsOfMainTable) {
+          // if local dictionary exclude columns is not defined, then set all 
the no dictionary
+          // string datatype column
+          if (null == localDictExcludeColumnsOfMainTable) {
+            // column should be no dictionary string datatype column
+            if (column.isDimensionColumn() && 
column.getDataType().equals(DataTypes.STRING)
+                && !column.hasEncoding(Encoding.DICTIONARY)) {
+              column.setLocalDictColumn(true);
+            }
+            // if local dictionary exclude columns is defined, then set for 
all no dictionary string
+            // datatype columns except excluded columns
+          } else {
+            if (column.isDimensionColumn() && 
column.getDataType().equals(DataTypes.STRING)
+                && !column.hasEncoding(Encoding.DICTIONARY)) {
+              listOfDictionaryExcludeColumns = 
localDictExcludeColumnsOfMainTable.split(",");
+              for (String excludeDictColumn : listOfDictionaryExcludeColumns) {
+                if 
(!excludeDictColumn.trim().equalsIgnoreCase(column.getColumnName())) {
+                  column.setLocalDictColumn(true);
+                }
+              }
+            }
+          }
+        } else {
+          // if local dict columns alre not configured, set for all no 
dictionary string datatype
+          // column
+          if (column.isDimensionColumn() && 
column.getDataType().equals(DataTypes.STRING) && !column
+              .hasEncoding(Encoding.DICTIONARY) && 
localDictIncludeColumnsOfMainTable.toLowerCase()
+              .contains(column.getColumnName().toLowerCase())) {
+            if (null == listOfDictionaryIncludeColumns) {
+              listOfDictionaryIncludeColumns = 
localDictIncludeColumnsOfMainTable.split(",");
+            }
+            for (String dictColumn : listOfDictionaryIncludeColumns) {
+              if (dictColumn.trim().equalsIgnoreCase(column.getColumnName())) {
+                column.setLocalDictColumn(true);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
index 1e333ee..5598457 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
@@ -51,10 +51,10 @@ class TestDescribeTable extends QueryTest with 
BeforeAndAfterAll {
   test("test describe formatted table desc1") {
 
     val resultCol = Seq("", "", "##Detailed Column property", "##Detailed 
Table Information", "ADAPTIVE", "CARBON Store Path", "Comment", "Database 
Name", "Last Update Time",
-    "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Table Data 
Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", 
"dec2col4")
+    "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Local 
Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table 
Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
     val resultRow: Seq[Row] = resultCol map(propName => Row(f"$propName%-36s"))
     checkAnswer(sql("desc formatted DESC1").select("col_name"), resultRow)
-    assert(sql("desc formatted desc1").count() == 20)
+    assert(sql("desc formatted desc1").count() == 22)
   }
 
   test("test describe formatted for partition table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index b43ae3f..1ccbf6a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -44,7 +44,7 @@ import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.metadata.ColumnIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType => 
CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema, DataMapSchemaStorageProvider}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
ColumnSchema}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.processing.exception.DataLoadingException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 7d28790..65ff76d 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map}
 import scala.language.implicitConversions
+import scala.util.Try
 import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.ql.lib.Node
@@ -43,7 +44,7 @@ import 
org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, 
DataTypeConverterUtil}
 
 /**
  * TODO remove the duplicate code and add the common methods to common class.
@@ -292,6 +293,83 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
         s"${CarbonCommonConstants.COLUMN_GROUPS} is deprecated")
     }
 
+    // validate the local dictionary property if defined
+    if 
(tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined) {
+      
Try(tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).toBoolean) 
match {
+        case scala.util.Success(value) =>
+        case scala.util.Failure(ex) =>
+          tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+      }
+    } else {
+      // if LOCAL_DICTIONARY_ENABLE is not defined, consider the default value 
which is true
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+        CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+    }
+
+    // validate the local dictionary threshold property if defined
+    if 
(tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD).isDefined)
 {
+      // if any invalid value is configured for LOCAL_DICTIONARY_THRESHOLD, 
then default value
+      // will be
+      // considered which is 1000
+      
Try(tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD).toInt) 
match {
+        case scala.util.Success(value) =>
+          if (value <= 0) {
+            
tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+              CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+          }
+        case scala.util.Failure(ex) =>
+          LOGGER
+            .debug(
+              "invalid value is configured for local_dictionary_threshold, 
considering the defaut" +
+              " " +
+              "value")
+          tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+            CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+      }
+    }
+
+    // validate the local dictionary columns defined, this we will validated 
if the local dictionary
+    // is enabled, else it is not validated
+    if 
(!(tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined 
&&
+          tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).trim
+            .equalsIgnoreCase("false"))) {
+      var localDictIncludeColumns: Seq[String] = Seq[String]()
+      var localDictExcludeColumns: Seq[String] = Seq[String]()
+      val isLocalDictIncludeDefined = tableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
+        .isDefined
+      val isLocalDictExcludeDefined = tableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
+        .isDefined
+      if (isLocalDictIncludeDefined) {
+        localDictIncludeColumns =
+          
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).split(",").map(_.trim)
+        // validate all the local dictionary include columns
+        validateLocalDictionaryColumns(fields, tableProperties, 
localDictIncludeColumns)
+      }
+      if (isLocalDictExcludeDefined) {
+        localDictExcludeColumns =
+          
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).split(",").map(_.trim)
+        // validate all the local dictionary exclude columns
+        validateLocalDictionaryColumns(fields, tableProperties, 
localDictExcludeColumns)
+      }
+      // validate if both local dictionary include and exclude contains same 
column
+      if (isLocalDictIncludeDefined && isLocalDictExcludeDefined) {
+        val localDictIncludeCols = 
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
+        val localDictExcludeCols = 
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
+        if (List(localDictIncludeCols, 
localDictExcludeCols).mkString(",").split(",")
+              .distinct.length !=
+            List(localDictIncludeCols, 
localDictExcludeCols).mkString(",").split(",")
+              .length) {
+          val errMsg =
+            "Column ambiguity as duplicate columns present in 
LOCAL_DICTIONARY_INCLUDE and " +
+            "LOCAL_DICTIONARY_INCLUDE.Duplicate columns are not allowed."
+          throw new MalformedCarbonCommandException(errMsg)
+        }
+      }
+    }
+
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, 
tableProperties)
     // get partitionInfo
@@ -322,6 +400,65 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
   }
 
   /**
+   * This method validates the local dictionary configured columns
+   *
+   * @param fields
+   * @param tableProperties
+   */
+  private def validateLocalDictionaryColumns(fields: Seq[Field],
+      tableProperties: Map[String, String], localDictColumns: Seq[String]): 
Unit = {
+    var dictIncludeColumns: Seq[String] = Seq[String]()
+
+    // check if the duplicate columns are specified in table schema
+    if (localDictColumns.distinct.lengthCompare(localDictColumns.size) != 0) {
+      val a = localDictColumns.diff(localDictColumns.distinct).distinct
+      val errMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE contains 
Duplicate Columns " +
+                   a.mkString("(", ",", ")") +
+                   ". Please check create table statement."
+      throw new MalformedCarbonCommandException(errMsg)
+    }
+
+    // check if the column specified exists in table schema
+    localDictColumns.foreach { distCol =>
+      if (!fields.exists(x => x.column.equalsIgnoreCase(distCol.trim))) {
+        val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE 
column: " + distCol.trim +
+                       " does not exist in table. Please check create table 
statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+
+    // check if column is other than string datatype
+    localDictColumns.foreach { dictColm =>
+      if (fields
+        .exists(x => x.column.equalsIgnoreCase(dictColm) &&
+                     !x.dataType.get.equalsIgnoreCase("STRING") &&
+                     !x.dataType.get.equalsIgnoreCase("STRUCT") &&
+                     !x.dataType.get.equalsIgnoreCase("ARRAY"))) {
+        val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE 
column: " +
+                       dictColm.trim +
+                       " is not a String datatype column. 
LOCAL_DICTIONARY_COLUMN should be no " +
+                       "dictionary string datatype column.Please check create 
table statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+    // check if the same column is present in both dictionary include and 
local dictionary columns
+    // configuration
+    if 
(tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludeColumns =
+        
tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
+      localDictColumns.foreach { distCol =>
+        if (dictIncludeColumns.exists(x => x.equalsIgnoreCase(distCol.trim))) {
+          val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE 
column: " +
+                         distCol.trim +
+                         " specified in Dictionary include. Local Dictionary 
will not be " +
+                         "generated for Dictionary include. Please check 
create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        }
+      }
+    }
+  }
+
+  /**
    * Extract the column groups configuration from table properties.
    * Based on this Row groups of fields will be determined.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index aa40a1f..d48db21 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
-import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -40,7 +39,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, 
ParentColumnTableRelation}
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType
@@ -246,7 +245,6 @@ class AlterTableColumnSchemaGenerator(
       allColumns ++= Seq(columnSchema)
       newCols ++= Seq(columnSchema)
     })
-
     allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
     alterTableModel.msrCols.foreach(field => {
       val encoders = new java.util.ArrayList[Encoding]()
@@ -294,7 +292,9 @@ class AlterTableColumnSchemaGenerator(
     alterTableModel.tableProperties.foreach {
       x => val value = tablePropertiesMap.get(x._1)
         if (null != value) {
-          tablePropertiesMap.put(x._1, value + "," + x._2)
+          if (value != x._2) {
+            tablePropertiesMap.put(x._1, value + "," + x._2)
+          }
         } else {
           tablePropertiesMap.put(x._1, x._2)
         }
@@ -550,6 +550,11 @@ class TableNewProcessor(cm: TableModel) {
       }
     }
 
+    // check whether the column is a local dictionary column and set in column schema
+    if (null != cm.tableProperties) {
+      CarbonUtil
+        .setLocalDictColumnsToWrapperSchema(allColumns.asJava, 
cm.tableProperties.asJava)
+    }
     cm.msrCols.foreach { field =>
       // if aggregate function is defined in case of preaggregate and agg 
function is sum or avg
       // then it can be stored as measure

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 617f5e8..c6bd567 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -30,8 +30,7 @@ import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -111,6 +110,41 @@ private[sql] case class CarbonDescribeFormattedCommand(
       .LOAD_SORT_SCOPE_DEFAULT)))
     val isStreaming = tblProps.asScala.getOrElse("streaming", "false")
     results ++= Seq(("Streaming", isStreaming, ""))
+    val isLocalDictEnabled = tblProps.asScala
+      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+          CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+    results ++= Seq(("Local Dictionary Enabled", isLocalDictEnabled, ""))
+    // if local dictionary is enabled, then only show other properties of local dictionary
+    if (isLocalDictEnabled.toBoolean) {
+      val localDictThreshold = tblProps.asScala
+        .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+          CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+      results ++= Seq(("Local Dictionary Threshold", localDictThreshold, ""))
+      if (tblProps.asScala
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).isDefined) {
+        val allLocalDictColumns = 
tblProps.asScala(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
+          .split(",")
+        results ++= Seq(("Local Dictionary Include", 
getDictColumnString(allLocalDictColumns), ""))
+      }
+      if (tblProps.asScala
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
+        val allLocalDictColumns = 
tblProps.asScala(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
+          .split(",")
+        results ++= Seq(("Local Dictionary Exclude", 
getDictColumnString(allLocalDictColumns), ""))
+      }
+    }
+
+    /**
+     * Joins the given local dictionary column names into one comma separated string.
+     * @param localDictColumns column names; each one is appended followed by a ","
+     * @return the concatenated column names with the last "," removed
+     */
+    def getDictColumnString(localDictColumns: Array[String]): String = {
+      val dictColumns: StringBuilder = new StringBuilder
+      localDictColumns.foreach(column => 
dictColumns.append(column).append(","))
+      dictColumns.toString().patch(dictColumns.toString().lastIndexOf(","), 
"", 1)
+    }
+
 
     // show table level compaction options
     if 
(tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) {

Reply via email to