kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r292373470
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
 ##########
 @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     }
   }
 
-  override protected def opName: String = "SHOW CACHE"
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      /**
+       * Assemble result for database
+       */
+      getAllTablesCache(sparkSession)
+    } else {
+      /**
+       * Assemble result for table
+       */
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      Checker
+        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
+      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
+      (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
+        tableIdentifier.get.table)) {
+        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+      } else { Seq() }
+      val result = driverRawResults.slice(0, 2) ++
+                   driverRawResults.drop(2).map { row =>
+                     Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      val serverResults = indexRawResults.slice(0, 2) ++
+                          indexRawResults.drop(2).map { row =>
+                            Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                          }
+      Seq(Row("DRIVER CACHE", "", "")) ++ result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+      } ++ (serverResults match {
+        case Nil => Seq()
+        case list =>
+          Seq(Row("-----------", "-----------", "-----------"), Row("INDEX 
CACHE", "", "")) ++
+          list.map {
+          row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+        }
+      })
+    }
+  }
 
   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache == null) {
-      Seq(
-        Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
-    } else {
-      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
-        tableIdent =>
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (cache == null && !isDistributedPruningEnabled) {
+      return makeEmptyCacheRows(currentDatabase)
+    }
+    var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+      tableIdent =>
+        try {
+          val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) {
+            carbonTables += carbonTable
+          }
+        } catch {
+          case _: NoSuchTableException =>
+            LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+        }
+    }
+    val indexServerRows = if (isDistributedPruningEnabled) {
+      carbonTables.flatMap {
+        mainTable =>
           try {
-            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-            if (!carbonTable.isChildDataMap) {
-              carbonTables += carbonTable
-            }
+            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
           } catch {
-            case ex: NoSuchTableException =>
-              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+            case ex: UnsupportedOperationException => Seq()
           }
       }
+    } else { Seq() }
 
