[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r292810798 ## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala ## @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier], } } - override protected def opName: String = "SHOW CACHE" + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { +if (tableIdentifier.isEmpty) { + /** + * Assemble result for database + */ + getAllTablesCache(sparkSession) +} else { + /** + * Assemble result for table + */ + val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession) + Checker +.validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession) + val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size + val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles) + val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled + (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase), +tableIdentifier.get.table)) { +getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession) + } else { Seq() } + val result = driverRawResults.slice(0, 2) ++ + driverRawResults.drop(2).map { row => + Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3)) + } + val serverResults = indexRawResults.slice(0, 2) ++ + indexRawResults.drop(2).map { row => +Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3)) + } + Seq(Row("DRIVER CACHE", "", "")) ++ result.map { +row => + Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2)) + } ++ (serverResults match { +case Nil => Seq() +case list => + Seq(Row("---", "---", "---"), Row("INDEX CACHE", "", "")) ++ + list.map { + row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2)) +} + }) +} + } def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = { val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase val cache = CacheProvider.getInstance().getCarbonCache -if (cache == null) { - Seq( -Row("ALL", "ALL", 0L, 0L, 0L), -Row(currentDatabase, "ALL", 0L, 0L, 0L)) -} else { - var carbonTables = mutable.ArrayBuffer[CarbonTable]() - sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { -tableIdent => +val isDistributedPruningEnabled = CarbonProperties.getInstance() + .isDistributedPruningEnabled("", "") +if (cache == null && !isDistributedPruningEnabled) { + return makeEmptyCacheRows(currentDatabase) +} +var carbonTables = mutable.ArrayBuffer[CarbonTable]() +sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { + tableIdent => +try { + val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) + if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) { +carbonTables += carbonTable + } +} catch { + case _: NoSuchTableException => +LOGGER.debug("Ignoring non-carbon table " + tableIdent.table) +} +} +val indexServerRows = if (isDistributedPruningEnabled) { + carbonTables.flatMap { +mainTable => try { -val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) -if (!carbonTable.isChildDataMap) { - carbonTables += carbonTable -} +makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable) } catch { -case ex: NoSuchTableException => - LOGGER.debug("Ignoring non-carbon table " + tableIdent.table) +case ex: UnsupportedOperationException => Seq() } } +} else { Seq() } - // All tables of current database - var (dbDatamapSize, dbDictSize) = (0L, 0L) - val tableList = carbonTables.flatMap { +val driverRows = if (cache != null) { + carbonTables.flatMap { carbonTable => try { -val tableResult = getTableCache(sparkSession, carbonTable) -var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L) -tableResult.drop(2).foreach { - row => -indexSize += row.getLong(1) -datamapSize += row.getLong(2) -} -val dictSize = tableResult(1).getLong(1) - -
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r290998061 ## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala ## @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier], } } - override protected def opName: String = "SHOW CACHE" + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { +if (tableIdentifier.isEmpty) { + /** + * Assemble result for database + */ + getAllTablesCache(sparkSession) +} else { + /** + * Assemble result for table + */ + val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession) + Checker +.validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession) + val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size + val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles) + val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled + (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase), +tableIdentifier.get.table)) { +getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession) + } else { Seq() } + val result = driverRawResults.slice(0, 2) ++ + driverRawResults.drop(2).map { row => + Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3)) + } + val serverResults = indexRawResults.slice(0, 2) ++ + indexRawResults.drop(2).map { row => +Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3)) + } + Seq(Row("DRIVER CACHE", "", "")) ++ result.map { +row => + Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2)) + } ++ (serverResults match { +case Nil => Seq() +case list => + Seq(Row("---", "---", "---"), Row("INDEX CACHE", "", "")) ++ + list.map { + row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2)) +} + }) +} + } def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = { val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase val cache = CacheProvider.getInstance().getCarbonCache -if (cache == null) { - Seq( -Row("ALL", "ALL", 0L, 0L, 0L), -Row(currentDatabase, "ALL", 0L, 0L, 0L)) -} else { - var carbonTables = mutable.ArrayBuffer[CarbonTable]() - sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { -tableIdent => +val isDistributedPruningEnabled = CarbonProperties.getInstance() + .isDistributedPruningEnabled("", "") +if (cache == null && !isDistributedPruningEnabled) { + return makeEmptyCacheRows(currentDatabase) +} +var carbonTables = mutable.ArrayBuffer[CarbonTable]() +sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { + tableIdent => +try { + val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) + if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) { +carbonTables += carbonTable + } +} catch { + case _: NoSuchTableException => +LOGGER.debug("Ignoring non-carbon table " + tableIdent.table) +} +} +val indexServerRows = if (isDistributedPruningEnabled) { + carbonTables.flatMap { +mainTable => try { -val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) -if (!carbonTable.isChildDataMap) { - carbonTables += carbonTable -} +makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable) } catch { -case ex: NoSuchTableException => - LOGGER.debug("Ignoring non-carbon table " + tableIdent.table) +case ex: UnsupportedOperationException => Seq() } } +} else { Seq() } - // All tables of current database - var (dbDatamapSize, dbDictSize) = (0L, 0L) - val tableList = carbonTables.flatMap { +val driverRows = if (cache != null) { + carbonTables.flatMap { carbonTable => try { -val tableResult = getTableCache(sparkSession, carbonTable) -var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L) -tableResult.drop(2).foreach { - row => -indexSize += row.getLong(1) -datamapSize += row.getLong(2) -} -val dictSize = tableResult(1).getLong(1) - -
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r290997945 ## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala ## @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier], } } - override protected def opName: String = "SHOW CACHE" + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { +if (tableIdentifier.isEmpty) { + /** + * Assemble result for database + */ + getAllTablesCache(sparkSession) +} else { + /** + * Assemble result for table + */ + val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession) + Checker +.validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession) + val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size + val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles) + val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled + (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase), +tableIdentifier.get.table)) { +getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession) + } else { Seq() } + val result = driverRawResults.slice(0, 2) ++ + driverRawResults.drop(2).map { row => + Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3)) + } + val serverResults = indexRawResults.slice(0, 2) ++ + indexRawResults.drop(2).map { row => +Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3)) + } + Seq(Row("DRIVER CACHE", "", "")) ++ result.map { +row => + Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2)) + } ++ (serverResults match { +case Nil => Seq() +case list => + Seq(Row("---", "---", "---"), Row("INDEX CACHE", "", "")) ++ + list.map { + row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2)) +} + }) +} + } def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = { val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase val cache = CacheProvider.getInstance().getCarbonCache -if (cache == null) { - Seq( -Row("ALL", "ALL", 0L, 0L, 0L), -Row(currentDatabase, "ALL", 0L, 0L, 0L)) -} else { - var carbonTables = mutable.ArrayBuffer[CarbonTable]() - sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { -tableIdent => +val isDistributedPruningEnabled = CarbonProperties.getInstance() + .isDistributedPruningEnabled("", "") +if (cache == null && !isDistributedPruningEnabled) { + return makeEmptyCacheRows(currentDatabase) +} +var carbonTables = mutable.ArrayBuffer[CarbonTable]() +sparkSession.sessionState.catalog.listTables(currentDatabase).foreach { + tableIdent => +try { + val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) + if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) { +carbonTables += carbonTable + } +} catch { + case _: NoSuchTableException => +LOGGER.debug("Ignoring non-carbon table " + tableIdent.table) +} +} +val indexServerRows = if (isDistributedPruningEnabled) { + carbonTables.flatMap { +mainTable => try { -val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession) -if (!carbonTable.isChildDataMap) { - carbonTables += carbonTable -} +makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable) } catch { -case ex: NoSuchTableException => - LOGGER.debug("Ignoring non-carbon table " + tableIdent.table) +case ex: UnsupportedOperationException => Seq() } } +} else { Seq() } - // All tables of current database - var (dbDatamapSize, dbDictSize) = (0L, 0L) - val tableList = carbonTables.flatMap { +val driverRows = if (cache != null) { + carbonTables.flatMap { carbonTable => try { -val tableResult = getTableCache(sparkSession, carbonTable) -var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L) -tableResult.drop(2).foreach { - row => -indexSize += row.getLong(1) -datamapSize += row.getLong(2) -} -val dictSize = tableResult(1).getLong(1) - -
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r290997482 ## File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala ## @@ -43,15 +50,23 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = { val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala +val tableList = tableName.split(",").map(_.replace("-", "_")) val iterator = dataMaps.collect { - case (table, tableDataMaps) if table.isEmpty || - (tableName.nonEmpty && tableName.equalsIgnoreCase(table)) => + case (table, tableDataMaps) if tableName.isEmpty || tableList.contains(table) => val sizeAndIndexLengths = tableDataMaps.asScala - .map(_.getBlockletDetailsFetcher.getCacheSize) -// return tableName_indexFileLength_indexCachesize for each executor. -sizeAndIndexLengths.map { - x => s"$table:$x" -} + .map { dataMap => +// if datamap name is null then it means that the TableDataMap is of BlockletDatamap +// type. Review comment: This statement seems not valid, update the comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r289284518 ## File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala ## @@ -43,15 +44,25 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = { val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala +val tableList = tableName.split(",") val iterator = dataMaps.collect { case (table, tableDataMaps) if table.isEmpty || - (tableName.nonEmpty && tableName.equalsIgnoreCase(table)) => + (tableName.nonEmpty && tableList.contains(table)) => val sizeAndIndexLengths = tableDataMaps.asScala - .map(_.getBlockletDetailsFetcher.getCacheSize) -// return tableName_indexFileLength_indexCachesize for each executor. -sizeAndIndexLengths.map { - x => s"$table:$x" -} + .map { dataMap => +if (!dataMap.getDataMapSchema.getProviderName + .equals(DataMapClassProvider.BLOOMFILTER.getShortName)) { Review comment: I don't get why we are specifically checking for `BLOOMFILTER`. We are not supposed to check anything specific This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r289283548 ## File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ## @@ -559,4 +559,6 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT) } + + Review comment: Don't change all files if not needed. I can see many files that are changed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r289283548 ## File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ## @@ -559,4 +559,6 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT) } + + Review comment: Don't change file if not needed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
ravipesala commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV URL: https://github.com/apache/carbondata/pull/3245#discussion_r289278684 ## File path: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java ## @@ -112,4 +112,5 @@ public DataMapLevel getDataMapLevel() { return false; } } + Review comment: Remove unnecessary change This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services