[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296178025

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala

## @@ -62,216 +67,366 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],

```diff
     }
   }
 
-  override protected def opName: String = "SHOW CACHE"
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      /**
+       * Assemble result for database
+       */
+      getAllTablesCache(sparkSession)
+    } else {
+      /**
+       * Assemble result for table
+       */
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      Checker
+        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
+      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
+      (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
+        tableIdentifier.get.table)) {
+        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+      } else { Seq() }
+      val result = driverRawResults.slice(0, 2) ++
+                   driverRawResults.drop(2).map { row =>
+                     Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      val serverResults = indexRawResults.slice(0, 2) ++
+                          indexRawResults.drop(2).map { row =>
+                            Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                          }
+      Seq(Row("DRIVER CACHE", "", "")) ++ result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+      } ++ (serverResults match {
+        case Nil => Seq()
+        case list =>
+          Seq(Row("---", "---", "---"), Row("INDEX CACHE", "", "")) ++
+          list.map {
+            row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+          }
+      })
+    }
+  }
 
   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache == null) {
-      Seq(
-        Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
-    } else {
-      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
-        tableIdent =>
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (cache == null && !isDistributedPruningEnabled) {
+      return makeEmptyCacheRows(currentDatabase)
+    }
+    var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+      tableIdent =>
+        try {
+          val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) {
+            carbonTables += carbonTable
+          }
+        } catch {
+          case _: NoSuchTableException =>
+            LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+        }
+    }
+    val indexServerRows = if (isDistributedPruningEnabled) {
+      carbonTables.flatMap {
+        mainTable =>
           try {
-            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-            if (!carbonTable.isChildDataMap) {
-              carbonTables += carbonTable
-            }
+            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
           } catch {
-            case ex: NoSuchTableException =>
-              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+            case ex: UnsupportedOperationException => Seq()
           }
       }
+    } else { Seq() }
 
-      // All tables of current database
-      var (dbDatamapSize, dbDictSize) = (0L, 0L)
-      val tableList = carbonTables.flatMap {
+    val driverRows = if (cache != null) {
+      carbonTables.flatMap {
         carbonTable =>
           try {
-            val tableResult = getTableCache(sparkSession, carbonTable)
-            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-            tableResult.drop(2).foreach {
-              row =>
-                indexSize += row.getLong(1)
-                datamapSize += row.getLong(2)
-            }
-            val dictSize = tableResult(1).getLong(1)
-
```
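The hunk's core change is row bookkeeping: the first two rows of the driver and index-server results (the index and dictionary totals) pass through unchanged, and every later row folds its datamap size into the index-size column before being pretty-printed. A minimal standalone Scala sketch of that merge follows; the row shape and the `bytesToDisplaySize` helper are illustrative stand-ins for the Carbon types, not the PR's actual code.

```scala
object ShowCacheMergeSketch {
  // Stand-in for a SHOW CACHE result row: (name, indexSize, datamapSize, comment).
  case class CacheRow(name: String, indexSize: Long, datamapSize: Long, comment: String)

  // Rough pretty-printer standing in for bytesToDisplaySize.
  def bytesToDisplaySize(bytes: Long): String =
    if (bytes >= (1L << 20)) s"${bytes >> 20} MB"
    else if (bytes >= (1L << 10)) s"${bytes >> 10} KB"
    else s"$bytes B"

  def main(args: Array[String]): Unit = {
    val raw = Seq(
      CacheRow("Index", 2048, 0, ""),
      CacheRow("Dictionary", 512, 0, ""),
      CacheRow("dm_bloom", 1024, 4096, "bloom datamap"))
    // First two rows (index and dictionary totals) pass through unchanged;
    // every later row folds its datamap size into the index-size column.
    val merged = raw.take(2) ++ raw.drop(2).map(r =>
      r.copy(indexSize = r.indexSize + r.datamapSize, datamapSize = 0))
    merged.foreach(r => println(s"${r.name} -> ${bytesToDisplaySize(r.indexSize)}"))
  }
}
```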
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296178046

## File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala

## @@ -43,15 +49,21 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName

```diff
   override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
     val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
+    val tableList = tableName.split(",").map(_.replace("-", "_"))
```

Review comment: same as above
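For context, the new line splits the single RPC string into a table list and rewrites `-` to `_` before matching it against the keys of the datamap store. A hedged sketch of that filter, with made-up table names:

```scala
object TableFilterSketch {
  def main(args: Array[String]): Unit = {
    val tableName = "default-t1,default-t2"          // single RPC argument
    val cachedTables = Seq("default_t1", "other_t9") // keys in the datamap store
    val tableList = tableName.split(",").map(_.replace("-", "_"))
    // A key matches when no filter was sent or it is in the requested list.
    val matched = cachedTables.filter(t => tableName.isEmpty || tableList.contains(t))
    println(matched.mkString(", ")) // default_t1
  }
}
```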
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296177375

## File path: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java

## @@ -453,4 +453,19 @@ public DataMapMeta getMeta() {

```diff
   public DataMapLevel getDataMapLevel() {
     return DataMapLevel.CG;
   }
+
+  @Override public String getCacheSize() {
```

Review comment: done
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296177481

## File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala

## @@ -47,7 +47,7 @@ trait ServerInterface {

```diff
   /**
    * Get the cache size for the specified table.
    */
-  def showCache(tableName: String) : Array[String]
+  def showCache(tableNames: String) : Array[String]
```

Review comment: done
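Since the argument now carries every requested table in one comma-separated string, a caller-side sketch may help; the stub implementation and the `table:size` output format are assumptions, loosely following the `s"$table:$x"` convention quoted elsewhere in this thread.

```scala
object ShowCacheCallSketch {
  // Trimmed-down stand-in for the server interface in IndexServer.scala.
  trait ServerInterface {
    def showCache(tableNames: String): Array[String]
  }

  def main(args: Array[String]): Unit = {
    // Stub server: echoes one "table:size" entry per requested table.
    val server = new ServerInterface {
      def showCache(tableNames: String): Array[String] =
        tableNames.split(",").map(table => s"$table:0")
    }
    server.showCache("db1_t1,db1_t2").foreach(println)
  }
}
```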
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296177468

## File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java

## @@ -565,7 +565,9 @@ public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,

```diff
         allSegments.getInvalidSegments(), toBeCleanedSegments));
     for (InputSplit extendedBlocklet : extendedBlocklets) {
       CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
-      blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
+      String filePath = blocklet.getFilePath();
+      String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
```

Review comment: added replace("\\", "/") to handle for windows
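The fix described in the comment normalizes separators before taking the last path segment, so Windows-style paths produce the same block name. A small sketch under that assumption; the paths are examples:

```scala
object BlockNameSketch {
  // Normalize separators first so Windows paths behave like POSIX ones.
  def blockName(filePath: String): String = {
    val normalized = filePath.replace("\\", "/")
    normalized.substring(normalized.lastIndexOf("/") + 1)
  }

  def main(args: Array[String]): Unit = {
    println(blockName("/store/db/tbl/part-0-0_batchno0-0-0.carbondata"))
    println(blockName("C:\\store\\db\\tbl\\part-0-0_batchno0-0-0.carbondata"))
    // Both print: part-0-0_batchno0-0-0.carbondata
  }
}
```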
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296177352

## File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java

## @@ -302,14 +302,18 @@ public synchronized void clear() {

```diff
     }
   }
 
-  @Override public String getCacheSize() throws IOException {
+  @Override public String getCacheSize() {
```

Review comment: done
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r292374118

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala

## @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],

(quotes the same `processMetadata`/`getAllTablesCache` hunk reproduced under discussion_r296178025 above)
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r292373470

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala

## @@ -62,216 +68,397 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],

(quotes the same `processMetadata`/`getAllTablesCache` hunk reproduced under discussion_r296178025 above)
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r289334935

## File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala

## @@ -559,4 +559,6 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {

```diff
       CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
   }
 
+
+
```

Review comment: removed
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r289334957

## File path: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java

## @@ -112,4 +112,5 @@ public DataMapLevel getDataMapLevel() {

```diff
     return false;
   }
 }
+
```

Review comment: reverted
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r289334912

## File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala

## @@ -43,15 +44,25 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession, tableName

```diff
   override def internalCompute(split: Partition, context: TaskContext): Iterator[String] = {
     val dataMaps = DataMapStoreManager.getInstance().getAllDataMaps.asScala
+    val tableList = tableName.split(",")
     val iterator = dataMaps.collect {
       case (table, tableDataMaps) if table.isEmpty ||
-          (tableName.nonEmpty && tableName.equalsIgnoreCase(table)) =>
+          (tableName.nonEmpty && tableList.contains(table)) =>
         val sizeAndIndexLengths = tableDataMaps.asScala
-          .map(_.getBlockletDetailsFetcher.getCacheSize)
-        // return tableName_indexFileLength_indexCachesize for each executor.
-        sizeAndIndexLengths.map {
-          x => s"$table:$x"
-        }
+          .map { dataMap =>
+            if (!dataMap.getDataMapSchema.getProviderName
+              .equals(DataMapClassProvider.BLOOMFILTER.getShortName)) {
```

Review comment: changed the condition to check if dataMapName is null (for BlockletDataMap)
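The revised condition mentioned in the comment keys off the schema's datamap name instead of the provider: only the default BlockletDataMap carries no name. A sketch of that predicate with simplified stand-in types:

```scala
object DataMapFilterSketch {
  // Stand-in for org.apache.carbondata.core.metadata.schema.table.DataMapSchema.
  final case class DataMapSchema(dataMapName: String)

  // Per the review comment: a null name identifies the default BlockletDataMap.
  def isDefaultDataMap(schema: DataMapSchema): Boolean = schema.dataMapName == null

  def main(args: Array[String]): Unit = {
    Seq(DataMapSchema(null), DataMapSchema("dm_bloom")).foreach { s =>
      println(s"${s.dataMapName} -> default: ${isDefaultDataMap(s)}")
    }
  }
}
```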
[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r288641389

## File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java

## @@ -302,13 +302,14 @@ public synchronized void clear() {

```diff
     }
   }
 
-  @Override public String getCacheSize() throws IOException {
+  @Override public String getCacheSize() {
     long sum = 0L;
     int numOfIndexFiles = 0;
     for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> entry : segmentMap.entrySet()) {
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()) {
-        sum += cache.get(new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
-            getCarbonTable())).getMemorySize();
+        sum += cache.getIfPresent(
```

Review comment: done
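The switch to `getIfPresent` matters because, under the usual loading-cache contract, `get` would fault a missing entry back into the cache just to measure it, while `getIfPresent` only reports what is already resident. A contract-level sketch with a plain map standing in for the Carbon cache:

```scala
object CacheLookupSketch {
  def main(args: Array[String]): Unit = {
    val cache = scala.collection.mutable.Map("index-file-1" -> 2048L)

    // getIfPresent: report only what is resident, never load.
    def getIfPresent(key: String): Option[Long] = cache.get(key)
    // A loading get() would re-populate evicted entries as a side effect.
    def loadingGet(key: String): Long = cache.getOrElseUpdate(key, 4096L)

    val keys = Seq("index-file-1", "index-file-2")
    val sum = keys.flatMap(getIfPresent).sum
    println(s"resident bytes = $sum, entries after scan = ${cache.size}") // 2048, 1
    println(s"${loadingGet("index-file-2")} -> entries = ${cache.size}")  // 4096 -> 2
  }
}
```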