This is an automated email from the ASF dual-hosted git repository.

xuchuanyin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 718be37  [CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables
718be37 is described below

commit 718be37295a55de3317191118bf74720e4de800f
Author: QiangCai <qiang...@qq.com>
AuthorDate: Thu Jan 17 15:39:33 2019 +0800

    [CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables

    >>> SHOW METACACHE
    +--------+--------+----------+------------+---------------+
    |Database|Table   |Index size|Datamap size|Dictionary size|
    +--------+--------+----------+------------+---------------+
    |ALL     |ALL     |842 Bytes |982 Bytes   |80.34 KB       |
    |default |ALL     |842 Bytes |982 Bytes   |80.34 KB       |
    |default |t1      |225 Bytes |982 Bytes   |0              |
    |default |t1_dpagg|259 Bytes |0           |0              |
    |default |t2      |358 Bytes |0           |80.34 KB       |
    +--------+--------+----------+------------+---------------+

    >>> SHOW METACACHE ON TABLE t1
    +----------+---------+----------------------+
    |Field     |Size     |Comment               |
    +----------+---------+----------------------+
    |Index     |225 Bytes|1/1 index files cached|
    |Dictionary|0        |                      |
    |dpagg     |259 Bytes|preaggregate          |
    |dblom     |982 Bytes|bloomfilter           |
    +----------+---------+----------------------+

    >>> SHOW METACACHE ON TABLE t2
    +----------+---------+----------------------+
    |Field     |Size     |Comment               |
    +----------+---------+----------------------+
    |Index     |358 Bytes|2/2 index files cached|
    |Dictionary|80.34 KB |                      |
    +----------+---------+----------------------+

    This closes #3078
---
 .../carbondata/core/cache/CacheProvider.java       |   4 +
 .../carbondata/core/cache/CarbonLRUCache.java      |   4 +
 docs/ddl-of-carbondata.md                          |  21 ++
 .../sql/commands/TestCarbonShowCacheCommand.scala  | 163 +++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   1 +
 .../command/cache/CarbonShowCacheCommand.scala     | 312 +++++++++++++++++++++
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  12 +-
 7 files changed, 515 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 99b1693..deb48e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -195,4 +195,8 @@ public class CacheProvider {
     }
     cacheTypeToCacheMap.clear();
   }
+
+  public CarbonLRUCache getCarbonCache() {
+    return carbonLRUCache;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 87254e3..74ff8a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -305,4 +305,8 @@ public final class CarbonLRUCache {
       lruCacheMap.clear();
     }
   }
+
+  public Map<String, Cacheable> getCacheMap() {
+    return lruCacheMap;
+  }
 }
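Note: the two getters above are the whole of the core-side change; they expose the
process-wide CarbonLRUCache and its backing map so the new command can walk every
cached entry. A minimal sketch of reading them directly (illustrative only, not part
of this patch; the type test mirrors how the command classifies index entries):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.cache.CacheProvider
    import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper

    // The LRU cache is created lazily, so it is null until a query has populated it.
    val carbonCache = CacheProvider.getInstance().getCarbonCache
    if (carbonCache != null) {
      // Sum the memory held by cached blocklet index entries ("Index size").
      val indexBytes = carbonCache.getCacheMap.asScala.values
        .collect { case wrapper: BlockletDataMapIndexWrapper => wrapper.getMemorySize }
        .sum
      println(s"index cache: $indexBytes bytes")
    }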
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 0d0e5bd..3476475 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -67,6 +67,7 @@ CarbonData DDL statements are documented here,which includes:
   * [SPLIT PARTITION](#split-a-partition)
   * [DROP PARTITION](#drop-a-partition)
 * [BUCKETING](#bucketing)
+* [CACHE](#cache)
 
 ## CREATE TABLE
@@ -1088,4 +1089,24 @@ Users can specify which columns to include and exclude for local dictionary gene
   TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='productName')
   ```
 
+## CACHE
+  CarbonData internally uses LRU caching to improve query performance. The user can get information
+  about the current cache usage in memory through the following command:
+
+  ```sql
+  SHOW METACACHE
+  ```
+
+  This shows the overall memory consumed in the cache by category - index files, dictionary and
+  datamaps. It also shows the cache usage of all tables and child tables in the current
+  database.
+
+  ```sql
+  SHOW METACACHE ON TABLE tableName
+  ```
+
+  This shows detailed information on the cache usage of the table `tableName`: its carbonindex
+  files, its dictionary files, its datamaps and its child tables.
+
+  This command is not allowed on child tables.
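Note: the documented commands can also be issued from code through any Carbon-enabled
SparkSession. A hypothetical spark-shell session (the `spark` session and the table
name `t1` are assumptions, not part of this patch):

    // Database-wide summary: Database | Table | Index size | Datamap size | Dictionary size
    spark.sql("SHOW METACACHE").show(truncate = false)

    // Per-table breakdown; throws UnsupportedOperationException if t1 is a child table.
    spark.sql("SHOW METACACHE ON TABLE t1").show(truncate = false)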
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
new file mode 100644
index 0000000..0e1cd00
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sql.commands
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
+  override protected def beforeAll(): Unit = {
+    // use new database
+    sql("drop database if exists cache_db cascade").collect()
+    sql("drop database if exists cache_empty_db cascade").collect()
+    sql("create database cache_db").collect()
+    sql("create database cache_empty_db").collect()
+    dropTable
+    sql("use cache_db").collect()
+    sql(
+      """
+        | CREATE TABLE cache_db.cache_1
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        | salary int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname')
+      """.stripMargin)
+    // bloom
+    sql("CREATE DATAMAP IF NOT EXISTS cache_1_bloom ON TABLE cache_db.cache_1 USING 'bloomfilter' " +
+        "DMPROPERTIES('INDEX_COLUMNS'='deptno')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_1 ")
+
+    sql(
+      """
+        | CREATE TABLE cache_2
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        | salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_db.cache_2 ")
+    sql("insert into table cache_2 select * from cache_1").collect()
+
+    sql(
+      """
+        | CREATE TABLE cache_3
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        | salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_3 ")
+
+    // use default database
+    sql("use default").collect()
+    sql(
+      """
+        | CREATE TABLE cache_4
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        | salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("insert into table cache_4 select * from cache_db.cache_2").collect()
+
+    // standard partition table
+    sql(
+      """
+        | CREATE TABLE cache_5
+        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+        | workgroupcategoryname String, deptname String, projectcode int,
+        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+        | salary int)
+        | PARTITIONED BY (deptno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      "insert into table cache_5 select empno,empname,designation,doj,workgroupcategory," +
+      "workgroupcategoryname,deptname,projectcode,projectjoindate,projectenddate,attendance," +
+      "utilization,salary,deptno from cache_4").collect()
+
+    // datamap
+    sql("create datamap cache_4_count on table cache_4 using 'preaggregate' as " +
+        "select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname")
+
+    // count star to cache index
+    sql("select max(deptname) from cache_db.cache_1").collect()
+    sql("SELECT deptno FROM cache_db.cache_1 where deptno=10").collect()
+    sql("select count(*) from cache_db.cache_2").collect()
+    sql("select count(*) from cache_4").collect()
+    sql("select count(*) from cache_5").collect()
+    sql("select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname").collect()
+  }
+
+
+  override protected def afterAll(): Unit = {
+    sql("use default").collect()
+    dropTable
+  }
+
+  private def dropTable = {
+    sql("DROP TABLE IF EXISTS cache_db.cache_1")
+    sql("DROP TABLE IF EXISTS cache_db.cache_2")
+    sql("DROP TABLE IF EXISTS cache_db.cache_3")
+    sql("DROP TABLE IF EXISTS default.cache_4")
+    sql("DROP TABLE IF EXISTS default.cache_5")
+  }
+
+  test("show cache") {
+    sql("use cache_empty_db").collect()
+    val result1 = sql("show metacache").collect()
+    assertResult(2)(result1.length)
+    assertResult(Row("cache_empty_db", "ALL", "0", "0", "0"))(result1(1))
+
+    sql("use cache_db").collect()
+    val result2 = sql("show metacache").collect()
+    assertResult(4)(result2.length)
+
+    sql("use default").collect()
+    val result3 = sql("show metacache").collect()
+    val dataMapCacheInfo = result3
+      .map(row => row.getString(1))
+      .filter(table => table.equals("cache_4_cache_4_count"))
+    assertResult(1)(dataMapCacheInfo.length)
+  }
+
+  test("show metacache on table") {
+    sql("use cache_db").collect()
+    val result1 = sql("show metacache on table cache_1").collect()
+    assertResult(3)(result1.length)
+
+    val result2 = sql("show metacache on table cache_db.cache_2").collect()
+    assertResult(2)(result2.length)
+
+    checkAnswer(sql("show metacache on table cache_db.cache_3"),
+      Seq(Row("Index", "0 bytes", "0/1 index files cached"), Row("Dictionary", "0 bytes", "")))
+
+    val result4 = sql("show metacache on table default.cache_4").collect()
+    assertResult(3)(result4.length)
+
+    sql("use default").collect()
+    val result5 = sql("show metacache on table cache_5").collect()
+    assertResult(2)(result5.length)
+  }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index dc75243..e03bebd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -155,6 +155,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val HISTORY = carbonKeyWord("HISTORY")
   protected val SEGMENTS = carbonKeyWord("SEGMENTS")
   protected val SEGMENT = carbonKeyWord("SEGMENT")
+  protected val METACACHE = carbonKeyWord("METACACHE")
 
   protected val STRING = carbonKeyWord("STRING")
   protected val INTEGER = carbonKeyWord("INTEGER")
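Note: the human-readable sizes asserted in the test above ("0 bytes") and produced by
the command that follows come from commons-io's FileUtils.byteCountToDisplaySize, which
truncates to whole units rather than rounding. A few illustrative values (example byte
counts only):

    import org.apache.commons.io.FileUtils.byteCountToDisplaySize

    byteCountToDisplaySize(0L)     // "0 bytes" - the un-queried cache_3 rows above
    byteCountToDisplaySize(225L)   // "225 bytes"
    byteCountToDisplaySize(2048L)  // "2 KB"
    byteCountToDisplaySize(2560L)  // "2 KB" - fractional KB are truncated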
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
new file mode 100644
index 0000000..e937c32
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils.byteCountToDisplaySize
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+/**
+ * SHOW CACHE
+ */
+case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
+  extends MetadataCommand {
+
+  override def output: Seq[AttributeReference] = {
+    if (tableIdentifier.isEmpty) {
+      Seq(
+        AttributeReference("Database", StringType, nullable = false)(),
+        AttributeReference("Table", StringType, nullable = false)(),
+        AttributeReference("Index size", StringType, nullable = false)(),
+        AttributeReference("Datamap size", StringType, nullable = false)(),
+        AttributeReference("Dictionary size", StringType, nullable = false)())
+    } else {
+      Seq(
+        AttributeReference("Field", StringType, nullable = false)(),
+        AttributeReference("Size", StringType, nullable = false)(),
+        AttributeReference("Comment", StringType, nullable = false)())
+    }
+  }
+
+  override protected def opName: String = "SHOW CACHE"
+
+  def showAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
+    val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
+    val cache = CacheProvider.getInstance().getCarbonCache()
+    if (cache == null) {
+      Seq(Row("ALL", "ALL", 0L, 0L, 0L),
+        Row(currentDatabase, "ALL", 0L, 0L, 0L))
+    } else {
+      val tableIdents = sparkSession.sessionState.catalog.listTables(currentDatabase).toArray
+      val dbLocation = CarbonEnv.getDatabaseLocation(currentDatabase, sparkSession)
+      val tempLocation = dbLocation.replace(
+        CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+      val tablePaths = tableIdents.map { tableIdent =>
+        (tempLocation + CarbonCommonConstants.FILE_SEPARATOR +
+         tableIdent.table + CarbonCommonConstants.FILE_SEPARATOR,
+          CarbonEnv.getDatabaseName(tableIdent.database)(sparkSession) + "." + tableIdent.table)
+      }
+
+      val dictIds = tableIdents
+        .map { tableIdent =>
+          var table: CarbonTable = null
+          try {
+            table = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          } catch {
+            case _ =>
+          }
+          table
+        }
+        .filter(_ != null)
+        .flatMap { table =>
+          table
+            .getAllDimensions
+            .asScala
+            .filter(_.isGlobalDictionaryEncoding)
+            .toArray
+            .map(dim => (dim.getColumnId, table.getDatabaseName + "." + table.getTableName))
+        }
+
+      // all databases
+      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+      // current database
+      var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
+      val tableMapIndexSize = mutable.HashMap[String, Long]()
+      val tableMapDatamapSize = mutable.HashMap[String, Long]()
+      val tableMapDictSize = mutable.HashMap[String, Long]()
+      val cacheIterator = cache.getCacheMap.entrySet().iterator()
+      while (cacheIterator.hasNext) {
+        val entry = cacheIterator.next()
+        val cache = entry.getValue
+        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
+          // index
+          allIndexSize = allIndexSize + cache.getMemorySize
+          val indexPath = entry.getKey.replace(
+            CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+          val tablePath = tablePaths.find(path => indexPath.startsWith(path._1))
+          if (tablePath.isDefined) {
+            dbIndexSize = dbIndexSize + cache.getMemorySize
+            val memorySize = tableMapIndexSize.get(tablePath.get._2)
+            if (memorySize.isEmpty) {
+              tableMapIndexSize.put(tablePath.get._2, cache.getMemorySize)
+            } else {
+              tableMapIndexSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
+            }
+          }
+        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
+          // bloom datamap
+          allDatamapSize = allDatamapSize + cache.getMemorySize
+          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+          val tablePath = tablePaths.find(path => shardPath.contains(path._1))
+          if (tablePath.isDefined) {
+            dbDatamapSize = dbDatamapSize + cache.getMemorySize
+            val memorySize = tableMapDatamapSize.get(tablePath.get._2)
+            if (memorySize.isEmpty) {
+              tableMapDatamapSize.put(tablePath.get._2, cache.getMemorySize)
+            } else {
+              tableMapDatamapSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
+            }
+          }
+        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
+          // dictionary
+          allDictSize = allDictSize + cache.getMemorySize
+          val dictId = dictIds.find(id => entry.getKey.startsWith(id._1))
+          if (dictId.isDefined) {
+            dbDictSize = dbDictSize + cache.getMemorySize
+            val memorySize = tableMapDictSize.get(dictId.get._2)
+            if (memorySize.isEmpty) {
+              tableMapDictSize.put(dictId.get._2, cache.getMemorySize)
+            } else {
+              tableMapDictSize.put(dictId.get._2, memorySize.get + cache.getMemorySize)
+            }
+          }
+        }
+      }
+      if (tableMapIndexSize.isEmpty && tableMapDatamapSize.isEmpty && tableMapDictSize.isEmpty) {
+        Seq(
+          Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
+            byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize)),
+          Row(currentDatabase, "ALL", "0", "0", "0"))
+      } else {
+        val tableList = tableMapIndexSize
+          .map(_._1)
+          .toSeq
+          .union(tableMapDictSize.map(_._1).toSeq)
+          .distinct
+          .sorted
+          .map { uniqueName =>
+            val values = uniqueName.split("\\.")
+            val indexSize = tableMapIndexSize.getOrElse(uniqueName, 0L)
+            val datamapSize = tableMapDatamapSize.getOrElse(uniqueName, 0L)
+            val dictSize = tableMapDictSize.getOrElse(uniqueName, 0L)
+            Row(values(0), values(1), byteCountToDisplaySize(indexSize),
+              byteCountToDisplaySize(datamapSize), byteCountToDisplaySize(dictSize))
+          }
+
+        Seq(
+          Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
+            byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize)),
+          Row(currentDatabase, "ALL", byteCountToDisplaySize(dbIndexSize),
+            byteCountToDisplaySize(dbDatamapSize), byteCountToDisplaySize(dbDictSize))
+        ) ++ tableList
+      }
+    }
+  }
+
+  def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+    val tableName = carbonTable.getTableName
+    val databaseName = carbonTable.getDatabaseName
+    val cache = CacheProvider.getInstance().getCarbonCache()
+    if (cache == null) {
+      Seq.empty
+    } else {
+      val dbLocation = CarbonEnv
+        .getDatabaseLocation(databaseName, sparkSession)
+        .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+      val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR +
+                      tableName + CarbonCommonConstants.FILE_SEPARATOR
+      var numIndexFilesCached = 0
+
+      // Path -> Name, Type
+      val datamapName = mutable.Map[String, (String, String)]()
+      // Path -> Size
+      val datamapSize = mutable.Map[String, Long]()
+      // parent table
+      datamapName.put(tablePath, ("", ""))
+      datamapSize.put(tablePath, 0)
+      // children tables
+      for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
+        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + "_" +
+                   schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
+        val name = schema.getDataMapName
+        val dmType = schema.getProviderName
+        datamapName.put(path, (name, dmType))
+        datamapSize.put(path, 0)
+      }
+      // index schemas
+      for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+        .asScala) {
+        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName +
+                   CarbonCommonConstants.FILE_SEPARATOR + schema.getDataMapName +
+                   CarbonCommonConstants.FILE_SEPARATOR
+        val name = schema.getDataMapName
+        val dmType = schema.getProviderName
+        datamapName.put(path, (name, dmType))
+        datamapSize.put(path, 0)
+      }
+
+      var dictSize = 0L
+
+      // dictionary column ids
+      val dictIds = carbonTable
+        .getAllDimensions
+        .asScala
+        .filter(_.isGlobalDictionaryEncoding)
+        .map(_.getColumnId)
+        .toArray
+
+      val cacheIterator = cache.getCacheMap.entrySet().iterator()
+      while (cacheIterator.hasNext) {
+        val entry = cacheIterator.next()
+        val cache = entry.getValue
+
+        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
+          // index
+          val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+          val pathEntry = datamapSize.filter(entry => indexPath.startsWith(entry._1))
+          if(pathEntry.nonEmpty) {
+            val (path, size) = pathEntry.iterator.next()
+            datamapSize.put(path, size + cache.getMemorySize)
+          }
+          if(indexPath.startsWith(tablePath)) {
+            numIndexFilesCached += 1
+          }
+        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
+          // bloom datamap
+          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+          val pathEntry = datamapSize.filter(entry => shardPath.contains(entry._1))
+          if(pathEntry.nonEmpty) {
+            val (path, size) = pathEntry.iterator.next()
+            datamapSize.put(path, size + cache.getMemorySize)
+          }
+        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
+          // dictionary
+          val dictId = dictIds.find(id => entry.getKey.startsWith(id))
+          if (dictId.isDefined) {
+            dictSize = dictSize + cache.getMemorySize
+          }
+        }
+      }
+
+      // get all index files
+      val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath)
+      val numIndexFilesAll = CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier)
+        .asScala.map {
+        segment =>
+          segment.getCommittedIndexFile
+      }.flatMap {
+        indexFilesMap => indexFilesMap.keySet().toArray
+      }.size
+
+      var result = Seq(
+        Row("Index", byteCountToDisplaySize(datamapSize.get(tablePath).get),
+          numIndexFilesCached + "/" + numIndexFilesAll + " index files cached"),
+        Row("Dictionary", byteCountToDisplaySize(dictSize), "")
+      )
+      for ((path, size) <- datamapSize) {
+        if (path != tablePath) {
+          val (dmName, dmType) = datamapName.get(path).get
+          result = result :+ Row(dmName, byteCountToDisplaySize(size), dmType)
+        }
+      }
+      result
+    }
+  }
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      showAllTablesCache(sparkSession)
+    } else {
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      if (carbonTable.isChildDataMap) {
+        throw new UnsupportedOperationException("Operation not allowed on child table.")
+      }
+      showTableCache(sparkSession, carbonTable)
+    }
+  }
+}
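Note: the grammar below produces exactly one of two shapes of the command defined
above. Constructing them directly shows the mapping (hypothetical identifiers;
normally only the parser builds these):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.cache.CarbonShowCacheCommand

    // SHOW METACACHE -> database-wide summary (five output columns)
    val showAll = CarbonShowCacheCommand(None)

    // SHOW METACACHE ON TABLE cache_db.cache_1 -> per-table detail (three output columns)
    val showOne = CarbonShowCacheCommand(Some(TableIdentifier("cache_1", Some("cache_db"))))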
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index d1023fa..a2923b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -33,11 +33,11 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.cache.CarbonShowCacheCommand
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
@@ -77,7 +77,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val startCommand: Parser[LogicalPlan] =
     loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
-    alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli
+    alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli | cacheManagement
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -94,6 +94,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val stream: Parser[LogicalPlan] =
     createStream | dropStream | showStreams
 
+  protected lazy val cacheManagement: Parser[LogicalPlan] =
+    showCache
+
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (ADD ~> PARTITION ~> "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
@@ -494,6 +497,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         showHistory.isDefined)
     }
 
+  protected lazy val showCache: Parser[LogicalPlan] =
+    SHOW ~> METACACHE ~> opt(ontable) <~ opt(";") ^^ {
+      case table =>
+        CarbonShowCacheCommand(table)
+    }
+
   protected lazy val cli: Parser[LogicalPlan] =
     (CARBONCLI ~> FOR ~> TABLE) ~> (ident <~ ".").? ~ ident ~