Repository: carbondata Updated Branches: refs/heads/master 8999a8aff -> 3beaa0e29
[CARBONDATA-2341] Added Clean up of files for Pre-Aggregate table. Added support for cleaning up files of Pre-Aggregate tables when clean-up on the main table is fired. This closes #2166 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3beaa0e2 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3beaa0e2 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3beaa0e2 Branch: refs/heads/master Commit: 3beaa0e298bf2da80056f19aa3d1b2ef34fa05d4 Parents: 8999a8a Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com> Authored: Thu Apr 12 21:18:24 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Fri Apr 20 15:55:06 2018 +0530 ---------------------------------------------------------------------- .../sdv/generated/PreAggregateTestCase.scala | 14 +++++++ .../management/CarbonCleanFilesCommand.scala | 43 ++++++++++++++++---- 2 files changed, 49 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3beaa0e2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala index 3148cf3..d1b1310 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala @@ -80,6 +80,7 @@ class PreAggregateTestCase extends QueryTest with BeforeAndAfterEach { checkAnswer(sql("select * from 
PreAggMain_PreAggMax"), expectedMax) } + //test for incremental load test("PreAggregateTestCase_TC003", Include) { sql(s"LOAD DATA INPATH '$csvPath' into table PreAggMain").collect @@ -209,6 +210,19 @@ class PreAggregateTestCase extends QueryTest with BeforeAndAfterEach { checkAnswer(actual, expected) } + test("Test CleanUp of Pre_aggregate tables") { + sql("drop table if exists maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + sql("create datamap ag1 on table maintable using 'preaggregate' as select name,sum(price) from maintable group by name") + sql("insert into table maintable select 'abcd',22,3000") + sql("insert into table maintable select 'abcd',22,3000") + sql("insert into table maintable select 'abcd',22,3000") + sql("alter table maintable compact 'minor'") + sql("clean files for table maintable") + assert(sql("show segments for table maintable").collect().head.get(0).toString.contains("0.1")) + } + override def afterEach: Unit = { sql("drop table if exists mainTable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3beaa0e2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index 2092028..a2f3727 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.execution.command.management +import scala.collection.JavaConverters._ + import 
org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.command.{Checker, DataCommand} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore @@ -28,8 +30,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events._ import org.apache.carbondata.spark.util.CommonUtil /** @@ -43,16 +46,38 @@ import org.apache.carbondata.spark.util.CommonUtil case class CarbonCleanFilesCommand( databaseNameOp: Option[String], tableName: Option[String], - forceTableClean: Boolean = false) - extends DataCommand { + forceTableClean: Boolean = false, + isInternalCleanCall: Boolean = false) + extends AtomicRunnableCommand { + + var carbonTable: CarbonTable = _ + var cleanFileCommands: List[CarbonCleanFilesCommand] = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession) + + if (carbonTable.hasAggregationDataMap) { + cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map { + dataMapSchema => + val relationIdentifier = dataMapSchema.getRelationIdentifier + CarbonCleanFilesCommand( + Some(relationIdentifier.getDatabaseName), Some(relationIdentifier.getTableName), + isInternalCleanCall = true) + 
}.toList + cleanFileCommands.foreach(_.processMetadata(sparkSession)) + } else if (carbonTable.isChildDataMap && !isInternalCleanCall) { + throwMetadataException( + carbonTable.getDatabaseName, carbonTable.getTableName, + "Cannot clean files directly for aggregate table.") + } + Seq.empty + } override def processData(sparkSession: SparkSession): Seq[Row] = { - val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession) // if insert overwrite in progress, do not allow delete segment if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file") } - val operationContext = new OperationContext val cleanFilesPreEvent: CleanFilesPreEvent = CleanFilesPreEvent(carbonTable, @@ -68,6 +93,9 @@ case class CarbonCleanFilesCommand( } else { cleanGarbageDataInAllTables(sparkSession) } + if (cleanFileCommands != null) { + cleanFileCommands.foreach(_.processData(sparkSession)) + } val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession) OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext) @@ -89,11 +117,10 @@ case class CarbonCleanFilesCommand( private def cleanGarbageData(sparkSession: SparkSession, databaseNameOp: Option[String], tableName: String): Unit = { - val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( Seq.empty[Expression], sparkSession, - TableIdentifier(tableName, databaseNameOp)) + carbonTable) CarbonStore.cleanFiles( dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName = tableName,