Repository: carbondata Updated Branches: refs/heads/master e7103397d -> a0350e100
[CARBONDATA-2647] [CARBONDATA-2648] Add support for COLUMN_META_CACHE and CACHE_LEVEL in create table and alter table properties. Things done as part of this PR: Support for configuring COLUMN_META_CACHE in create and alter table set properties DDL. Support for configuring CACHE_LEVEL in create and alter table set properties DDL. Describe formatted display support for COLUMN_META_CACHE and CACHE_LEVEL. Any interfaces changed? Create Table Syntax: CREATE TABLE [dbName].tableName (col1 String, col2 String, col3 int, …) STORED BY 'carbondata' TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET') Alter Table set properties Syntax: ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET') This closes #2418 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a0350e10 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a0350e10 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a0350e10 Branch: refs/heads/master Commit: a0350e100e262fef85409b3b4fe2a488d688f706 Parents: e710339 Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Tue Jun 26 20:11:14 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 28 16:37:43 2018 +0530 ---------------------------------------------------------------------- .../dictionary/ManageDictionaryAndBTree.java | 18 ++ .../core/constants/CarbonCommonConstants.java | 12 + .../core/metadata/schema/table/CarbonTable.java | 38 +++ ...ithColumnMetCacheAndCacheLevelProperty.scala | 168 ++++++++++++ ...ithColumnMetCacheAndCacheLevelProperty.scala | 155 +++++++++++ .../describeTable/TestDescribeTable.scala | 4 +- .../carbondata/spark/util/CommonUtil.scala | 91 ++++++- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 31 ++- .../table/CarbonDescribeFormattedCommand.scala | 8 + .../org/apache/spark/util/AlterTableUtil.scala | 263 +++++++++++++++++-- 10 files
changed, 766 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java index a7d6027..b54fb14 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java @@ -149,4 +149,22 @@ public class ManageDictionaryAndBTree { } } + /** + * This method will remove the BTree instances from LRU cache for all the segments + * + * @param carbonTable + */ + public static void invalidateBTreeCache(CarbonTable carbonTable) { + LoadMetadataDetails[] loadMetadataDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + if (loadMetadataDetails.length > 0) { + String[] segments = new String[loadMetadataDetails.length]; + int loadCounter = 0; + for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) { + segments[loadCounter++] = loadMetadataDetail.getLoadName(); + } + invalidateBTreeCache(carbonTable.getAbsoluteTableIdentifier(), segments); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 118ff28..da40862 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1839,6 +1839,18 @@ public final class CarbonCommonConstants { * the node minimum load data default value */ public static final int CARBON_LOAD_MIN_SIZE_DEFAULT = 256; + /** + * property to be specified for caching min/max of required columns + */ + public static final String COLUMN_META_CACHE = "column_meta_cache"; + /** + * property to be specified for caching level (Block/Blocket) + */ + public static final String CACHE_LEVEL = "cache_level"; + /** + * default value for cache level + */ + public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK"; private CarbonCommonConstants() { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 68bd749..710c7af 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -1165,4 +1165,42 @@ public class CarbonTable implements Serializable { table.setLocalDictionaryEnabled(Boolean.parseBoolean("false")); } } + + /** + * Method to get the list of cached columns of the table + * + * @param tableName + * @return + */ + public List<String> getCachedColumns(String tableName) { + List<String> cachedColsList = new ArrayList<>(tableDimensionsMap.size()); + String cacheColumns = + tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.COLUMN_META_CACHE); + if (null != cacheColumns && !cacheColumns.isEmpty()) { + List<CarbonDimension> carbonDimensions = 
tableDimensionsMap.get(tableName); + List<CarbonMeasure> carbonMeasures = tableMeasuresMap.get(tableName); + String[] cachedCols = cacheColumns.split(","); + for (String column : cachedCols) { + boolean found = false; + // this will avoid adding the columns which have been dropped from the table + for (CarbonDimension dimension : carbonDimensions) { + if (dimension.getColName().equals(column)) { + cachedColsList.add(column); + found = true; + break; + } + } + // if column is not a dimension then check in measures + if (!found) { + for (CarbonMeasure measure : carbonMeasures) { + if (measure.getColName().equals(column)) { + cachedColsList.add(column); + break; + } + } + } + } + } + return cachedColsList; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala new file mode 100644 index 0000000..dbe9c75 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.alterTable + +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + +/** + * test class for validating alter table set properties with alter_column_meta_cache and + * cache_level properties + */ +class TestAlterTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll { + + private def isExpectedValueValid(dbName: String, + tableName: String, + key: String, + expectedValue: String): Boolean = { + val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession) + val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key) + expectedValue.equals(value) + } + + private def dropTable = { + sql("drop table if exists alter_column_meta_cache") + sql("drop table if exists cache_level") + } + + override def beforeAll { + // drop table + dropTable + // create table + sql("create table alter_column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 struct<imei:string, imsi:string>, c6 array<string>) stored by 'carbondata'") + sql("create table cache_level(c1 String) stored by 'carbondata'") + } + + test("validate column_meta_cache with only empty spaces - alter_column_meta_cache_01") { + intercept[RuntimeException] { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'=' ')") + } + } + + test("validate the property with characters in different 
cases - alter_column_meta_cache_02") { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('COLUMN_meta_CachE'='c2,c3')") + assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "c2,c3")) + } + + test("validate column_meta_cache with intermediate empty string between columns - alter_column_meta_cache_03") { + intercept[RuntimeException] { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2, ,c3')") + } + } + + test("validate column_meta_cache with combination of valid and invalid columns - alter_column_meta_cache_04") { + intercept[RuntimeException] { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c10')") + } + } + + test("validate column_meta_cache for dimensions and measures - alter_column_meta_cache_05") { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c3,c2,c4')") + assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "c2,c3,c4")) + } + + test("validate for duplicate column names - alter_column_meta_cache_06") { + intercept[RuntimeException] { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c2,c3')") + } + } + + test("validate column_meta_cache for complex struct type - alter_column_meta_cache_07") { + intercept[RuntimeException] { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c5')") + } + } + + test("validate column_meta_cache for complex array type - alter_column_meta_cache_08") { + intercept[RuntimeException] { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c5,c2')") + } + } + + test("validate column_meta_cache with empty value - alter_column_meta_cache_09") { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='')") + assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "")) + } + + test("validate describe 
formatted command to display column_meta_cache when column_meta_cache is set - alter_column_meta_cache_10") { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2')") + val descResult = sql("describe formatted alter_column_meta_cache") + checkExistence(descResult, true, "column_meta_cache") + } + + test("validate unsetting of column_meta_cache when column_meta_cache is already set - alter_column_meta_cache_11") { + sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c3')") + var descResult = sql("describe formatted alter_column_meta_cache") + checkExistence(descResult, true, "COLUMN_META_CACHE") + sql("Alter table alter_column_meta_cache UNSET TBLPROPERTIES('column_meta_cache')") + descResult = sql("describe formatted alter_column_meta_cache") + checkExistence(descResult, false, "COLUMN_META_CACHE") + } + + test("validate unsetting of column_meta_cache when column_meta_cache is not already set - alter_column_meta_cache_12") { + var descResult = sql("describe formatted alter_column_meta_cache") + checkExistence(descResult, false, "COLUMN_META_CACHE") + sql("Alter table alter_column_meta_cache UNSET TBLPROPERTIES('column_meta_cache')") + descResult = sql("describe formatted alter_column_meta_cache") + checkExistence(descResult, false, "COLUMN_META_CACHE") + } + + test("validate cache_level with only empty spaces - ALTER_CACHE_LEVEL_01") { + intercept[RuntimeException] { + sql("Alter table cache_level SET TBLPROPERTIES('cache_level'=' ')") + } + } + + test("validate cache_level with invalid values - ALTER_CACHE_LEVEL_02") { + intercept[RuntimeException] { + sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='xyz,abc')") + } + } + + test("validate cache_level with property in different cases - ALTER_CACHE_LEVEL_03") { + sql("Alter table cache_level SET TBLPROPERTIES('CACHE_leveL'='BLOcK')") + assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCK")) + } + + test("validate 
cache_level with default value as Blocklet - ALTER_CACHE_LEVEL_04") { + sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')") + assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCKLET")) + } + + test("validate describe formatted command to display cache_level when cache_level is set - ALTER_CACHE_LEVEL_05") { + sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')") + val descResult = sql("describe formatted cache_level") + checkExistence(descResult, true, "CACHE_LEVEL") + } + + test("validate describe formatted command to display cache_level when cache_level is not set - ALTER_CACHE_LEVEL_06") { + sql("Alter table cache_level UNSET TBLPROPERTIES('cache_level')") + val descResult = sql("describe formatted cache_level") + // even though not configured default cache level will be displayed as BLOCK + checkExistence(descResult, true, "CACHE_LEVEL") + } + + override def afterAll: Unit = { + // drop table + dropTable + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala new file mode 100644 index 0000000..8dadef9 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + +/** + * test class for validating create table with column_meta_cache and cache_level properties + */ +class TestCreateTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll { + + private def isExpectedValueValid(dbName: String, + tableName: String, + key: String, + expectedValue: String): Boolean = { + val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession) + val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key) + expectedValue.equals(value) + } + + test("validate column_meta_cache with only empty spaces - COLUMN_META_CACHE_01") { + sql("drop table if exists column_meta_cache") + intercept[MalformedCarbonCommandException] { + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'=' ')") + } + } + + test("validate the property with characters in different cases - COLUMN_META_CACHE_02") { + sql("drop table if exists 
column_meta_cache") + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')") + assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c2")) + } + + test("validate column_meta_cache with intermediate empty string between columns - COLUMN_META_CACHE_03") { + sql("drop table if exists column_meta_cache") + intercept[MalformedCarbonCommandException] { + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2, ,c3')") + } + } + + test("validate column_meta_cache with combination of valid and invalid columns - COLUMN_META_CACHE_04") { + sql("drop table if exists column_meta_cache") + intercept[MalformedCarbonCommandException] { + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c10')") + } + } + + test("validate column_meta_cache for dimensions and measures - COLUMN_META_CACHE_05") { + sql("drop table if exists column_meta_cache") + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c3,c2,c4')") + assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c2,c3,c4")) + } + + test("validate for duplicate column names - COLUMN_META_CACHE_06") { + sql("drop table if exists column_meta_cache") + intercept[MalformedCarbonCommandException] { + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c2,c3')") + } + } + + test("validate column_meta_cache for complex struct type - COLUMN_META_CACHE_07") { + sql("drop table if exists column_meta_cache") + intercept[MalformedCarbonCommandException] { + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 struct<imei:string, imsi:string>) 
stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c5')") + } + } + + test("validate column_meta_cache for complex array type - COLUMN_META_CACHE_08") { + sql("drop table if exists column_meta_cache") + intercept[MalformedCarbonCommandException] { + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 array<string>) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c5,c2')") + } + } + + test("validate column_meta_cache with empty value - COLUMN_META_CACHE_09") { + sql("drop table if exists column_meta_cache") + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='')") + assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "")) + } + + test("validate describe formatted command to display column_meta_cache when column_meta_cache is set - COLUMN_META_CACHE_10") { + sql("drop table if exists column_meta_cache") + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')") + val descResult = sql("describe formatted column_meta_cache") + checkExistence(descResult, true, "COLUMN_META_CACHE") + } + + test("validate describe formatted command to display column_meta_cache when column_meta_cache is not set - COLUMN_META_CACHE_11") { + sql("drop table if exists column_meta_cache") + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata'") + val descResult = sql("describe formatted column_meta_cache") + checkExistence(descResult, false, "COLUMN_META_CACHE") + } + + test("validate cache_level with only empty spaces - CACHE_LEVEL_01") { + sql("drop table if exists cache_level") + intercept[MalformedCarbonCommandException] { + sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'=' ')") + } + } + + test("validate cache_level with invalid values - CACHE_LEVEL_02") { + 
sql("drop table if exists cache_level") + intercept[MalformedCarbonCommandException] { + sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='xyz,abc')") + } + } + + test("validate cache_level with property in different cases - CACHE_LEVEL_03") { + sql("drop table if exists cache_level") + sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('CACHE_leveL'='BLOcK')") + assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCK")) + } + + test("validate cache_level with default value as Blocklet - CACHE_LEVEL_04") { + sql("drop table if exists cache_level") + sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='bloCKlet')") + assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCKLET")) + } + + test("validate describe formatted command to display cache_level when cache_level is set - CACHE_LEVEL_05") { + sql("drop table if exists cache_level") + sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='bloCKlet')") + val descResult = sql("describe formatted cache_level") + checkExistence(descResult, true, "CACHE_LEVEL") + } + + test("validate describe formatted command to display cache_level when cache_level is not set - CACHE_LEVEL_06") { + sql("drop table if exists cache_level") + sql("create table cache_level(c1 String) stored by 'carbondata'") + val descResult = sql("describe formatted cache_level") + // even though not configured default cache level will be displayed as BLOCK + checkExistence(descResult, true, "CACHE_LEVEL") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala index 5598457..ceb0ac3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala @@ -51,10 +51,10 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll { test("test describe formatted table desc1") { val resultCol = Seq("", "", "##Detailed Column property", "##Detailed Table Information", "ADAPTIVE", "CARBON Store Path", "Comment", "Database Name", "Last Update Time", - "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4") + "SORT_COLUMNS", "SORT_SCOPE", "CACHE_LEVEL", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4") val resultRow: Seq[Row] = resultCol map(propName => Row(f"$propName%-36s")) checkAnswer(sql("desc formatted DESC1").select("col_name"), resultRow) - assert(sql("desc formatted desc1").count() == 22) + assert(sql("desc formatted desc1").count() == 23) } test("test describe formatted for partition table") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 3995aa7..4723e6b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -26,11 +26,11 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.Map import scala.util.Random -import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.sql.{Row, RowFactory} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} import org.apache.spark.sql.types.{MetadataBuilder, StringType} @@ -896,4 +896,93 @@ object CommonUtil { CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) } + /** + * Validates cache_level and stores it trimmed and upper-cased; throws MalformedCarbonCommandException if invalid + * + * @param cacheLevel user-supplied value; only BLOCK or BLOCKLET (case-insensitive) is accepted + * @param tableProperties table properties, updated in place under CarbonCommonConstants.CACHE_LEVEL + */ + def validateCacheLevel(cacheLevel: String, tableProperties: Map[String, String]): Unit = { + val supportedCacheLevel = Seq("BLOCK", "BLOCKLET") + if (cacheLevel.trim.isEmpty) { + val errorMessage = "Invalid value: Empty value for the option(s): " + + CarbonCommonConstants.CACHE_LEVEL + throw new MalformedCarbonCommandException(errorMessage) + } else { + val trimmedCacheLevel = cacheLevel.trim.toUpperCase + if (!supportedCacheLevel.contains(trimmedCacheLevel)) { + val errorMessage = s"Invalid value: Allowed values for ${ + CarbonCommonConstants.CACHE_LEVEL} are BLOCK and 
BLOCKLET" + throw new MalformedCarbonCommandException(errorMessage) + } + tableProperties.put(CarbonCommonConstants.CACHE_LEVEL, trimmedCacheLevel) + } + } + + /** + * This will validate the column meta cache i.e the
columns to be cached. + * By default all dimensions will be cached. + * If the property is already defined in create table DDL then validate it, + * else add all the dimension columns as columns to be cached to table properties. + * valid values for COLUMN_META_CACHE can either be empty or can have one or more comma + * separated values + */ + def validateColumnMetaCacheFields(dbName: String, + tableName: String, + tableColumns: Seq[String], + cachedColumns: String, + tableProperties: Map[String, String]): Unit = { + val tableIdentifier = TableIdentifier(tableName, Some(dbName)) + // below check is added because empty value for column_meta_cache is allowed and in that + // case there should not be any validation + if (!cachedColumns.equals("")) { + validateColumnMetaCacheOption(tableIdentifier, dbName, cachedColumns, tableColumns) + val columnsToBeCached = cachedColumns.split(",").map(x => x.trim.toLowerCase).toSeq + // make the columns in create table order and then add it to table properties + val createOrder = tableColumns.filter(col => columnsToBeCached.contains(col)) + tableProperties.put(CarbonCommonConstants.COLUMN_META_CACHE, createOrder.mkString(",")) + } + } + + /** + * Validate the column_meta_cache option in tableproperties + * + * @param tableIdentifier + * @param databaseName + * @param columnsToBeCached + * @param tableColumns + */ + private def validateColumnMetaCacheOption(tableIdentifier: TableIdentifier, + databaseName: String, + columnsToBeCached: String, + tableColumns: Seq[String]): Unit = { + // check if only empty spaces are given in the property value + if (columnsToBeCached.trim.isEmpty) { + val errorMessage = "Invalid value: Empty column names for the option(s): " + + CarbonCommonConstants.COLUMN_META_CACHE + throw new MalformedCarbonCommandException(errorMessage) + } else { + val columns: Array[String] = columnsToBeCached.split(',').map(x => x.toLowerCase.trim) + // Check for duplicate column names + columns.groupBy(col => 
col.toLowerCase).foreach(f => if (f._2.size > 1) { + throw new MalformedCarbonCommandException(s"Duplicate column name found : ${ f._1 }") + }) + columns.foreach(col => { + // check if any intermediate column is empty + if (null == col || col.trim.isEmpty) { + val errorMessage = "Invalid value: Empty column names for the option(s): " + + CarbonCommonConstants.COLUMN_META_CACHE + throw new MalformedCarbonCommandException(errorMessage) + } + // check if the column exists in the table + if (!tableColumns.contains(col.toLowerCase)) { + val errorMessage = s"Column $col does not exist in the table ${ + databaseName + }.${ tableIdentifier.table }" + throw new MalformedCarbonCommandException(errorMessage) + } + }) + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index e534f5f..13d1ff7 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -381,7 +381,36 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties) // get partitionInfo val partitionInfo = getPartitionInfo(partitionCols, tableProperties) - + if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) { + // validate the column_meta_cache option (validator takes dbName first, then tableName) + val tableColumns = dims.map(x => x.name.get) ++ msrs.map(x => x.name.get) + CommonUtil.validateColumnMetaCacheFields( + dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME), tableName, + 
tableColumns, +
tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get, + tableProperties) + val columnsToBeCached = tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get + if (columnsToBeCached.nonEmpty) { + columnsToBeCached.split(",").foreach { column => + val dimFieldToBeCached = dims.filter(x => x.name.get.equals(column)) + // first element is taken as each column with have a unique name + // check for complex type column + if (dimFieldToBeCached.nonEmpty && + isComplexDimDictionaryExclude(dimFieldToBeCached(0).dataType.get)) { + val errorMessage = + s"$column is a complex type column and complex type is not allowed for " + + s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }" + throw new MalformedCarbonCommandException(errorMessage) + } + } + } + } + // validate the cache level + if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) { + CommonUtil.validateCacheLevel( + tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get, + tableProperties) + } // validate the tableBlockSize from table properties CommonUtil.validateTableBlockSize(tableProperties) // validate table level properties for compaction http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 3b56a35..23b5cba 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -108,6 +108,8 @@ private[sql] case class 
CarbonDescribeFormattedCommand( results ++= Seq(("SORT_SCOPE", tblProps.asScala.getOrElse("sort_scope", CarbonCommonConstants .LOAD_SORT_SCOPE_DEFAULT), tblProps.asScala.getOrElse("sort_scope", CarbonCommonConstants .LOAD_SORT_SCOPE_DEFAULT))) + // add Cache Level property; property keys are stored in lower case, so look up via the constant + results ++= Seq(("CACHE_LEVEL", tblProps.asScala.getOrElse(CarbonCommonConstants.CACHE_LEVEL, CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE), "")) val isStreaming = tblProps.asScala.getOrElse("streaming", "false") results ++= Seq(("Streaming", isStreaming, "")) val isLocalDictEnabled = tblProps.asScala @@ -187,6 +189,12 @@ private[sql] case class CarbonDescribeFormattedCommand( results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns( relation.carbonTable.getTableName).asScala .map(column => column).mkString(","), "")) + // add columns configured in column meta cache + if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) { + results ++= + Seq(("COLUMN_META_CACHE", carbonTable.getCachedColumns(carbonTable.getTableName).asScala + .mkString(","), "")) + } if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { results ++= Seq(("#Partition Information", "", ""), http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index c291ae2..a6a730b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -17,6 +17,8 @@ package org.apache.spark.util +import java.util + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -29,14 +31,18 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._ import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.spark.util.CommonUtil object AlterTableUtil { @@ -312,16 +318,22 @@ object AlterTableUtil { (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = { val tableName = tableIdentifier.table val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) var locks = List.empty[ICarbonLock] val timeStamp = 0L - var carbonTable: CarbonTable = null try { locks = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + val lowerCasePropertiesMap:
mutable.Map[String, String] = mutable.Map.empty + // convert all the keys to lower case + properties.foreach { entry => + lowerCasePropertiesMap.put(entry._1.toLowerCase, entry._2) + } + // validate the required cache level properties + validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap) // get the latest carbon table // read the latest schema file val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable) @@ -337,16 +349,19 @@ object AlterTableUtil { wrapperTableInfo, dbName, tableName) val tblPropertiesMap: mutable.Map[String, String] = thriftTable.fact_table.getTableProperties.asScala + // below map will be used for cache invalidation. As tblProperties map is getting modified + // in the next few steps the original map needs to be retained for any decision making + val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*) if (set) { - // This overrides old properties and update the comment parameter of thriftTable - // with the newly added/modified comment since thriftTable also holds comment as its - // direct property. - properties.foreach { property => if (validateTableProperties(property._1)) { - tblPropertiesMap.put(property._1.toLowerCase, property._2) - } else { val errorMessage = "Error: Invalid option(s): " + property._1.toString() + // This overrides old properties and update the comment parameter of thriftTable + // with the newly added/modified comment since thriftTable also holds comment as its + // direct property. + lowerCasePropertiesMap.foreach { property => if (validateTableProperties(property._1)) { + tblPropertiesMap.put(property._1, property._2) + } else { + val errorMessage = "Error: Invalid option(s): " + property._1.toString() throw new MalformedCarbonCommandException(errorMessage) - } - } + }} } else { // This removes the comment parameter from thriftTable // since thriftTable also holds comment as its property.
@@ -364,20 +379,232 @@ object AlterTableUtil { thriftTable)(sparkSession) catalog.alterTable(tableIdentifier, schemParts, cols) sparkSession.catalog.refreshTable(tableIdentifier.quotedString) + // check and clear the block/blocklet cache + checkAndClearBlockletCache(carbonTable, + existingTablePropertiesMap, + lowerCasePropertiesMap, + propKeys, + set) LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName") } catch { case e: Exception => LOGGER.error(e, "Alter table properties failed") sys.error(s"Alter table properties operation failed: ${e.getMessage}") } finally { // release lock after command execution completion AlterTableUtil.releaseLocks(locks) } } - def validateTableProperties(propKey: String): Boolean = { - val supportedOptions = Seq("STREAMING", "COMMENT") - supportedOptions.contains(propKey.toUpperCase) + private def validateTableProperties(propKey: String): Boolean = { + val supportedOptions = Seq("STREAMING", "COMMENT", "COLUMN_META_CACHE", "CACHE_LEVEL") + supportedOptions.contains(propKey.toUpperCase) + } + + /** + * validate column meta cache and cache level properties if configured by the user + * + * @param carbonTable table whose properties are being altered + * @param propertiesMap properties to be set, keyed by lower case property name + */ + private def validateColumnMetaCacheAndCacheLevel(carbonTable: CarbonTable, + propertiesMap: mutable.Map[String, String]): Unit = { + // validate column meta cache property + if (propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) { + // Column meta cache is not allowed for child tables and dataMaps + if (carbonTable.isChildDataMap) { + throw new
MalformedCarbonCommandException(s"Table property ${ + CarbonCommonConstants.COLUMN_META_CACHE} is not allowed for child datamaps") + } + val schemaList: util.List[ColumnSchema] = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName), + carbonTable.getMeasureByTableName(carbonTable.getTableName)) + val tableColumns: Seq[String] = schemaList.asScala + .map(columnSchema => columnSchema.getColumnName.toLowerCase) + CommonUtil + .validateColumnMetaCacheFields(carbonTable.getDatabaseName, + carbonTable.getTableName, + tableColumns, + propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).get, + propertiesMap) + val columnsToBeCached = propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).get + validateForComplexTypeColumn(carbonTable, columnsToBeCached) + } + // validate cache level property + if (propertiesMap.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) { + // Cache level is not allowed for child tables and dataMaps + if (carbonTable.isChildDataMap) { + throw new MalformedCarbonCommandException(s"Table property ${ + CarbonCommonConstants.CACHE_LEVEL} is not allowed for child datamaps") + } + CommonUtil.validateCacheLevel( + propertiesMap.get(CarbonCommonConstants.CACHE_LEVEL).get, + propertiesMap) + } + } + + /** + * This method will validate if there is any complex type column in the columns to be cached + * + * @param carbonTable table owning the columns + * @param cachedColumns comma separated column_meta_cache value + */ + private def validateForComplexTypeColumn(carbonTable: CarbonTable, + cachedColumns: String): Unit = { + if (cachedColumns.nonEmpty) { + cachedColumns.split(",").foreach { column => + val dimension = carbonTable.getDimensionByName(carbonTable.getTableName, column.trim) + if (null != dimension && dimension.isComplex) { + val errorMessage = + s"$column is a complex type column and complex type is not allowed for " + + s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }" + throw new MalformedCarbonCommandException(errorMessage) + } + } + } + } + + /** + * This
method will check and clear the driver block/blocklet cache + * + * @param carbonTable table whose driver cache may be invalidated + * @param existingTableProperties table properties as they were before this alter command + * @param newProperties properties being set, keyed by lower case property name + * @param propKeys property keys being unset + * @param set true for a set properties command, false for an unset command + */ + private def checkAndClearBlockletCache(carbonTable: CarbonTable, + existingTableProperties: scala.collection.mutable.Map[String, String], + newProperties: mutable.Map[String, String], + propKeys: Seq[String], + set: Boolean): Unit = { + if (set) { + clearBlockletCacheForCachingProperties(carbonTable, existingTableProperties, newProperties) + } else { + // convert all the unset keys to lower case + val propertiesToBeRemoved = propKeys.map(key => key.toLowerCase) + // This is unset scenario and the cache needs to be cleaned only when + // 1. column_meta_cache property is getting unset and existing table properties contains + // this property + // 2. cache_level property is being unset and existing table properties contains this property + // and the existing value is not equal to default value because after un-setting the property + // the cache should be loaded again with default value + if (propertiesToBeRemoved.contains(CarbonCommonConstants.COLUMN_META_CACHE) && + existingTableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) { + ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + } else if (propertiesToBeRemoved.contains(CarbonCommonConstants.CACHE_LEVEL)) { + val cacheLevel = existingTableProperties.get(CarbonCommonConstants.CACHE_LEVEL) + // cacheLevel is an Option, so the wrapped value (not the Option) is compared to the default + if (cacheLevel.exists(!_.equalsIgnoreCase(CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE))) { + ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + } + } + } + } + + /** + * This method will validate the column_meta_cache and cache_level properties and clear the + * driver block/blocklet cache + * + * @param carbonTable table whose driver cache may be invalidated + * @param tblPropertiesMap table properties as they were before this alter command + * @param newProperties properties being set, keyed by lower case property name + */ + private def clearBlockletCacheForCachingProperties( + carbonTable: CarbonTable, + tblPropertiesMap:
scala.collection.mutable.Map[String, String], + newProperties: mutable.Map[String, String]): Unit = { + // check if column meta cache is defined. if defined then validate and clear the BTree cache + // if required + val columnMetaCacheProperty = newProperties.get(CarbonCommonConstants.COLUMN_META_CACHE) + columnMetaCacheProperty match { + case Some(newColumnsToBeCached) => + if (!checkIfColumnsAreAlreadyCached(carbonTable, tblPropertiesMap + .get(CarbonCommonConstants.COLUMN_META_CACHE), newColumnsToBeCached)) { + ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + } + case None => + // don't do anything + } + // check if cache level is defined. if defined then validate and clear the BTree cache + // if required + val cacheLevelProperty = newProperties.get(CarbonCommonConstants.CACHE_LEVEL) + cacheLevelProperty match { + case Some(newCacheLevelValue) => + if (!isCacheLevelValid(tblPropertiesMap.get(CarbonCommonConstants.CACHE_LEVEL), + newCacheLevelValue)) { + ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + } + case None => + // don't do anything + } + } + + /** + * Method to verify if the existing cache level is same as the new cache level + * + * @param existingCacheLevelValue cache_level currently stored for the table, if any + * @param newCacheLevelValue cache_level being set + * @return true when the effective cache level does not change + */ + private def isCacheLevelValid(existingCacheLevelValue: Option[String], + newCacheLevelValue: String): Boolean = { + // An unset cache_level means the default level is already in effect, so it is compared + // against the default instead of always forcing a cache reload. Values are compared + // ignoring case. + existingCacheLevelValue + .getOrElse(CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE) + .equalsIgnoreCase(newCacheLevelValue) + } + + /** + * Check the new columns to be cached with the already cached columns. If count of new columns
and already cached columns is same and all the new columns are already cached then + * true will be returned else false + * + * @param carbonTable table used to list all columns when column_meta_cache is not set + * @param existingCacheColumns column_meta_cache currently stored for the table, if any + * @param newColumnsToBeCached comma separated column_meta_cache value being set + * @return true when the new columns are exactly the columns already cached + */ + private def checkIfColumnsAreAlreadyCached( + carbonTable: CarbonTable, + existingCacheColumns: Option[String], + newColumnsToBeCached: String): Boolean = { + val newColumns = newColumnsToBeCached.split(",").map(x => x.trim.toLowerCase) + val isCached = existingCacheColumns match { + case Some(value) => + val existingProperty = value.split(",").map(x => x.trim.toLowerCase) + compareColumns(existingProperty, newColumns) + case None => + // By default all the columns in the table will be cached. This case is to compare all the + // table columns already cached to the newly specified cached columns + val schemaList: util.List[ColumnSchema] = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName), + carbonTable.getMeasureByTableName(carbonTable.getTableName)) + val tableColumns: Array[String] = schemaList.asScala + .map(columnSchema => columnSchema.getColumnName.toLowerCase).toArray + compareColumns(tableColumns, newColumns) + } + isCached } + + /** + * compare the existing cache columns and the new columns to be cached + * + * @param existingCachedColumns lower case names of the currently cached columns + * @param newColumnsToBeCached lower case names of the columns to be cached + * @return true when both arrays contain the same columns, ignoring order + */ + private def compareColumns(existingCachedColumns: Array[String], + newColumnsToBeCached: Array[String]): Boolean = { + // duplicate column names are rejected when column_meta_cache is validated, so equal + // sizes plus containment of every existing column means both arrays hold the same + // columns; column order is intentionally ignored + val allColumnsMatch = existingCachedColumns.length == newColumnsToBeCached.length && + existingCachedColumns.forall(col => newColumnsToBeCached.contains(col)) + allColumnsMatch + } + }