Repository: carbondata Updated Branches: refs/heads/branch-1.3 ba41ea4b2 -> a416fa691
[CARBONDATA-2341] Added clean up of files for pre-aggregate tables. Added support so that when clean files is fired on a main table, the files of its pre-aggregate tables are cleaned up as well. This closes #2166 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a416fa69 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a416fa69 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a416fa69 Branch: refs/heads/branch-1.3 Commit: a416fa691f75d2dfc73b5d88117a264a5c148fc4 Parents: ba41ea4 Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com> Authored: Thu Apr 12 21:18:24 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Fri Apr 20 16:06:11 2018 +0530 ---------------------------------------------------------------------- .../management/CarbonCleanFilesCommand.scala | 43 ++++++++++++++++---- 1 file changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a416fa69/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index d2adc57..93b3d16 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -17,18 +17,21 @@ package org.apache.spark.sql.execution.command.management +import scala.collection.JavaConverters._ + import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier 
import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.command.{Checker, DataCommand} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events._ import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.util.CommonUtil @@ -43,16 +46,38 @@ import org.apache.carbondata.spark.util.CommonUtil case class CarbonCleanFilesCommand( databaseNameOp: Option[String], tableName: Option[String], - forceTableClean: Boolean = false) - extends DataCommand { + forceTableClean: Boolean = false, + isInternalCleanCall: Boolean = false) + extends AtomicRunnableCommand { + + var carbonTable: CarbonTable = _ + var cleanFileCommands: List[CarbonCleanFilesCommand] = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession) + + if (carbonTable.hasAggregationDataMap) { + cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map { + dataMapSchema => + val relationIdentifier = dataMapSchema.getRelationIdentifier + CarbonCleanFilesCommand( + Some(relationIdentifier.getDatabaseName), Some(relationIdentifier.getTableName), + isInternalCleanCall = true) + }.toList + cleanFileCommands.foreach(_.processMetadata(sparkSession)) + } else if (carbonTable.isChildDataMap && 
!isInternalCleanCall) { + throwMetadataException( + carbonTable.getDatabaseName, carbonTable.getTableName, + "Cannot clean files directly for aggregate table.") + } + Seq.empty + } override def processData(sparkSession: SparkSession): Seq[Row] = { - val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession) // if insert overwrite in progress, do not allow delete segment if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file") } - val operationContext = new OperationContext val cleanFilesPreEvent: CleanFilesPreEvent = CleanFilesPreEvent(carbonTable, @@ -68,6 +93,9 @@ case class CarbonCleanFilesCommand( } else { cleanGarbageDataInAllTables(sparkSession) } + if (cleanFileCommands != null) { + cleanFileCommands.foreach(_.processData(sparkSession)) + } val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession) OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext) @@ -89,11 +117,10 @@ case class CarbonCleanFilesCommand( private def cleanGarbageData(sparkSession: SparkSession, databaseNameOp: Option[String], tableName: String): Unit = { - val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( Seq.empty[Expression], sparkSession, - TableIdentifier(tableName, databaseNameOp)) + carbonTable) CarbonStore.cleanFiles( dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName = tableName,