Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 ba41ea4b2 -> a416fa691


[CARBONDATA-2341] Added Clean up of files for Pre-Aggregate table

Added support for cleaning up the files of pre-aggregate tables when a clean
files operation is fired on the main table

This closes #2166
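
In short, the clean files command now runs in two phases: processMetadata
validates the call and builds one internal clean files command per
pre-aggregate (child) table, while processData cleans the main table and then
fans out to those child commands. Below is a minimal, self-contained Scala
sketch of that pattern; the Table and CleanFiles types are simplified
stand-ins for illustration, not the real CarbonData classes (see the diff
below for the production code).

// Simplified stand-ins for CarbonTable and CarbonCleanFilesCommand; the
// real types live in the diff below.
case class Table(name: String, children: Seq[Table] = Seq.empty, isChild: Boolean = false)

case class CleanFiles(table: Table, isInternalCleanCall: Boolean = false) {

  // Phase 1 (processMetadata): reject a direct clean on a child table and
  // expand the command into one internal command per pre-aggregate child.
  def processMetadata(): Seq[CleanFiles] = {
    if (table.isChild && !isInternalCleanCall) {
      throw new IllegalStateException(
        s"Cannot clean files directly for aggregate table ${table.name}.")
    }
    table.children.map(child => CleanFiles(child, isInternalCleanCall = true))
  }

  // Phase 2 (processData): clean the main table first, then run the child
  // commands so their stale files are removed in the same operation.
  def processData(childCommands: Seq[CleanFiles]): Unit = {
    println(s"cleaning stale files of ${table.name}")
    childCommands.foreach(c => c.processData(c.processMetadata()))
  }
}

object CleanFilesDemo extends App {
  val agg  = Table("sales_agg", isChild = true)
  val main = Table("sales", children = Seq(agg))
  val cmd  = CleanFiles(main)
  cmd.processData(cmd.processMetadata()) // cleans "sales", then "sales_agg"
}

The isInternalCleanCall flag is what lets the parent command reach its child
tables while still rejecting a user-issued clean on an aggregate table.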


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a416fa69
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a416fa69
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a416fa69

Branch: refs/heads/branch-1.3
Commit: a416fa691f75d2dfc73b5d88117a264a5c148fc4
Parents: ba41ea4
Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com>
Authored: Thu Apr 12 21:18:24 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Fri Apr 20 16:06:11 2018 +0530

----------------------------------------------------------------------
 .../management/CarbonCleanFilesCommand.scala    | 43 ++++++++++++++++----
 1 file changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a416fa69/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index d2adc57..93b3d16 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,18 +17,21 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
 import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -43,16 +46,38 @@ import org.apache.carbondata.spark.util.CommonUtil
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
-    forceTableClean: Boolean = false)
-  extends DataCommand {
+    forceTableClean: Boolean = false,
+    isInternalCleanCall: Boolean = false)
+  extends AtomicRunnableCommand {
+
+  var carbonTable: CarbonTable = _
+  var cleanFileCommands: List[CarbonCleanFilesCommand] = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
+
+    if (carbonTable.hasAggregationDataMap) {
+      cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map {
+        dataMapSchema =>
+          val relationIdentifier = dataMapSchema.getRelationIdentifier
+          CarbonCleanFilesCommand(
+            Some(relationIdentifier.getDatabaseName), Some(relationIdentifier.getTableName),
+            isInternalCleanCall = true)
+      }.toList
+      cleanFileCommands.foreach(_.processMetadata(sparkSession))
+    } else if (carbonTable.isChildDataMap && !isInternalCleanCall) {
+      throwMetadataException(
+        carbonTable.getDatabaseName, carbonTable.getTableName,
+        "Cannot clean files directly for aggregate table.")
+    }
+    Seq.empty
+  }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
     }
-
     val operationContext = new OperationContext
     val cleanFilesPreEvent: CleanFilesPreEvent =
       CleanFilesPreEvent(carbonTable,
@@ -68,6 +93,9 @@ case class CarbonCleanFilesCommand(
     } else {
       cleanGarbageDataInAllTables(sparkSession)
     }
+    if (cleanFileCommands != null) {
+      cleanFileCommands.foreach(_.processData(sparkSession))
+    }
     val cleanFilesPostEvent: CleanFilesPostEvent =
       CleanFilesPostEvent(carbonTable, sparkSession)
     OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
@@ -89,11 +117,10 @@ case class CarbonCleanFilesCommand(
 
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
       Seq.empty[Expression],
       sparkSession,
-      TableIdentifier(tableName, databaseNameOp))
+      carbonTable)
     CarbonStore.cleanFiles(
       dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName = tableName,

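For reference, a hypothetical end-to-end usage from a CarbonData-enabled
SparkSession; the table and child-table names below are illustrative:

// Assumes `spark` is a SparkSession with CarbonData extensions configured
// and that table "sales" has a pre-aggregate datamap whose child table is
// "sales_agg" (names are hypothetical).
spark.sql("CLEAN FILES FOR TABLE sales")  // now also cleans files of "sales_agg"

// A direct clean on the child table is rejected after this patch:
// spark.sql("CLEAN FILES FOR TABLE sales_agg")
// => "Cannot clean files directly for aggregate table."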