ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r290997945
##########
File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
##########

@@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     }
   }
 
-  override protected def opName: String = "SHOW CACHE"
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      /**
+       * Assemble result for database
+       */
+      getAllTablesCache(sparkSession)
+    } else {
+      /**
+       * Assemble result for table
+       */
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      Checker
+        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
+      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
+      (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
+        tableIdentifier.get.table)) {
+        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+      } else { Seq() }
+      val result = driverRawResults.slice(0, 2) ++
+                   driverRawResults.drop(2).map { row =>
+                     Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      val serverResults = indexRawResults.slice(0, 2) ++
+                          indexRawResults.drop(2).map { row =>
+                            Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                          }
+      Seq(Row("DRIVER CACHE", "", "")) ++ result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+      } ++ (serverResults match {
+        case Nil => Seq()
+        case list =>
+          Seq(Row("-----------", "-----------", "-----------"), Row("INDEX CACHE", "", "")) ++
+          list.map {
+            row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+          }
+      })
+    }
+  }
 
   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache == null) {
-      Seq(
-        Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
-    } else {
-      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
-        tableIdent =>
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (cache == null && !isDistributedPruningEnabled) {
+      return makeEmptyCacheRows(currentDatabase)
+    }
+    var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+      tableIdent =>
+        try {
+          val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) {
+            carbonTables += carbonTable
+          }
+        } catch {
+          case _: NoSuchTableException =>
+            LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+        }
+    }
+    val indexServerRows = if (isDistributedPruningEnabled) {
+      carbonTables.flatMap {
+        mainTable =>
           try {
-            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-            if (!carbonTable.isChildDataMap) {
-              carbonTables += carbonTable
-            }
+            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
           } catch {
-            case ex: NoSuchTableException =>
-              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+            case ex: UnsupportedOperationException => Seq()
           }
       }
+    } else { Seq() }
 
-      // All tables of current database
-      var (dbDatamapSize, dbDictSize) = (0L, 0L)
-      val tableList = carbonTables.flatMap {
+    val driverRows = if (cache != null) {
+      carbonTables.flatMap {
         carbonTable =>
           try {
-            val tableResult = getTableCache(sparkSession, carbonTable)
-            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-            tableResult.drop(2).foreach {
-              row =>
-                indexSize += row.getLong(1)
-                datamapSize += row.getLong(2)
-            }
-            val dictSize = tableResult(1).getLong(1)
-
-            dbDictSize += dictSize
-            dbDatamapSize += datamapSize
-
-            val tableName = if (!carbonTable.isTransactionalTable) {
-              carbonTable.getTableName + " (external table)"
-            }
-            else {
-              carbonTable.getTableName
-            }
-            Seq((currentDatabase, tableName, indexSize, datamapSize, dictSize))
+            makeRows(getTableCacheFromDriver(sparkSession, carbonTable), carbonTable)
           } catch {
-            case ex: UnsupportedOperationException =>
-              Seq.empty
+            case ex: UnsupportedOperationException => Seq()
           }
-      }.collect {
-        case (db, table, indexSize, datamapSize, dictSize) if !((indexSize == 0) &&
-                                                                (datamapSize == 0) &&
-                                                                (dictSize == 0)) =>
-          Row(db, table, indexSize, datamapSize, dictSize)
       }
+    } else { Seq() }
+
+    val (driverdbIndexSize, driverdbDatamapSize, driverdbDictSize) = calculateDBIndexAndDatamapSize(
+      driverRows)
+    val (indexAllIndexSize, indexAllDatamapSize, indexAllDictSize) = calculateDBIndexAndDatamapSize(
+      indexServerRows)
+    val (indexdbIndexSize, indexdbDatamapSize) = getIndexServerCacheSizeForCurrentDB(
+      currentDatabase)
 
+    val driverDisplayRows = if (cache != null) {
       val tablePaths = carbonTables.map {
         carbonTable =>
           carbonTable.getTablePath
       }
-
-      // Scan whole cache and fill the entries for All-Database-All-Tables
-      // and Current-Database-All-Tables
-      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
-      var dbIndexSize = 0L
-      cache.getCacheMap.asScala.foreach {
-        case (key, cacheable) =>
-          cacheable match {
-            case _: BlockletDataMapIndexWrapper =>
-              allIndexSize += cacheable.getMemorySize
-              if (tablePaths.exists { path => key.startsWith(path) }) {
-                dbIndexSize += cacheable.getMemorySize
-              }
-            case _: BloomCacheKeyValue.CacheValue =>
-              allDatamapSize += cacheable.getMemorySize
-            case _: AbstractColumnDictionaryInfo =>
-              allDictSize += cacheable.getMemorySize
-          }
-      }
+      val (driverIndexSize, driverDatamapSize, allDictSize) = getAllDriverCacheSize(tablePaths
+        .toList)
+      if (driverRows.nonEmpty) {
+        val rows = (Seq(
+          Row("ALL", "ALL", driverIndexSize, driverDatamapSize, allDictSize),
+          Row(currentDatabase, "ALL", driverdbIndexSize, driverdbDatamapSize, driverdbDictSize)
+        ) ++ driverRows).collect {
+          case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+            Row(row(0), row(1), bytesToDisplaySize(row.getLong(2)),
+              bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+        }
+        Seq(Row("DRIVER CACHE", "", "", "", "")) ++ rows
+      } else {
+        makeEmptyCacheRows(currentDatabase)
+      }
+    } else {
+      makeEmptyCacheRows(currentDatabase)
+    }
 
-      Seq(
-        Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
-        Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
-      ) ++ tableList
+    // val (serverIndexSize, serverDataMapSize) = getAllIndexServerCacheSize
+    val indexDisplayRows = if (indexServerRows.nonEmpty) {
+      val rows = (Seq(
+        Row("ALL", "ALL", indexAllIndexSize, indexAllDatamapSize, indexAllDictSize),
+        Row(currentDatabase, "ALL", indexdbIndexSize, indexdbDatamapSize, driverdbDictSize)
+      ) ++ indexServerRows).collect {
+        case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
+            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+      }
+      Seq(Row("INDEX SERVER CACHE", "", "", "", "")) ++ rows
+    } else {
+      Seq()
     }
+    driverDisplayRows ++ indexDisplayRows
   }
 
-  def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+  def getTableCacheFromDriver(sparkSession: SparkSession, carbonTable: CarbonTable,
+      numOfIndexFiles: Int = 0): Seq[Row] = {
     val cache = CacheProvider.getInstance().getCarbonCache
-    val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
-    if (cache == null) {
-      var comments = 0 + "/" + allIndexFiles.size + " index files cached"
+    if (cache != null) {
+      val childTableList = getChildTableList(carbonTable)(sparkSession)
+      val parentMetaCacheInfo = collectDriverMetaCacheInfo(carbonTable, false)(sparkSession) match {
+        case head :: _ => head
+        case Nil => ("", 0, 0L, "")
+      }
+      val parentDictionary = getDictionarySize(carbonTable)(sparkSession)
+      val childMetaCacheInfos = childTableList.flatMap {
+        childTable =>
+          val tableArray = childTable._1.split("-")
+          val dbName = tableArray(0)
+          val tableName = tableArray(1)
+          if (childTable._2
+            .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName)) {

Review comment:
   Please don't check the specific datamap type anywhere, we don't want to update this code every time new datamap is added.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
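[Editor's illustration of the review comment above. This is a minimal, self-contained Scala sketch, not CarbonData's actual API: DataMapCacheSizeRegistry, the resolver functions, and the byte counts are invented for the example. The idea is that each datamap provider registers its own cache-size resolver once, keyed by the provider name it reports, so display code such as SHOW CACHE never branches on a concrete provider like BLOOMFILTER and needs no change when a new datamap type is added.]

import scala.collection.mutable

// Hypothetical registry: provider name (lower-cased) -> function from table path to cached bytes.
object DataMapCacheSizeRegistry {
  private val resolvers = mutable.Map[String, String => Long]()

  // Each datamap implementation registers its resolver exactly once.
  def register(provider: String, resolver: String => Long): Unit = {
    resolvers(provider.toLowerCase) = resolver
  }

  // Unknown providers fall back to 0 bytes instead of forcing a code change here.
  def cachedBytes(provider: String, tablePath: String): Long = {
    resolvers.getOrElse(provider.toLowerCase, (_: String) => 0L).apply(tablePath)
  }
}

object ShowCacheSketch extends App {
  // Toy registrations with made-up sizes, standing in for real datamap providers.
  DataMapCacheSizeRegistry.register("bloomfilter", _ => 1024L)
  DataMapCacheSizeRegistry.register("mv", _ => 2048L)

  // The display code stays provider-agnostic: it only iterates whatever it is given.
  val childTables = Seq(("db1-dm1_table", "bloomfilter"), ("db1-mv1", "mv"), ("db1-dm2", "lucene"))
  childTables.foreach { case (table, provider) =>
    println(s"$table ($provider): ${DataMapCacheSizeRegistry.cachedBytes(provider, table)} bytes")
  }
}

A real implementation would presumably attach the resolver to the datamap provider or schema interface rather than a global registry object, but the extension point is the same either way: dispatch is driven by the provider the datamap reports, not by a hard-coded list of provider names.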