kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r292374118
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
 ##########
 @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     }
   }
 
-  override protected def opName: String = "SHOW CACHE"
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      /**
+       * Assemble result for database
+       */
+      getAllTablesCache(sparkSession)
+    } else {
+      /**
+       * Assemble result for table
+       */
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      Checker
+        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
+      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
+      (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
+        tableIdentifier.get.table)) {
+        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+      } else { Seq() }
+      val result = driverRawResults.slice(0, 2) ++
+                   driverRawResults.drop(2).map { row =>
+                     Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      val serverResults = indexRawResults.slice(0, 2) ++
+                          indexRawResults.drop(2).map { row =>
+                            Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                          }
+      Seq(Row("DRIVER CACHE", "", "")) ++ result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+      } ++ (serverResults match {
+        case Nil => Seq()
+        case list =>
+          Seq(Row("-----------", "-----------", "-----------"), Row("INDEX CACHE", "", "")) ++
+          list.map {
+          row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+        }
+      })
+    }
+  }
 
   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache == null) {
-      Seq(
-        Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
-    } else {
-      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
-        tableIdent =>
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (cache == null && !isDistributedPruningEnabled) {
+      return makeEmptyCacheRows(currentDatabase)
+    }
+    var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+      tableIdent =>
+        try {
+          val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) {
+            carbonTables += carbonTable
+          }
+        } catch {
+          case _: NoSuchTableException =>
+            LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+        }
+    }
+    val indexServerRows = if (isDistributedPruningEnabled) {
+      carbonTables.flatMap {
+        mainTable =>
           try {
-            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-            if (!carbonTable.isChildDataMap) {
-              carbonTables += carbonTable
-            }
+            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
           } catch {
-            case ex: NoSuchTableException =>
-              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+            case ex: UnsupportedOperationException => Seq()
           }
       }
+    } else { Seq() }
 
-      // All tables of current database
-      var (dbDatamapSize, dbDictSize) = (0L, 0L)
-      val tableList = carbonTables.flatMap {
+    val driverRows = if (cache != null) {
+      carbonTables.flatMap {
         carbonTable =>
           try {
-            val tableResult = getTableCache(sparkSession, carbonTable)
-            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-            tableResult.drop(2).foreach {
-              row =>
-                indexSize += row.getLong(1)
-                datamapSize += row.getLong(2)
-            }
-            val dictSize = tableResult(1).getLong(1)
-
-            dbDictSize += dictSize
-            dbDatamapSize += datamapSize
-
-            val tableName = if (!carbonTable.isTransactionalTable) {
-              carbonTable.getTableName + " (external table)"
-            }
-            else {
-              carbonTable.getTableName
-            }
-            Seq((currentDatabase, tableName, indexSize, datamapSize, dictSize))
+            makeRows(getTableCacheFromDriver(sparkSession, carbonTable), carbonTable)
           } catch {
-            case ex: UnsupportedOperationException =>
-              Seq.empty
+            case ex: UnsupportedOperationException => Seq()
           }
-      }.collect {
-        case (db, table, indexSize, datamapSize, dictSize) if !((indexSize == 0) &&
-                                                                (datamapSize == 0) &&
-                                                                (dictSize == 0)) =>
-          Row(db, table, indexSize, datamapSize, dictSize)
       }
+    } else { Seq() }
+
+    val (driverdbIndexSize, driverdbDatamapSize, driverdbDictSize) = calculateDBIndexAndDatamapSize(
+      driverRows)
+    val (indexAllIndexSize, indexAllDatamapSize, indexAllDictSize) = calculateDBIndexAndDatamapSize(
+      indexServerRows)
+    val (indexdbIndexSize, indexdbDatamapSize) = getIndexServerCacheSizeForCurrentDB(
+      currentDatabase)
+
 
+    val driverDisplayRows = if (cache != null) {
       val tablePaths = carbonTables.map {
         carbonTable =>
           carbonTable.getTablePath
       }
-
-      // Scan whole cache and fill the entries for All-Database-All-Tables
-      // and Current-Database-All-Tables
-      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
-      var dbIndexSize = 0L
-      cache.getCacheMap.asScala.foreach {
-        case (key, cacheable) =>
-          cacheable match {
-            case _: BlockletDataMapIndexWrapper =>
-              allIndexSize += cacheable.getMemorySize
-              if (tablePaths.exists { path => key.startsWith(path) }) {
-                dbIndexSize += cacheable.getMemorySize
-              }
-            case _: BloomCacheKeyValue.CacheValue =>
-              allDatamapSize += cacheable.getMemorySize
-            case _: AbstractColumnDictionaryInfo =>
-              allDictSize += cacheable.getMemorySize
-          }
+      val (driverIndexSize, driverDatamapSize, allDictSize) = getAllDriverCacheSize(tablePaths
+        .toList)
+      if (driverRows.nonEmpty) {
+        val rows = (Seq(
+          Row("ALL", "ALL", driverIndexSize, driverDatamapSize, allDictSize),
+          Row(currentDatabase, "ALL", driverdbIndexSize, driverdbDatamapSize, driverdbDictSize)
+        ) ++ driverRows).collect {
+          case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+            Row(row(0), row(1), bytesToDisplaySize(row.getLong(2)),
+              bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+        }
+        Seq(Row("DRIVER CACHE", "", "", "", "")) ++ rows
+      } else {
+        makeEmptyCacheRows(currentDatabase)
       }
+    } else {
+      makeEmptyCacheRows(currentDatabase)
+    }
 
-      Seq(
-        Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
-        Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
-      ) ++ tableList
+    //      val (serverIndexSize, serverDataMapSize) = getAllIndexServerCacheSize
+    val indexDisplayRows = if (indexServerRows.nonEmpty) {
+      val rows = (Seq(
+        Row("ALL", "ALL", indexAllIndexSize, indexAllDatamapSize, indexAllDictSize),
+        Row(currentDatabase, "ALL", indexdbIndexSize, indexdbDatamapSize, driverdbDictSize)
+      ) ++ indexServerRows).collect {
+        case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
+            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+      }
+      Seq(Row("INDEX SERVER CACHE", "", "", "", "")) ++ rows
+    } else {
+      Seq()
     }
+    driverDisplayRows ++ indexDisplayRows
   }
 
-  def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+  def getTableCacheFromDriver(sparkSession: SparkSession, carbonTable: CarbonTable,
+      numOfIndexFiles: Int = 0): Seq[Row] = {
     val cache = CacheProvider.getInstance().getCarbonCache
-    val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
-    if (cache == null) {
-      var comments = 0 + "/" + allIndexFiles.size + " index files cached"
+    if (cache != null) {
+      val childTableList = getChildTableList(carbonTable)(sparkSession)
+      val parentMetaCacheInfo = collectDriverMetaCacheInfo(carbonTable, false)(sparkSession) match {
+        case head :: _ => head
+        case Nil => ("", 0, 0L, "")
+      }
+      val parentDictionary = getDictionarySize(carbonTable)(sparkSession)
+      val childMetaCacheInfos = childTableList.flatMap {
+        childTable =>
+          val tableArray = childTable._1.split("-")
+          val dbName = tableArray(0)
+          val tableName = tableArray(1)
+          if (childTable._2
+            .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName)) {
 
 Review comment:
   Removed the specific check. There are two places where this kind of check still exists in the case of the index server, because I don't have the TableDataMap object to differentiate between Bloom and other datamaps; the index server only returns the provider name.
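
   For context, a minimal sketch of the provider-name comparison described above, assuming the caller only has the provider short name returned by the index server. The `isBloomDatamap` helper and its parameter are hypothetical; `DataMapClassProvider.BLOOMFILTER.getShortName` is the same identifier already used in the diff above.

   ```scala
   import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider

   // Hypothetical helper: decide whether a cached datamap entry is a Bloom datamap
   // using only the provider short name, since the index server does not expose
   // the TableDataMap object itself.
   def isBloomDatamap(providerShortName: String): Boolean = {
     providerShortName.equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName)
   }
   ```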

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
