Repository: carbondata
Updated Branches:
  refs/heads/master 8999a8aff -> 3beaa0e29


[CARBONDATA-2341] Added Clean up of files for Pre-Aggregate table

Added support for cleaning up the files of Pre-Aggregate tables when clean files is
fired on the main table

This closes #2166
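
A minimal usage sketch of the new behaviour, based on the test case in this patch
(table and datamap names are illustrative): clean files on the main table now also
cleans its pre-aggregate child tables, while cleaning an aggregate table directly is
rejected.

    create table maintable(name string, c_code int, price int) stored by 'carbondata';
    create datamap ag1 on table maintable using 'preaggregate'
      as select name, sum(price) from maintable group by name;
    -- after loads and a compaction, merged-away segments remain until clean files runs
    alter table maintable compact 'minor';
    -- cleans maintable and, with this change, its pre-aggregate child table as well
    clean files for table maintable;
    -- running clean files directly on the child table (assuming the usual
    -- <maintable>_<datamap> naming) now fails with
    -- "Cannot clean files directly for aggregate table."
    -- clean files for table maintable_ag1;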


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3beaa0e2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3beaa0e2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3beaa0e2

Branch: refs/heads/master
Commit: 3beaa0e298bf2da80056f19aa3d1b2ef34fa05d4
Parents: 8999a8a
Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com>
Authored: Thu Apr 12 21:18:24 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Fri Apr 20 15:55:06 2018 +0530

----------------------------------------------------------------------
 .../sdv/generated/PreAggregateTestCase.scala    | 14 +++++++
 .../management/CarbonCleanFilesCommand.scala    | 43 ++++++++++++++++----
 2 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3beaa0e2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala
index 3148cf3..d1b1310 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala
@@ -80,6 +80,7 @@ class PreAggregateTestCase extends QueryTest with BeforeAndAfterEach {
     checkAnswer(sql("select * from PreAggMain_PreAggMax"), expectedMax)
   }
 
+
   //test for incremental load
   test("PreAggregateTestCase_TC003", Include) {
     sql(s"LOAD DATA INPATH '$csvPath' into table PreAggMain").collect
@@ -209,6 +210,19 @@ class PreAggregateTestCase extends QueryTest with BeforeAndAfterEach {
     checkAnswer(actual, expected)
   }
 
+  test("Test CleanUp of Pre_aggregate tables") {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("create datamap ag1 on table maintable using 'preaggregate' as select 
name,sum(price) from maintable group by name")
+    sql("insert into table maintable select 'abcd',22,3000")
+    sql("insert into table maintable select 'abcd',22,3000")
+    sql("insert into table maintable select 'abcd',22,3000")
+    sql("alter table maintable compact 'minor'")
+    sql("clean files for table maintable")
+    assert(sql("show segments for table maintable").collect().head.get(0).toString.contains("0.1"))
+  }
+
   override def afterEach: Unit = {
     sql("drop table if exists mainTable")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3beaa0e2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 2092028..a2f3727 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
@@ -28,8 +30,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -43,16 +46,38 @@ import org.apache.carbondata.spark.util.CommonUtil
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
-    forceTableClean: Boolean = false)
-  extends DataCommand {
+    forceTableClean: Boolean = false,
+    isInternalCleanCall: Boolean = false)
+  extends AtomicRunnableCommand {
+
+  var carbonTable: CarbonTable = _
+  var cleanFileCommands: List[CarbonCleanFilesCommand] = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
+
+    if (carbonTable.hasAggregationDataMap) {
+      cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map {
+        dataMapSchema =>
+          val relationIdentifier = dataMapSchema.getRelationIdentifier
+          CarbonCleanFilesCommand(
+            Some(relationIdentifier.getDatabaseName), Some(relationIdentifier.getTableName),
+            isInternalCleanCall = true)
+      }.toList
+      cleanFileCommands.foreach(_.processMetadata(sparkSession))
+    } else if (carbonTable.isChildDataMap && !isInternalCleanCall) {
+      throwMetadataException(
+        carbonTable.getDatabaseName, carbonTable.getTableName,
+        "Cannot clean files directly for aggregate table.")
+    }
+    Seq.empty
+  }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
     }
-
     val operationContext = new OperationContext
     val cleanFilesPreEvent: CleanFilesPreEvent =
       CleanFilesPreEvent(carbonTable,
@@ -68,6 +93,9 @@ case class CarbonCleanFilesCommand(
     } else {
       cleanGarbageDataInAllTables(sparkSession)
     }
+    if (cleanFileCommands != null) {
+      cleanFileCommands.foreach(_.processData(sparkSession))
+    }
     val cleanFilesPostEvent: CleanFilesPostEvent =
       CleanFilesPostEvent(carbonTable, sparkSession)
    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
@@ -89,11 +117,10 @@ case class CarbonCleanFilesCommand(
 
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
       Seq.empty[Expression],
       sparkSession,
-      TableIdentifier(tableName, databaseNameOp))
+      carbonTable)
     CarbonStore.cleanFiles(
       dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName = tableName,