-      // All tables of current database
-      var (dbDatamapSize, dbDictSize) = (0L, 0L)
-      val tableList = carbonTables.flatMap {
+    val driverRows = if (cache != null) {
+      carbonTables.flatMap {
         carbonTable =>
           try {
-            val tableResult = getTableCache(sparkSession, carbonTable)
-            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-            tableResult.drop(2).foreach {
-              row =>
-                indexSize += row.getLong(1)
-                datamapSize += row.getLong(2)
-            }
-            val dictSize = tableResult(1).getLong(1)
-
-            dbDictSize += dictSize
-            dbDatamapSize += datamapSize
-
-            val tableName = if (!carbonTable.isTransactionalTable) {
-              carbonTable.getTableName + " (external table)"
-            }
-            else {
-              carbonTable.getTableName
-            }
-            Seq((currentDatabase, tableName, indexSize, datamapSize, dictSize))
+            makeRows(getTableCacheFromDriver(sparkSession, carbonTable), carbonTable)
           } catch {
-            case ex: UnsupportedOperationException =>
-              Seq.empty
+            case ex: UnsupportedOperationException => Seq()
           }
-      }.collect {
-        case (db, table, indexSize, datamapSize, dictSize) if !((indexSize == 0) &&
-                                                                (datamapSize == 0) &&
-                                                                (dictSize == 0)) =>
-          Row(db, table, indexSize, datamapSize, dictSize)
       }
+    } else { Seq() }
+
+    val (driverdbIndexSize, driverdbDatamapSize, driverdbDictSize) = calculateDBIndexAndDatamapSize(
+      driverRows)
+    val (indexAllIndexSize, indexAllDatamapSize, indexAllDictSize) = calculateDBIndexAndDatamapSize(
+      indexServerRows)
+    val (indexdbIndexSize, indexdbDatamapSize) = getIndexServerCacheSizeForCurrentDB(
+      currentDatabase)
+
 
+    val driverDisplayRows = if (cache != null) {
       val tablePaths = carbonTables.map {
         carbonTable =>
           carbonTable.getTablePath
       }
-
-      // Scan whole cache and fill the entries for All-Database-All-Tables
-      // and Current-Database-All-Tables
-      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
-      var dbIndexSize = 0L
-      cache.getCacheMap.asScala.foreach {
-        case (key, cacheable) =>
-          cacheable match {
-            case _: BlockletDataMapIndexWrapper =>
-              allIndexSize += cacheable.getMemorySize
-              if (tablePaths.exists { path => key.startsWith(path) }) {
-                dbIndexSize += cacheable.getMemorySize
-              }
-            case _: BloomCacheKeyValue.CacheValue =>
-              allDatamapSize += cacheable.getMemorySize
-            case _: AbstractColumnDictionaryInfo =>
-              allDictSize += cacheable.getMemorySize
-          }
+      val (driverIndexSize, driverDatamapSize, allDictSize) = getAllDriverCacheSize(tablePaths
+        .toList)
+      if (driverRows.nonEmpty) {
+        val rows = (Seq(
+          Row("ALL", "ALL", driverIndexSize, driverDatamapSize, allDictSize),
+          Row(currentDatabase, "ALL", driverdbIndexSize, driverdbDatamapSize, driverdbDictSize)
+        ) ++ driverRows).collect {
+          case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+            Row(row(0), row(1), bytesToDisplaySize(row.getLong(2)),
+              bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+        }
+        Seq(Row("DRIVER CACHE", "", "", "", "")) ++ rows
+      } else {
+        makeEmptyCacheRows(currentDatabase)
       }
+    } else {
+      makeEmptyCacheRows(currentDatabase)
+    }
 
-      Seq(
-        Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
-        Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
-      ) ++ tableList
+    //      val (serverIndexSize, serverDataMapSize) = getAllIndexServerCacheSize
+    val indexDisplayRows = if (indexServerRows.nonEmpty) {
+      val rows = (Seq(
+        Row("ALL", "ALL", indexAllIndexSize, indexAllDatamapSize, 
indexAllDictSize),
+        Row(currentDatabase, "ALL", indexdbIndexSize, indexdbDatamapSize, driverdbDictSize)
+      ) ++ indexServerRows).collect {
+        case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
+            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+      }
+      Seq(Row("INDEX SERVER CACHE", "", "", "", "")) ++ rows
+    } else {
+      Seq()
     }
+    driverDisplayRows ++ indexDisplayRows
   }
 
-  def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+  def getTableCacheFromDriver(sparkSession: SparkSession, carbonTable: CarbonTable,
+      numOfIndexFiles: Int = 0): Seq[Row] = {
     val cache = CacheProvider.getInstance().getCarbonCache
-    val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
-    if (cache == null) {
-      var comments = 0 + "/" + allIndexFiles.size + " index files cached"
+    if (cache != null) {
+      val childTableList = getChildTableList(carbonTable)(sparkSession)
+      val parentMetaCacheInfo = collectDriverMetaCacheInfo(carbonTable, false)(sparkSession) match {
+        case head :: _ => head
+        case Nil => ("", 0, 0L, "")
+      }
+      val parentDictionary = getDictionarySize(carbonTable)(sparkSession)
+      val childMetaCacheInfos = childTableList.flatMap {
+        childTable =>
+          val tableArray = childTable._1.split("-")
+          val dbName = tableArray(0)
+          val tableName = tableArray(1)
+          if (childTable._2
+            .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName)) {
+            val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+              .asScala
+            val bloomDataMaps = datamaps.collect {
+              case datamap if datamap.getProviderName
+                .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName) =>
+                datamap
+            }.toList
+
+            // Get datamap keys
+            val datamapKeys = bloomDataMaps.flatMap {
+              datamap =>
+                CacheUtil
+                  .getBloomCacheKeys(carbonTable, datamap)
+            }
+
+            // calculate the memory size if key exists in cache
+            val datamapSize = datamapKeys.collect {
+              case key if cache.get(key) != null =>
+                cache.get(key).getMemorySize
+            }.sum
+            Seq(Row(childTable._1, 0L, datamapSize, childTable._2))
+          } else {
+            val childCarbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+            val childMetaCacheInfo = collectDriverMetaCacheInfo(childCarbonTable)(sparkSession)
+            childMetaCacheInfo.map {
+              childMeta => Row(childMeta._1, childMeta._3, 0L, childTable._2)
+            }.toSeq
+          }
+      }
+      var comments = parentMetaCacheInfo._2 + s"/$numOfIndexFiles index files cached"
       if (!carbonTable.isTransactionalTable) {
         comments += " (external table)"
       }
-      return Seq(
-        Row("Index", 0L, comments),
+      Seq(
+        Row("Index", parentMetaCacheInfo._3, comments),
+        Row("Dictionary", parentDictionary, "")
+      ) ++ childMetaCacheInfos
+    } else {
+      Seq(
+        Row("Index", 0L, ""),
         Row("Dictionary", 0L, "")
       )
     }
+  }
 
-    val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
-    val operationContext = new OperationContext
-    // datamapName -> (datamapProviderName, indexSize, datamapSize)
-    val currentTableSizeMap = scala.collection.mutable.Map[String, (String, String, Long, Long)]()
-    operationContext.setProperty(carbonTable.getTableUniqueName, currentTableSizeMap)
-    OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
+  override protected def opName: String = "SHOW CACHE"
 
-    // Get all Index files for the specified table in cache
-    val (indexFilesLength, size) = if (CarbonProperties.getInstance()
-        .isDistributedPruningEnabled(carbonTable.getDatabaseName, carbonTable.getTableName)) {
-      getTableCache(carbonTable.getTableUniqueName)
-    } else {
-      val memorySizeForEachIndexFile: List[Long] = allIndexFiles.collect {
-        case indexFile if cache.get(indexFile) != null =>
-          cache.get(indexFile).getMemorySize
-      }
-      (memorySizeForEachIndexFile.length, memorySizeForEachIndexFile.sum)
+  private def makeEmptyCacheRows(currentDatabase: String) = {
+    Seq(
+      Row("ALL", "ALL", bytesToDisplaySize(0), bytesToDisplaySize(0), 
bytesToDisplaySize(0)),
+      Row(currentDatabase, "ALL", bytesToDisplaySize(0), bytesToDisplaySize(0),
+        bytesToDisplaySize(0)))
+  }
+
+  private def calculateDBIndexAndDatamapSize(rows: Seq[Row]): (Long, Long, Long) = {
+    rows.map {
+      row =>
+        (row(2).asInstanceOf[Long], row(3).asInstanceOf[Long], row.get(4).asInstanceOf[Long])
+    }.fold((0L, 0L, 0L)) {
+      case (a, b) =>
+        (a._1 + b._1, a._2 + b._2, a._3 + b._3)
     }
+  }
 
-    // Extract dictionary keys for the table and create cache keys from those
-    val dictKeys = CacheUtil.getAllDictCacheKeys(carbonTable)
-    val sizeOfDictInCache = dictKeys.collect {
-      case dictKey if cache.get(dictKey) != null =>
-        cache.get(dictKey).getMemorySize
-    }.sum
+  private def makeRows(tableResult: Seq[Row], carbonTable: CarbonTable) = {
+    var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
+    tableResult.drop(2).foreach {
+      row =>
+        indexSize += row.getLong(1)
+        datamapSize += row.getLong(2)
+    }
+    val dictSize = tableResult(1).getLong(1)
+    Seq(Row(carbonTable.getDatabaseName, carbonTable.getTableName,
+      indexSize,
+      datamapSize,
+      dictSize))
+  }
 
-    // Assemble result for all the datamaps for the table
-    val otherDatamaps = operationContext.getProperty(carbonTable.getTableUniqueName)
-      .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
-    val otherDatamapsResults: Seq[Row] = otherDatamaps.map {
-      case (name, (provider, indexSize, dmSize)) =>
-        Row(name, indexSize, dmSize, provider)
-    }.toSeq
-    var comments = indexFilesLength + "/" + allIndexFiles.size + " index files cached"
-    if (!carbonTable.isTransactionalTable) {
+  private def getTableCacheFromIndexServer(mainTable: CarbonTable, numberOfIndexFiles: Int = 0)
+    (sparkSession: SparkSession): Seq[Row] = {
+    val childTables = getChildTableList(mainTable)(sparkSession)
+    val cache = if (tableIdentifier.nonEmpty) {
+      executeJobToGetCache(childTables.map(_._1) ++ List(mainTable.getTableUniqueName))
+    } else {
+      cacheResult
+    }
+    val (mainTableFiles, mainTableCache) = getTableCache(cache, mainTable.getTableUniqueName)
+    val childMetaCacheInfos = childTables.flatMap {
+      childTable =>
+        val tableName = childTable._1.replace("-", "_")
+        if (childTable._2
+          .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName) || childTable._2
+          .equalsIgnoreCase(DataMapClassProvider.MV.getShortName)) {
+          Seq(Row(tableName, 0L, getTableCache(cache, tableName)._2, childTable._2))
+        } else {
+          val childCache = getTableCache(cache, tableName)._2
+          Seq(Row(tableName, childCache, 0L, childTable._2))
+        }
+    }
+    var comments = mainTableFiles + s"/$numberOfIndexFiles index files cached"
+    if (!mainTable.isTransactionalTable) {
       comments += " (external table)"
     }
     Seq(
-      Row("Index", size, comments),
-      Row("Dictionary", sizeOfDictInCache, "")
-    ) ++ otherDatamapsResults
+      Row("Index", mainTableCache, comments),
+      Row("Dictionary", getDictionarySize(mainTable)(sparkSession), "")
+    ) ++ childMetaCacheInfos
+
   }
 
-  private lazy val cacheResult: Seq[(String, Int, Long)] = {
-    val tableUniqueName = tableIdentifier match {
-      case Some(identifier) => s"${
-        identifier.database.getOrElse(SparkSession.getActiveSession
-          .get.catalog.currentDatabase)
-      }_${ identifier.table }"
-      case None => ""
-    }
-    val (result, time) = CarbonScalaUtil.logTime {
-      try {
-        IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
+  private def executeJobToGetCache(tableUniqueNames: List[String]): Seq[(String, Int, Long,
+    String)] = {
+    try {
+      val (result, time) = CarbonScalaUtil.logTime {
+        IndexServer.getClient.showCache(tableUniqueNames.mkString(",")).map(_.split(":"))
           .groupBy(_.head).map { t =>
           var sum = 0L
           var length = 0
+          var provider = ""
           t._2.foreach {
             arr =>
               sum += arr(2).toLong
               length += arr(1).toInt
+              provider = arr(3)
           }
-          (t._1, length, sum)
+          (t._1, length, sum, provider)
         }
-      } catch {
-        case e: Exception =>
-          throw new RuntimeException("Failed to get Cache Information. ", e)
       }
+      LOGGER.info(s"Time taken to get cache results from Index Server is $time 
ms")
+      result.toList
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("Error while getting cache details from index server", ex)
+        Seq()
     }
-    LOGGER.info(s"Time taken to get cache results from Index Server is $time 
ms")
-    result.toList
   }
 
-  private def getTableCache(tableName: String): (Int, Long) = {
-    val (_, indexFileLength, cacheSize) = cacheResult.find(_._1 == tableName).getOrElse(("", 0, 0L))
+  private def getTableCache(cache: Seq[(String, Int, Long, String)], tableName: String) = {
+    val (_, indexFileLength, cacheSize, _) = cache.find(_._1 == tableName)
+      .getOrElse(("", 0, 0L, ""))
     (indexFileLength, cacheSize)
   }
 
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    if (tableIdentifier.isEmpty) {
-      /**
-       * Assemble result for database
-       */
-      val result = getAllTablesCache(sparkSession)
-      result.map {
-        row =>
-          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
-            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
-      }
+  private def getChildTableList(carbonTable: CarbonTable)
+    (sparkSession: SparkSession): List[(String, String)] = {
+    val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
+    val operationContext = new OperationContext
+    // datamapName -> (datamapProviderName, indexSize, datamapSize)
+    operationContext.setProperty(carbonTable.getTableUniqueName, List())
+    OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
+    operationContext.getProperty(carbonTable.getTableUniqueName)
+      .asInstanceOf[List[(String, String)]]
+  }
+
+  private def getDictionarySize(carbonTable: CarbonTable)(sparkSession: SparkSession): Long = {
+    val dictKeys = CacheUtil.getAllDictCacheKeys(carbonTable)
+    val cache = CacheProvider.getInstance().getCarbonCache
+    dictKeys.collect {
+      case dictKey if cache != null && cache.get(dictKey) != null =>
+        cache.get(dictKey).getMemorySize
+    }.sum
+  }
+
+  private def getAllDriverCacheSize(tablePaths: List[String]) = {
+    val cache = CacheProvider.getInstance().getCarbonCache
+    // Scan whole cache and fill the entries for All-Database-All-Tables
+    // and Current-Database-All-Tables
+    var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+    var dbIndexSize = 0L
+    cache.getCacheMap.asScala.foreach {
+      case (key, cacheable) =>
+        cacheable match {
+          case _: BlockletDataMapIndexWrapper =>
+            allIndexSize += cacheable.getMemorySize
+            if (tablePaths.exists { path => key.startsWith(path) }) {
+              dbIndexSize += cacheable.getMemorySize
+            }
+          case _: BloomCacheKeyValue.CacheValue =>
 
 Review comment:
   changed
