Repository: carbondata
Updated Branches:
  refs/heads/master e7103397d -> a0350e100


[CARBONDATA-2647] [CARBONDATA-2648] Add support for COLUMN_META_CACHE and CACHE_LEVEL in create table and alter table properties

Things done as part of this PR:

- Support for configuring COLUMN_META_CACHE in the create table and alter table set properties DDL.
- Support for configuring CACHE_LEVEL in the create table and alter table set properties DDL.
- Describe formatted display support for COLUMN_META_CACHE and CACHE_LEVEL.
Any interfaces changed?

Create Table Syntax:
CREATE TABLE [dbName].tableName (col1 String, col2 String, col3 int, …) STORED BY 'carbondata' TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET')

Alter Table set properties Syntax:
ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET')
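
For concreteness, a minimal spark-shell sketch of the new DDL in action (the table and column names are illustrative; a Carbon-enabled session bound to `spark` is assumed):

spark.sql("CREATE TABLE cache_demo (id INT, name STRING, city STRING) " +
  "STORED BY 'carbondata' " +
  "TBLPROPERTIES ('COLUMN_META_CACHE'='name,city', 'CACHE_LEVEL'='BLOCKLET')")
// narrow the cached columns; values are case-insensitive and are re-ordered
// to the create-table order before being stored
spark.sql("ALTER TABLE cache_demo SET TBLPROPERTIES ('COLUMN_META_CACHE'='name')")
// both properties show up in describe formatted; CACHE_LEVEL falls back
// to BLOCK when it has never been set
spark.sql("DESCRIBE FORMATTED cache_demo").show(100, false)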

This closes #2418


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a0350e10
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a0350e10
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a0350e10

Branch: refs/heads/master
Commit: a0350e100e262fef85409b3b4fe2a488d688f706
Parents: e710339
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Tue Jun 26 20:11:14 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 28 16:37:43 2018 +0530

----------------------------------------------------------------------
 .../dictionary/ManageDictionaryAndBTree.java    |  18 ++
 .../core/constants/CarbonCommonConstants.java   |  12 +
 .../core/metadata/schema/table/CarbonTable.java |  38 +++
 ...ithColumnMetCacheAndCacheLevelProperty.scala | 168 ++++++++++++
 ...ithColumnMetCacheAndCacheLevelProperty.scala | 155 +++++++++++
 .../describeTable/TestDescribeTable.scala       |   4 +-
 .../carbondata/spark/util/CommonUtil.scala      |  91 ++++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  31 ++-
 .../table/CarbonDescribeFormattedCommand.scala  |   8 +
 .../org/apache/spark/util/AlterTableUtil.scala  | 263 +++++++++++++++++--
 10 files changed, 766 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a7d6027..b54fb14 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -149,4 +149,22 @@ public class ManageDictionaryAndBTree {
     }
   }
 
+  /**
+   * This method will remove the BTree instances from LRU cache for all the segments
+   *
+   * @param carbonTable table whose BTree cache entries are to be invalidated
+   */
+  public static void invalidateBTreeCache(CarbonTable carbonTable) {
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    if (loadMetadataDetails.length > 0) {
+      String[] segments = new String[loadMetadataDetails.length];
+      int loadCounter = 0;
+      for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+        segments[loadCounter++] = loadMetadataDetail.getLoadName();
+      }
+      invalidateBTreeCache(carbonTable.getAbsoluteTableIdentifier(), segments);
+    }
+  }
+
 }
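
For context, a minimal sketch of how this new overload can be driven from the Scala side once a table's caching properties change (the wrapper name refreshCacheAfterPropertyChange is hypothetical; invalidateBTreeCache is the method added above):

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree

// Hypothetical wrapper: resolve the table and drop its cached BTree entries
// for every segment, so the next query reloads them under the new properties.
def refreshCacheAfterPropertyChange(dbName: String, tableName: String)
    (sparkSession: SparkSession): Unit = {
  val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
  ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
}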

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 118ff28..da40862 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1839,6 +1839,18 @@ public final class CarbonCommonConstants {
    *  the node minimum load data default value
    */
   public static final int CARBON_LOAD_MIN_SIZE_DEFAULT = 256;
+  /**
+   * property to be specified for caching min/max of required columns
+   */
+  public static final String COLUMN_META_CACHE = "column_meta_cache";
+  /**
+   * property to be specified for caching level (Block/Blocklet)
+   */
+  public static final String CACHE_LEVEL = "cache_level";
+  /**
+   * default value for cache level
+   */
+  public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
 
   private CarbonCommonConstants() {
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 68bd749..710c7af 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1165,4 +1165,42 @@ public class CarbonTable implements Serializable {
       table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
     }
   }
+
+  /**
+   * Method to get the list of cached columns of the table
+   *
+   * @param tableName table whose cached columns are to be fetched
+   * @return list of column names configured in column_meta_cache, excluding dropped columns
+   */
+  public List<String> getCachedColumns(String tableName) {
+    List<String> cachedColsList = new ArrayList<>(tableDimensionsMap.size());
+    String cacheColumns =
+        tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.COLUMN_META_CACHE);
+    if (null != cacheColumns && !cacheColumns.isEmpty()) {
+      List<CarbonDimension> carbonDimensions = tableDimensionsMap.get(tableName);
+      List<CarbonMeasure> carbonMeasures = tableMeasuresMap.get(tableName);
+      String[] cachedCols = cacheColumns.split(",");
+      for (String column : cachedCols) {
+        boolean found = false;
+        // this will avoid adding the columns which have been dropped from the table
+        for (CarbonDimension dimension : carbonDimensions) {
+          if (dimension.getColName().equals(column)) {
+            cachedColsList.add(column);
+            found = true;
+            break;
+          }
+        }
+        // if column is not a dimension then check in measures
+        if (!found) {
+          for (CarbonMeasure measure : carbonMeasures) {
+            if (measure.getColName().equals(column)) {
+              cachedColsList.add(column);
+              break;
+            }
+          }
+        }
+      }
+    }
+    return cachedColsList;
+  }
 }
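
A small sketch of how a consumer such as describe formatted can read this list back (this mirrors the Scala usage later in this patch; the helper name is illustrative):

import scala.collection.JavaConverters._
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Illustrative helper: renders the cached columns the way describe formatted does.
// Dropped columns are filtered out by getCachedColumns, so stale entries in the
// column_meta_cache property never surface in the output.
def cachedColumnsForDisplay(carbonTable: CarbonTable): String = {
  carbonTable.getCachedColumns(carbonTable.getTableName).asScala.mkString(",")
}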

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
new file mode 100644
index 0000000..dbe9c75
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.alterTable
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * test class for validating alter table set properties with column_meta_cache and
+ * cache_level properties
+ */
+class TestAlterTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+
+  private def isExpectedValueValid(dbName: String,
+      tableName: String,
+      key: String,
+      expectedValue: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
+    val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key)
+    expectedValue.equals(value)
+  }
+
+  private def dropTable = {
+    sql("drop table if exists alter_column_meta_cache")
+    sql("drop table if exists cache_level")
+  }
+
+  override def beforeAll {
+    // drop table
+    dropTable
+    // create table
+    sql("create table alter_column_meta_cache(c1 String, c2 String, c3 int, c4 
double, c5 struct<imei:string, imsi:string>, c6 array<string>) stored by 
'carbondata'")
+    sql("create table cache_level(c1 String) stored by 'carbondata'")
+  }
+
+  test("validate column_meta_cache with only empty spaces - 
alter_column_meta_cache_01") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='    ')")
+    }
+  }
+
+  test("validate the property with characters in different cases - 
alter_column_meta_cache_02") {
+    sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('COLUMN_meta_CachE'='c2,c3')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", 
"column_meta_cache", "c2,c3"))
+  }
+
+  test("validate column_meta_cache with intermediate empty string between 
columns - alter_column_meta_cache_03") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c2,  ,c3')")
+    }
+  }
+
+  test("validate column_meta_cache with combination of valid and invalid 
columns - alter_column_meta_cache_04") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c2,c10')")
+    }
+  }
+
+  test("validate column_meta_cache for dimensions and measures - 
alter_column_meta_cache_05") {
+    sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c3,c2,c4')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", 
"column_meta_cache", "c2,c3,c4"))
+  }
+
+  test("validate for duplicate column names - alter_column_meta_cache_06") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
+    }
+  }
+
+  test("validate column_meta_cache for complex struct type - 
alter_column_meta_cache_07") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c5')")
+    }
+  }
+
+  test("validate column_meta_cache for complex array type - 
alter_column_meta_cache_08") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c5,c2')")
+    }
+  }
+
+  test("validate column_meta_cache with empty value - 
alter_column_meta_cache_09") {
+    sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", 
"column_meta_cache", ""))
+  }
+
+  test("validate describe formatted command to display column_meta_cache when 
column_meta_cache is set - alter_column_meta_cache_10") {
+    sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c2')")
+    val descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, true, "column_meta_cache")
+  }
+
+  test("validate unsetting of column_meta_cache when column_meta_cache is 
already set - alter_column_meta_cache_11") {
+    sql("Alter table alter_column_meta_cache SET 
TBLPROPERTIES('column_meta_cache'='c2,c3')")
+    var descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+    sql("Alter table alter_column_meta_cache UNSET 
TBLPROPERTIES('column_meta_cache')")
+    descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate unsetting of column_meta_cache when column_meta_cache is not 
already set - alter_column_meta_cache_12") {
+    var descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+    sql("Alter table alter_column_meta_cache UNSET 
TBLPROPERTIES('column_meta_cache')")
+    descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate cache_level with only empty spaces - ALTER_CACHE_LEVEL_01") {
+    intercept[RuntimeException] {
+      sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='    ')")
+    }
+  }
+
+  test("validate cache_level with invalid values - ALTER_CACHE_LEVEL_02") {
+    intercept[RuntimeException] {
+      sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='xyz,abc')")
+    }
+  }
+
+  test("validate cache_level with property in different cases - 
ALTER_CACHE_LEVEL_03") {
+    sql("Alter table cache_level SET TBLPROPERTIES('CACHE_leveL'='BLOcK')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", 
"BLOCK"))
+  }
+
+  test("validate cache_level with default value as Blocklet - 
ALTER_CACHE_LEVEL_04") {
+    sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", 
"BLOCKLET"))
+  }
+
+  test("validate describe formatted command to display cache_level when 
cache_level is set - ALTER_CACHE_LEVEL_05") {
+    sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')")
+    val descResult = sql("describe formatted cache_level")
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  test("validate describe formatted command to display cache_level when 
cache_level is not set - ALTER_CACHE_LEVEL_06") {
+    sql("Alter table cache_level UNSET TBLPROPERTIES('cache_level')")
+    val descResult = sql("describe formatted cache_level")
+    // even though not configured default cache level will be displayed as 
BLOCK
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  override def afterAll: Unit = {
+    // drop table
+    dropTable
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
new file mode 100644
index 0000000..8dadef9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * test class for validating create table with column_meta_cache and cache_level properties
+ */
+class TestCreateTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+
+  private def isExpectedValueValid(dbName: String,
+      tableName: String,
+      key: String,
+      expectedValue: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
+    val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key)
+    expectedValue.equals(value)
+  }
+
+  test("validate column_meta_cache with only empty spaces - 
COLUMN_META_CACHE_01") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='    ')")
+    }
+  }
+
+  test("validate the property with characters in different cases - 
COLUMN_META_CACHE_02") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')")
+    assert(isExpectedValueValid("default", "column_meta_cache", 
"column_meta_cache", "c2"))
+  }
+
+  test("validate column_meta_cache with intermediate empty string between 
columns - COLUMN_META_CACHE_03") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,  ,c3')")
+    }
+  }
+
+  test("validate column_meta_cache with combination of valid and invalid 
columns - COLUMN_META_CACHE_04") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c10')")
+    }
+  }
+
+  test("validate column_meta_cache for dimensions and measures - 
COLUMN_META_CACHE_05") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c3,c2,c4')")
+    assert(isExpectedValueValid("default", "column_meta_cache", 
"column_meta_cache", "c2,c3,c4"))
+  }
+
+  test("validate for duplicate column names - COLUMN_META_CACHE_06") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
+    }
+  }
+
+  test("validate column_meta_cache for complex struct type - 
COLUMN_META_CACHE_07") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double, c5 struct<imei:string, imsi:string>) stored by 'carbondata' 
TBLPROPERTIES('column_meta_cache'='c5')")
+    }
+  }
+
+  test("validate column_meta_cache for complex array type - 
COLUMN_META_CACHE_08") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double, c5 array<string>) stored by 'carbondata' 
TBLPROPERTIES('column_meta_cache'='c5,c2')")
+    }
+  }
+
+  test("validate column_meta_cache with empty value - COLUMN_META_CACHE_09") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='')")
+    assert(isExpectedValueValid("default", "column_meta_cache", 
"column_meta_cache", ""))
+  }
+
+  test("validate describe formatted command to display column_meta_cache when 
column_meta_cache is set - COLUMN_META_CACHE_10") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')")
+    val descResult = sql("describe formatted column_meta_cache")
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+  }
+
+  test("validate describe formatted command to display column_meta_cache when 
column_meta_cache is not set - COLUMN_META_CACHE_11") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 
double) stored by 'carbondata'")
+    val descResult = sql("describe formatted column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate cache_level with only empty spaces - CACHE_LEVEL_01") {
+    sql("drop table if exists cache_level")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table cache_level(c1 String) stored by 'carbondata' 
TBLPROPERTIES('cache_level'='    ')")
+    }
+  }
+
+  test("validate cache_level with invalid values - CACHE_LEVEL_02") {
+    sql("drop table if exists cache_level")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table cache_level(c1 String) stored by 'carbondata' 
TBLPROPERTIES('cache_level'='xyz,abc')")
+    }
+  }
+
+  test("validate cache_level with property in different cases - 
CACHE_LEVEL_03") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' 
TBLPROPERTIES('CACHE_leveL'='BLOcK')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", 
"BLOCK"))
+  }
+
+  test("validate cache_level with default value as Blocklet - CACHE_LEVEL_04") 
{
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' 
TBLPROPERTIES('cache_level'='bloCKlet')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", 
"BLOCKLET"))
+  }
+
+  test("validate describe formatted command to display cache_level when 
cache_level is set - CACHE_LEVEL_05") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' 
TBLPROPERTIES('cache_level'='bloCKlet')")
+    val descResult = sql("describe formatted cache_level")
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  test("validate describe formatted command to display cache_level when 
cache_level is not set - CACHE_LEVEL_06") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata'")
+    val descResult = sql("describe formatted cache_level")
+    // even though not configured default cache level will be displayed as 
BLOCK
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
index 5598457..ceb0ac3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
@@ -51,10 +51,10 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
   test("test describe formatted table desc1") {
 
     val resultCol = Seq("", "", "##Detailed Column property", "##Detailed Table Information", "ADAPTIVE", "CARBON Store Path", "Comment", "Database Name", "Last Update Time",
-    "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
+    "SORT_COLUMNS", "SORT_SCOPE", "CACHE_LEVEL", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
     val resultRow: Seq[Row] = resultCol map(propName => Row(f"$propName%-36s"))
     checkAnswer(sql("desc formatted DESC1").select("col_name"), resultRow)
-    assert(sql("desc formatted desc1").count() == 22)
+    assert(sql("desc formatted desc1").count() == 23)
   }
 
   test("test describe formatted for partition table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 3995aa7..4723e6b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -26,11 +26,11 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 import scala.util.Random
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
@@ -896,4 +896,93 @@ object CommonUtil {
     CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
   }
 
+  /**
+   * This method will validate the cache level
+   *
+   * @param cacheLevel cache level value configured by the user
+   * @param tableProperties table properties map into which the validated value is stored
+   */
+  def validateCacheLevel(cacheLevel: String, tableProperties: Map[String, String]): Unit = {
+    val supportedCacheLevel = Seq("BLOCK", "BLOCKLET")
+    if (cacheLevel.trim.isEmpty) {
+      val errorMessage = "Invalid value: Empty column names for the option(s): " +
+                         CarbonCommonConstants.CACHE_LEVEL
+      throw new MalformedCarbonCommandException(errorMessage)
+    } else {
+      val trimmedCacheLevel = cacheLevel.trim.toUpperCase
+      if (!supportedCacheLevel.contains(trimmedCacheLevel)) {
+        val errorMessage = s"Invalid value: Allowed values for ${
+          CarbonCommonConstants.CACHE_LEVEL} are BLOCK AND BLOCKLET"
+        throw new MalformedCarbonCommandException(errorMessage)
+      }
+      tableProperties.put(CarbonCommonConstants.CACHE_LEVEL, trimmedCacheLevel)
+    }
+  }
+
+  /**
+   * This will validate the column meta cache i.e. the columns to be cached.
+   * By default all dimensions will be cached.
+   * If the property is already defined in the create table DDL then validate it,
+   * else add all the dimension columns as columns to be cached to table properties.
+   * Valid values for COLUMN_META_CACHE can either be empty or can have one or more comma
+   * separated values.
+   */
+  def validateColumnMetaCacheFields(dbName: String,
+      tableName: String,
+      tableColumns: Seq[String],
+      cachedColumns: String,
+      tableProperties: Map[String, String]): Unit = {
+    val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    // below check is added because empty value for column_meta_cache is allowed and in that
+    // case there should not be any validation
+    if (!cachedColumns.equals("")) {
+      validateColumnMetaCacheOption(tableIdentifier, dbName, cachedColumns, tableColumns)
+      val columnsToBeCached = cachedColumns.split(",").map(x => x.trim.toLowerCase).toSeq
+      // make the columns in create table order and then add it to table properties
+      val createOrder = tableColumns.filter(col => columnsToBeCached.contains(col))
+      tableProperties.put(CarbonCommonConstants.COLUMN_META_CACHE, createOrder.mkString(","))
+    }
+  }
+
+  /**
+   * Validate the column_meta_cache option in table properties
+   *
+   * @param tableIdentifier identifier of the table being validated
+   * @param databaseName database to which the table belongs
+   * @param columnsToBeCached comma separated list of columns configured by the user
+   * @param tableColumns all the columns present in the table
+   */
+  private def validateColumnMetaCacheOption(tableIdentifier: TableIdentifier,
+      databaseName: String,
+      columnsToBeCached: String,
+      tableColumns: Seq[String]): Unit = {
+    // check if only empty spaces are given in the property value
+    if (columnsToBeCached.trim.isEmpty) {
+      val errorMessage = "Invalid value: Empty column names for the option(s): " +
+                         CarbonCommonConstants.COLUMN_META_CACHE
+      throw new MalformedCarbonCommandException(errorMessage)
+    } else {
+      val columns: Array[String] = columnsToBeCached.split(',').map(x => x.toLowerCase.trim)
+      // Check for duplicate column names
+      columns.groupBy(col => col.toLowerCase).foreach(f => if (f._2.size > 1) {
+        throw new MalformedCarbonCommandException(s"Duplicate column name found : ${ f._1 }")
+      })
+      columns.foreach(col => {
+        // check if any intermediate column is empty
+        if (null == col || col.trim.isEmpty) {
+          val errorMessage = "Invalid value: Empty column names for the option(s): " +
+                             CarbonCommonConstants.COLUMN_META_CACHE
+          throw new MalformedCarbonCommandException(errorMessage)
+        }
+        // check if the column exists in the table
+        if (!tableColumns.contains(col.toLowerCase)) {
+          val errorMessage = s"Column $col does not exist in the table ${
+            databaseName
+          }.${ tableIdentifier.table }"
+          throw new MalformedCarbonCommandException(errorMessage)
+        }
+      })
+    }
+  }
+
 }
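
For illustration, a minimal sketch exercising the two validators above in isolation (the map contents, table name and column names are made up for the example):

import scala.collection.mutable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.util.CommonUtil

val props = mutable.Map.empty[String, String]
// accepts "block"/"blocklet" in any case and stores the upper-cased value;
// anything else throws MalformedCarbonCommandException
CommonUtil.validateCacheLevel("bloCKlet", props)
// validates against the table's columns and stores them in create-table
// order, so "c3,c2" is persisted as "c2,c3"
CommonUtil.validateColumnMetaCacheFields("default", "t1", Seq("c1", "c2", "c3"), "c3,c2", props)
assert(props(CarbonCommonConstants.CACHE_LEVEL) == "BLOCKLET")
assert(props(CarbonCommonConstants.COLUMN_META_CACHE) == "c2,c3")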

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index e534f5f..13d1ff7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -381,7 +381,36 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
     // get partitionInfo
     val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
-
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+      // validate the column_meta_cache option
+      val tableColumns = dims.map(x => x.name.get) ++ msrs.map(x => x.name.get)
+      CommonUtil.validateColumnMetaCacheFields(
+        dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME), tableName,
+        tableColumns,
+        tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
+        tableProperties)
+      val columnsToBeCached = tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get
+      if (columnsToBeCached.nonEmpty) {
+        columnsToBeCached.split(",").foreach { column =>
+          val dimFieldToBeCached = dims.filter(x => x.name.get.equals(column))
+          // first element is taken as each column will have a unique name
+          // check for complex type column
+          if (dimFieldToBeCached.nonEmpty &&
+              isComplexDimDictionaryExclude(dimFieldToBeCached(0).dataType.get)) {
+            val errorMessage =
+              s"$column is a complex type column and complex type is not allowed for " +
+              s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
+            throw new MalformedCarbonCommandException(errorMessage)
+          }
+        }
+      }
+    }
+    // validate the cache level
+    if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
+      CommonUtil.validateCacheLevel(
+        tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get,
+        tableProperties)
+    }
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
     // validate table level properties for compaction
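
A hedged illustration of what this parser-level validation rejects at create-table time (consistent with the tests in this patch; a Carbon-enabled spark-shell is assumed, table names are illustrative):

// complex type columns are not allowed in column_meta_cache, so this DDL
// fails with MalformedCarbonCommandException during validation
spark.sql("CREATE TABLE t_bad (c1 STRING, c5 ARRAY<STRING>) STORED BY 'carbondata' " +
  "TBLPROPERTIES ('COLUMN_META_CACHE'='c5')")
// only BLOCK and BLOCKLET are accepted for cache_level, so this is rejected too
spark.sql("CREATE TABLE t_bad2 (c1 STRING) STORED BY 'carbondata' " +
  "TBLPROPERTIES ('CACHE_LEVEL'='SEGMENT')")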

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 3b56a35..23b5cba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -108,6 +108,8 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results ++= Seq(("SORT_SCOPE", tblProps.asScala.getOrElse("sort_scope", 
CarbonCommonConstants
       .LOAD_SORT_SCOPE_DEFAULT), tblProps.asScala.getOrElse("sort_scope", 
CarbonCommonConstants
       .LOAD_SORT_SCOPE_DEFAULT)))
+    // add Cache Level property; table property keys are stored in lower case
+    results ++= Seq(("CACHE_LEVEL", tblProps.getOrDefault(CarbonCommonConstants.CACHE_LEVEL,
+      CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE), ""))
     val isStreaming = tblProps.asScala.getOrElse("streaming", "false")
     results ++= Seq(("Streaming", isStreaming, ""))
     val isLocalDictEnabled = tblProps.asScala
@@ -187,6 +189,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results ++= Seq(("SORT_COLUMNS", 
relation.metaData.carbonTable.getSortColumns(
       relation.carbonTable.getTableName).asScala
       .map(column => column).mkString(","), ""))
+    // add columns configured in column meta cache
+    if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) {
+      results ++=
+      Seq(("COLUMN_META_CACHE", carbonTable.getCachedColumns(carbonTable.getTableName).asScala
+        .map(col => col).mkString(","), ""))
+    }
     if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
       results ++=
       Seq(("#Partition Information", "", ""),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index c291ae2..a6a730b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -29,14 +31,18 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.CommonUtil
 
 object AlterTableUtil {
 
@@ -312,16 +318,22 @@ object AlterTableUtil {
     (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
     val tableName = tableIdentifier.table
     val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     val timeStamp = 0L
-    var carbonTable: CarbonTable = null
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val lowerCasePropertiesMap: mutable.Map[String, String] = mutable.Map.empty
+      // convert all the keys to lower case
+      properties.foreach { entry =>
+        lowerCasePropertiesMap.put(entry._1.toLowerCase, entry._2)
+      }
+      // validate the column_meta_cache and cache_level properties if configured
+      validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
       // get the latest carbon table
       // read the latest schema file
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
@@ -337,16 +349,19 @@ object AlterTableUtil {
         wrapperTableInfo, dbName, tableName)
       val tblPropertiesMap: mutable.Map[String, String] =
         thriftTable.fact_table.getTableProperties.asScala
+      // below map will be used for cache invalidation. As tblPropertiesMap is getting modified
+      // in the next few steps, the original map needs to be retained for any decision making
+      val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
       if (set) {
-        //       This overrides old properties and update the comment parameter of thriftTable
-        //       with the newly added/modified comment since thriftTable also holds comment as its
-        //       direct property.
-        properties.foreach { property => if (validateTableProperties(property._1)) {
-          tblPropertiesMap.put(property._1.toLowerCase, property._2)
-        } else { val errorMessage = "Error: Invalid option(s): " + property._1.toString()
+        // This overrides old properties and updates the comment parameter of thriftTable
+        // with the newly added/modified comment since thriftTable also holds comment as its
+        // direct property.
+        lowerCasePropertiesMap.foreach { property => if (validateTableProperties(property._1)) {
+          tblPropertiesMap.put(property._1, property._2)
+        } else {
+          val errorMessage = "Error: Invalid option(s): " + property._1.toString()
           throw new MalformedCarbonCommandException(errorMessage)
-        }
-        }
+        }}
       } else {
         // This removes the comment parameter from thriftTable
         // since thriftTable also holds comment as its property.
@@ -364,20 +379,232 @@ object AlterTableUtil {
         thriftTable)(sparkSession)
       catalog.alterTable(tableIdentifier, schemParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
-      LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
+      // check and clear the block/blocklet cache
+      checkAndClearBlockletCache(carbonTable,
+        existingTablePropertiesMap,
+        lowerCasePropertiesMap,
+        propKeys,
+        set)
+      LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
         LOGGER.error(e, "Alter table properties failed")
         sys.error(s"Alter table properties operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks)
     }
   }
 
-  def validateTableProperties(propKey: String): Boolean = {
-    val supportedOptions = Seq("STREAMING", "COMMENT")
-   supportedOptions.contains(propKey.toUpperCase)
+  private def validateTableProperties(propKey: String): Boolean = {
+    val supportedOptions = Seq("STREAMING", "COMMENT", "COLUMN_META_CACHE", "CACHE_LEVEL")
+    supportedOptions.contains(propKey.toUpperCase)
+  }
+
+  /**
+   * validate column meta cache and cache level properties if configured by the user
+   *
+   * @param carbonTable table on which the properties are being altered
+   * @param propertiesMap map of the new table properties, with keys in lower case
+   */
+  private def validateColumnMetaCacheAndCacheLevel(carbonTable: CarbonTable,
+      propertiesMap: mutable.Map[String, String]): Unit = {
+    // validate column meta cache property
+    if (propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+      // Column meta cache is not allowed for child tables and dataMaps
+      if (carbonTable.isChildDataMap) {
+        throw new MalformedCarbonCommandException(s"Table property ${
+          CarbonCommonConstants.COLUMN_META_CACHE} is not allowed for child datamaps")
+      }
+      val schemaList: util.List[ColumnSchema] = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName),
+          carbonTable.getMeasureByTableName(carbonTable.getTableName))
+      val tableColumns: Seq[String] = schemaList.asScala
+        .map(columnSchema => columnSchema.getColumnName)
+      CommonUtil
+        .validateColumnMetaCacheFields(carbonTable.getDatabaseName,
+          carbonTable.getTableName,
+          tableColumns,
+          propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
+          propertiesMap)
+      val columnsToBeCached = propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).get
+      validateForComplexTypeColumn(carbonTable, columnsToBeCached)
+    }
+    // validate cache level property
+    if (propertiesMap.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
+      // Cache level is not allowed for child tables and dataMaps
+      if (carbonTable.isChildDataMap) {
+        throw new MalformedCarbonCommandException(s"Table property ${
+          CarbonCommonConstants.CACHE_LEVEL} is not allowed for child datamaps")
+      }
+      CommonUtil.validateCacheLevel(
+        propertiesMap.get(CarbonCommonConstants.CACHE_LEVEL).get,
+        propertiesMap)
+    }
+  }
+
+  /**
+   * This method will validate if there is any complex type column in the columns to be cached
+   *
+   * @param carbonTable table on which the property is being set
+   * @param cachedColumns comma separated list of columns to be cached
+   */
+  private def validateForComplexTypeColumn(carbonTable: CarbonTable,
+      cachedColumns: String): Unit = {
+    if (cachedColumns.nonEmpty) {
+      cachedColumns.split(",").foreach { column =>
+        val dimension = carbonTable.getDimensionByName(carbonTable.getTableName, column)
+        if (null != dimension && dimension.isComplex) {
+          val errorMessage =
+            s"$column is a complex type column and complex type is not allowed for " +
+            s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
+          throw new MalformedCarbonCommandException(errorMessage)
+        }
+      }
+    }
+  }
+
+  /**
+   * This method will check and clear the driver block/blocklet cache
+   *
+   * @param carbonTable table on which the properties are altered
+   * @param existingTableProperties table properties before the alter operation
+   * @param newProperties properties being set, with keys in lower case
+   * @param propKeys property keys being unset
+   * @param set true for a SET operation, false for UNSET
+   */
+  private def checkAndClearBlockletCache(carbonTable: CarbonTable,
+      existingTableProperties: scala.collection.mutable.Map[String, String],
+      newProperties: mutable.Map[String, String],
+      propKeys: Seq[String],
+      set: Boolean): Unit = {
+    if (set) {
+      clearBlockletCacheForCachingProperties(carbonTable, existingTableProperties, newProperties)
+    } else {
+      // convert all the unset keys to lower case
+      val propertiesToBeRemoved = propKeys.map(key => key.toLowerCase)
+      // This is the unset scenario and the cache needs to be cleared only when
+      // 1. column_meta_cache property is getting unset and existing table properties contains
+      // this property
+      // 2. cache_level property is being unset and existing table properties contains this property
+      // and the existing value is not equal to default value because after un-setting the property
+      // the cache should be loaded again with default value
+      if (propertiesToBeRemoved.contains(CarbonCommonConstants.COLUMN_META_CACHE) &&
+          existingTableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+        ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+      } else if (propertiesToBeRemoved.contains(CarbonCommonConstants.CACHE_LEVEL)) {
+        val cacheLevel = existingTableProperties.get(CarbonCommonConstants.CACHE_LEVEL)
+        if (cacheLevel.isDefined &&
+            !cacheLevel.get.equals(CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE)) {
+          ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+        }
+      }
+    }
+  }
+
+  /**
+   * This method will validate the column_meta_cache and cache_level properties and clear the
+   * driver block/blocklet cache
+   *
+   * @param carbonTable table on which the properties are being set
+   * @param tblPropertiesMap existing table properties
+   * @param newProperties new properties being set
+   */
+  private def clearBlockletCacheForCachingProperties(
+      carbonTable: CarbonTable,
+      tblPropertiesMap: scala.collection.mutable.Map[String, String],
+      newProperties: mutable.Map[String, String]): Unit = {
+    // check if column meta cache is defined. if defined then validate and clear the BTree cache
+    // if required
+    val columnMetaCacheProperty = newProperties.get(CarbonCommonConstants.COLUMN_META_CACHE)
+    columnMetaCacheProperty match {
+      case Some(newColumnsToBeCached) =>
+        if (!checkIfColumnsAreAlreadyCached(carbonTable, tblPropertiesMap
+          .get(CarbonCommonConstants.COLUMN_META_CACHE), newColumnsToBeCached)) {
+          ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+        }
+      case None =>
+      // don't do anything
+    }
+    // check if cache level is defined. if defined then validate and clear the BTree cache
+    // if required
+    val cacheLevelProperty = newProperties.get(CarbonCommonConstants.CACHE_LEVEL)
+    cacheLevelProperty match {
+      case Some(newCacheLevelValue) =>
+        if (!isCacheLevelValid(tblPropertiesMap.get(CarbonCommonConstants.CACHE_LEVEL),
+          newCacheLevelValue)) {
+          ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+        }
+      case None =>
+      // don't do anything
+    }
+  }
+
+  /**
+   * Method to verify if the existing cache level is the same as the new cache level
+   *
+   * @param existingCacheLevelValue cache level value from the existing table properties
+   * @param newCacheLevelValue new cache level value being set
+   * @return true if both values are the same, false otherwise
+   */
+  private def isCacheLevelValid(existingCacheLevelValue: Option[String],
+      newCacheLevelValue: String): Boolean = {
+    existingCacheLevelValue match {
+      case Some(existingValue) =>
+        existingValue.equals(newCacheLevelValue)
+      case None =>
+        false
+    }
+  }
+
+  /**
+   * Check the new columns to be cached against the already cached columns. If the count of new
+   * columns and already cached columns is the same and all the new columns are already cached,
+   * true will be returned, else false
+   *
+   * @param carbonTable table on which the property is being set
+   * @param existingCacheColumns columns already configured in column_meta_cache, if any
+   * @param newColumnsToBeCached comma separated list of new columns to be cached
+   * @return true if all the new columns are already cached, false otherwise
+   */
+  private def checkIfColumnsAreAlreadyCached(
+      carbonTable: CarbonTable,
+      existingCacheColumns: Option[String],
+      newColumnsToBeCached: String): Boolean = {
+    val newColumns = newColumnsToBeCached.split(",").map(x => x.trim.toLowerCase)
+    val isCached = existingCacheColumns match {
+      case Some(value) =>
+        val existingProperty = value.split(",").map(x => x.trim.toLowerCase)
+        compareColumns(existingProperty, newColumns)
+      case None =>
+        // By default all the columns in the table will be cached. This case is to compare all the
+        // table columns already cached to the newly specified cached columns
+        val schemaList: util.List[ColumnSchema] = CarbonUtil
+          .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName))
+        val tableColumns: Array[String] = schemaList.asScala
+          .map(columnSchema => columnSchema.getColumnName).toArray
+        compareColumns(tableColumns, newColumns)
+    }
+    isCached
   }
+
+  /**
+   * compare the existing cache columns and the new columns to be cached
+   *
+   * @param existingCachedColumns columns already cached
+   * @param newColumnsToBeCached new columns to be cached
+   * @return true if both arrays contain exactly the same columns, false otherwise
+   */
+  private def compareColumns(existingCachedColumns: Array[String],
+      newColumnsToBeCached: Array[String]): Boolean = {
+    val allColumnsMatch = if (existingCachedColumns.length == newColumnsToBeCached.length) {
+      existingCachedColumns.filter(col => !newColumnsToBeCached.contains(col)).length == 0
+    } else {
+      false
+    }
+    allColumnsMatch
+  }
+
 }
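
To tie the pieces together, a sketch of the cache lifecycle this class now drives (assuming the illustrative cache_demo table from the syntax examples above, with column_meta_cache currently 'name' and cache_level BLOCKLET):

// the new column list differs from the currently cached one, so
// checkAndClearBlockletCache invalidates the BTree cache; it is rebuilt
// lazily with min/max kept only for city
spark.sql("ALTER TABLE cache_demo SET TBLPROPERTIES ('column_meta_cache'='city')")
// unset of cache_level: the existing value BLOCKLET differs from the default
// BLOCK, so the cache is invalidated and reloaded at BLOCK level
spark.sql("ALTER TABLE cache_demo UNSET TBLPROPERTIES ('cache_level')")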
