[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-06-12 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r292810798
 
 

 ##
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
 ##
 @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     }
   }

-  override protected def opName: String = "SHOW CACHE"
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      /**
+       * Assemble result for database
+       */
+      getAllTablesCache(sparkSession)
+    } else {
+      /**
+       * Assemble result for table
+       */
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      Checker
+        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
+      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled(
+          tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
+          tableIdentifier.get.table)) {
+        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+      } else { Seq() }
+      val result = driverRawResults.slice(0, 2) ++
+                   driverRawResults.drop(2).map { row =>
+                     Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      val serverResults = indexRawResults.slice(0, 2) ++
+                          indexRawResults.drop(2).map { row =>
+                            Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                          }
+      Seq(Row("DRIVER CACHE", "", "")) ++ result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+      } ++ (serverResults match {
+        case Nil => Seq()
+        case list =>
+          Seq(Row("---", "---", "---"), Row("INDEX CACHE", "", "")) ++
+          list.map {
+            row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+          }
+      })
+    }
+  }

   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache == null) {
-      Seq(
-        Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
-    } else {
-      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
-        tableIdent =>
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (cache == null && !isDistributedPruningEnabled) {
+      return makeEmptyCacheRows(currentDatabase)
+    }
+    var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+      tableIdent =>
+        try {
+          val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) {
+            carbonTables += carbonTable
+          }
+        } catch {
+          case _: NoSuchTableException =>
+            LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+        }
+    }
+    val indexServerRows = if (isDistributedPruningEnabled) {
+      carbonTables.flatMap {
+        mainTable =>
           try {
-            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-            if (!carbonTable.isChildDataMap) {
-              carbonTables += carbonTable
-            }
+            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
           } catch {
-            case ex: NoSuchTableException =>
-              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+            case ex: UnsupportedOperationException => Seq()
           }
       }
+    } else { Seq() }

-      // All tables of current database
-      var (dbDatamapSize, dbDictSize) = (0L, 0L)
-      val tableList = carbonTables.flatMap {
+    val driverRows = if (cache != null) {
+      carbonTables.flatMap {
         carbonTable =>
           try {
-            val tableResult = getTableCache(sparkSession, carbonTable)
-            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-            tableResult.drop(2).foreach {
-              row =>
-                indexSize += row.getLong(1)
-                datamapSize += row.getLong(2)
-            }
-            val dictSize = tableResult(1).getLong(1)
-
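
The reworked processMetadata above keeps the first two summary rows of each raw result as-is and, for every later row, folds the index size (column 1) and the datamap size (column 2) into a single size column before rendering sizes human-readable. Below is a minimal, self-contained sketch of that merge, with plain tuples standing in for Spark's Row and a rough stand-in for bytesToDisplaySize; both simplifications are illustrative only, not the PR's code.

object ShowCacheMergeSketch {
  // (name, indexSize, datamapSize, comment) -- the shape of a raw result row
  type RawRow = (String, Long, Long, String)

  // Rough, illustrative byte formatter; the real command uses a helper.
  def bytesToDisplaySize(bytes: Long): String =
    if (bytes >= (1L << 20)) s"${bytes >> 20} MB"
    else if (bytes >= (1L << 10)) s"${bytes >> 10} KB"
    else s"$bytes B"

  // First two rows are summary rows and pass through with their own size;
  // every later row folds index size and datamap size into one column.
  def merge(raw: Seq[RawRow]): Seq[(String, Long, String)] =
    raw.slice(0, 2).map { case (name, index, _, comment) => (name, index, comment) } ++
      raw.drop(2).map { case (name, index, datamap, comment) => (name, index + datamap, comment) }

  def main(args: Array[String]): Unit = {
    val driverRaw: Seq[RawRow] = Seq(
      ("Index", 2048L, 0L, "1/1 index files cached"),
      ("Dictionary", 512L, 0L, ""),
      ("dm_tbl1", 1024L, 4096L, "datamap"))
    merge(driverRaw).foreach { case (name, size, comment) =>
      println(f"$name%-12s ${bytesToDisplaySize(size)}%-8s $comment")
    }
  }
}

The same shape applies to the index-server results in the diff; only the source of the raw rows differs.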

[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-06-05 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r290998061
 
 


[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-06-05 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r290997945
 
 


[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-06-05 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r290997482
 
 

 ##
 File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
 ##
 @@ -43,15 +50,23 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName

   override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
     val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
+    val tableList = tableName.split(",").map(_.replace("-", "_"))
     val iterator = dataMaps.collect {
-      case (table, tableDataMaps) if table.isEmpty ||
-                                     (tableName.nonEmpty && tableName.equalsIgnoreCase(table)) =>
+      case (table, tableDataMaps) if tableName.isEmpty || tableList.contains(table) =>
         val sizeAndIndexLengths = tableDataMaps.asScala
-          .map(_.getBlockletDetailsFetcher.getCacheSize)
-        // return tableName_indexFileLength_indexCachesize for each executor.
-        sizeAndIndexLengths.map {
-          x => s"$table:$x"
-        }
+          .map { dataMap =>
+            // if datamap name is null then it means that the TableDataMap is of BlockletDatamap
+            // type.
 
 Review comment:
   This statement does not seem valid; please update the comment.
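
For context, the hunk makes each executor filter its local datamap cache against the comma-separated table list sent by the driver, normalizing `-` to `_` so the names match the cache-key format, and return one "table:size" string per cached entry. Below is a rough, self-contained sketch of that matching logic, with a plain Map standing in for DataMapStoreManager's cache; names are simplified and this is not the PR's code.

object ShowCacheFilterSketch {
  def cacheSizes(cachedSizes: Map[String, Long], tableName: String): Seq[String] = {
    // Normalize '-' to '_' so the driver's table names match the cache keys.
    val tableList = tableName.split(",").map(_.replace("-", "_"))
    cachedSizes.collect {
      // An empty tableName means "report every table's cache".
      case (table, size) if tableName.isEmpty || tableList.contains(table) =>
        s"$table:$size" // one tableName:cacheSize entry per cached table
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val cached = Map("db_tbl1" -> 4096L, "db_tbl2" -> 1024L)
    println(cacheSizes(cached, "db_tbl1")) // List(db_tbl1:4096)
    println(cacheSizes(cached, ""))        // both tables
  }
}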


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-05-31 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r289284518
 
 

 ##
 File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
 ##
 @@ -43,15 +44,25 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName

   override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
     val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
+    val tableList = tableName.split(",")
     val iterator = dataMaps.collect {
       case (table, tableDataMaps) if table.isEmpty ||
-                                     (tableName.nonEmpty && tableName.equalsIgnoreCase(table)) =>
+                                     (tableName.nonEmpty && tableList.contains(table)) =>
         val sizeAndIndexLengths = tableDataMaps.asScala
-          .map(_.getBlockletDetailsFetcher.getCacheSize)
-        // return tableName_indexFileLength_indexCachesize for each executor.
-        sizeAndIndexLengths.map {
-          x => s"$table:$x"
-        }
+          .map { dataMap =>
+            if (!dataMap.getDataMapSchema.getProviderName
+              .equals(DataMapClassProvider.BLOOMFILTER.getShortName)) {
 
 Review comment:
   I don't get why we are specifically checking for `BLOOMFILTER`; we are not supposed to check for anything provider-specific here.
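
One way to read this objection: instead of special-casing one provider by name, the executor-side code should rely on a capability that each datamap reports itself. The sketch below is a hypothetical contrast only; `DataMapLike` and `supportsCacheSize` are invented for illustration and are not CarbonData APIs.

// The shape the reviewer objects to versus a provider-agnostic alternative.
trait DataMapLike {
  def providerName: String
  def supportsCacheSize: Boolean // invented capability, not a CarbonData API
  def cacheSize: Long
}

object ProviderCheckSketch {
  // Hard-coded: branches on one named provider, which the review discourages.
  def sizesHardCoded(dataMaps: Seq[DataMapLike]): Seq[Long] =
    dataMaps.collect {
      case d if !d.providerName.equalsIgnoreCase("bloomfilter") => d.cacheSize
    }

  // Capability-based: ask each datamap whether it can report a cache size.
  def sizesCapabilityBased(dataMaps: Seq[DataMapLike]): Seq[Long] =
    dataMaps.collect { case d if d.supportsCacheSize => d.cacheSize }
}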




[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-05-31 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r289283548
 
 

 ##
 File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
 ##
 @@ -559,4 +559,6 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
   }

+
+
 
 Review comment:
   Don't change all these files if it is not needed; I can see that many files have been changed unnecessarily.




[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV

2019-05-31 Thread GitBox
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] 
Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r289278684
 
 

 ##
 File path: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
 ##
 @@ -112,4 +112,5 @@ public DataMapLevel getDataMapLevel() {
     return false;
   }
 }
+
 
 Review comment:
   Remove unnecessary change

