[CARBONDATA-1669] Clean up code in CarbonDataRDDFactory This closes #1467
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0578ba0f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0578ba0f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0578ba0f Branch: refs/heads/pre-aggregate Commit: 0578ba0f2f9931a89f9759ea4be97975957280ae Parents: 11661eb Author: Jacky Li <jacky.li...@qq.com> Authored: Tue Nov 7 11:41:30 2017 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Tue Nov 7 19:47:17 2017 +0800 ---------------------------------------------------------------------- .../carbondata/core/locks/CarbonLockUtil.java | 19 +- .../org/apache/carbondata/api/CarbonStore.scala | 39 +- .../spark/rdd/DataManagementFunc.scala | 231 +-- .../carbondata/spark/rdd/UpdateDataLoad.scala | 3 +- .../spark/rdd/CarbonDataRDDFactory.scala | 1444 ++++++++---------- .../spark/sql/CarbonCatalystOperators.scala | 1 - .../AlterTableCompactionCommand.scala | 124 +- .../AlterTableDropCarbonPartitionCommand.scala | 133 +- .../AlterTableSplitCarbonPartitionCommand.scala | 156 +- .../restructure/AlterTableRevertTestCase.scala | 2 +- 10 files changed, 1044 insertions(+), 1108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java index 60a7564..c02a168 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java @@ -65,20 +65,27 @@ public class CarbonLockUtil { /** * Given a lock type this method will return a new lock object if not acquired by any other * operation - * - * @param identifier - * @param lockType - * @return */ - public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) { + public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType, + String errorMsg) { ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(identifier, lockType); LOGGER.info("Trying to acquire lock: " + carbonLock); if (carbonLock.lockWithRetries()) { LOGGER.info("Successfully acquired the lock " + carbonLock); } else { - throw new RuntimeException("Table is locked for updation. Please try after some time"); + LOGGER.error(errorMsg); + throw new RuntimeException(errorMsg); } return carbonLock; } + /** + * Get and lock with default error message + */ + public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) { + return getLockObject(identifier, + lockType, + "Acquire table lock failed after retry, please try after some time"); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 4a66d0f..e77f5c3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -28,8 +28,11 @@ import org.apache.spark.sql.types.TimestampType import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.DataManagementFunc @@ -79,16 +82,34 @@ object CarbonStore { dbName: String, tableName: String, storePath: String, - carbonTable: CarbonTable, forceTableClean: Boolean): Unit = { + carbonTable: CarbonTable, + forceTableClean: Boolean): Unit = { LOGGER.audit(s"The clean files request has been received for $dbName.$tableName") + var carbonCleanFilesLock: ICarbonLock = null + val identifier = new CarbonTableIdentifier(dbName, tableName, "") try { - DataManagementFunc.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean) - LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.") - } catch { - case ex: Exception => - sys.error(ex.getMessage) + val errorMsg = "Clean files request is failed for " + + s"$dbName.$tableName" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = + CarbonLockUtil.getLockObject(identifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) + if (forceTableClean) { + val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName) + FileFactory.deleteAllCarbonFilesOfDir( + FileFactory.getCarbonFile(absIdent.getTablePath, + FileFactory.getFileType(absIdent.getTablePath))) + } else { + DataManagementFunc.deleteLoadsAndUpdateMetadata(dbName, tableName, storePath, + isForceDeletion = true, carbonTable) + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) + } + } finally { + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } } - Seq.empty + LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.") } // validates load ids @@ -163,7 +184,7 @@ object CarbonStore { val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new SegmentStatusManager( identifier).getValidAndInvalidSegments - return validAndInvalidSegments.getValidSegments.contains(segmentId) + validAndInvalidSegments.getValidSegments.contains(segmentId) } private def validateTimeFormat(timestamp: String): Long = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala index c2029e5..cbdb336 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala @@ -21,25 +21,20 @@ import java.util import java.util.concurrent._ import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, DropPartitionCallableModel, SplitPartitionCallableModel} +import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, LoadMetadataUtil} -import org.apache.carbondata.spark._ import org.apache.carbondata.spark.compaction.CompactionCallable -import org.apache.carbondata.spark.partition.{DropPartitionCallable, SplitPartitionCallable} import org.apache.carbondata.spark.util.CommonUtil /** @@ -49,105 +44,6 @@ object DataManagementFunc { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def deleteLoadByDate( - sqlContext: SQLContext, - schema: CarbonDataLoadSchema, - databaseName: String, - tableName: String, - storePath: String, - dateField: String, - dateFieldActualName: String, - dateValue: String) { - - val sc = sqlContext - // Delete the records based on data - val table = schema.getCarbonTable - val loadMetadataDetailsArray = - SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList - val resultMap = new CarbonDeleteLoadByDateRDD( - sc.sparkContext, - new DeletedLoadResultImpl(), - databaseName, - table.getDatabaseName, - dateField, - dateFieldActualName, - dateValue, - table.getFactTableName, - tableName, - storePath, - loadMetadataDetailsArray).collect.groupBy(_._1) - - var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]() - if (resultMap.nonEmpty) { - if (resultMap.size == 1) { - if (resultMap.contains("")) { - LOGGER.error("Delete by Date request is failed") - sys.error("Delete by Date request is failed, potential causes " + - "Empty store or Invalid column type, For more details please refer logs.") - } - } - val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => { - var statusList = resultMap.get(elem.getLoadName) - // check for the merged load folder. - if (statusList.isEmpty && null != elem.getMergedLoadName) { - statusList = resultMap.get(elem.getMergedLoadName) - } - - if (statusList.isDefined) { - elem.setModificationOrdeletionTimesStamp(elem.getTimeStamp(CarbonLoaderUtil - .readCurrentTime())) - // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist, - // use MARKED_FOR_UPDATE - if (statusList.get - .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) { - elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE) - } else { - elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE) - updatedLoadMetadataDetailsList += elem - } - elem - } else { - elem - } - } - - } - - // Save the load metadata - val carbonLock = CarbonLockFactory - .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK - ) - try { - if (carbonLock.lockWithRetries()) { - LOGGER.info("Successfully got the table metadata file lock") - if (updatedLoadMetadataDetailsList.nonEmpty) { - // TODO: Load Aggregate tables after retention. - } - - // write - CarbonLoaderUtil.writeLoadMetadata( - storePath, - databaseName, - table.getDatabaseName, - updatedloadMetadataDetails.asJava - ) - } - } finally { - if (carbonLock.unlock()) { - LOGGER.info("unlock the table metadata file successfully") - } else { - LOGGER.error("Unable to unlock the metadata lock") - } - } - } else { - LOGGER.error("Delete by Date request is failed") - LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName") - sys.error("Delete by Date request is failed, potential causes " + - "Empty store or Invalid column type, For more details please refer logs.") - } - } - def executeCompaction(carbonLoadModel: CarbonLoadModel, compactionModel: CompactionModel, executor: ExecutorService, @@ -226,8 +122,6 @@ object DataManagementFunc { /** * This will submit the loads to be merged into the executor. - * - * @param futureList */ private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], loadsToMerge: util.List[LoadMetadataDetails], @@ -235,14 +129,14 @@ object DataManagementFunc { sqlContext: SQLContext, compactionModel: CompactionModel, carbonLoadModel: CarbonLoadModel, - storeLocation: String): Unit = { - - loadsToMerge.asScala.foreach(seg => { + storeLocation: String + ): Unit = { + loadsToMerge.asScala.foreach { seg => LOGGER.info("loads identified for merge is " + seg.getLoadName) } - ) - val compactionCallableModel = CompactionCallableModel(carbonLoadModel, + val compactionCallableModel = CompactionCallableModel( + carbonLoadModel, storeLocation, compactionModel.carbonTable, loadsToMerge, @@ -254,80 +148,6 @@ object DataManagementFunc { futureList.add(future) } - def executePartitionSplit( sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - executor: ExecutorService, - segment: String, - partitionId: String, - oldPartitionIdList: List[Int]): Unit = { - val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]]( - CarbonCommonConstants.DEFAULT_COLLECTION_SIZE - ) - scanSegmentsForSplitPartition(futureList, executor, segment, partitionId, - sqlContext, carbonLoadModel, oldPartitionIdList) - try { - futureList.asScala.foreach(future => { - future.get - } - ) - } catch { - case e: Exception => - LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }") - throw e - } - } - - private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]], - executor: ExecutorService, - segmentId: String, - partitionId: String, - sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - oldPartitionIdList: List[Int]): Unit = { - - val splitModel = SplitPartitionCallableModel(carbonLoadModel, - segmentId, - partitionId, - oldPartitionIdList, - sqlContext) - - val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel)) - futureList.add(future) - } - - def executeDroppingPartition(sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - executor: ExecutorService, - segmentId: String, - partitionId: String, - dropWithData: Boolean, - oldPartitionIds: List[Int]): Unit = { - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val model = new DropPartitionCallableModel(carbonLoadModel, - segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext) - val future: Future[Void] = executor.submit(new DropPartitionCallable(model)) - try { - future.get - } catch { - case e: Exception => - LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }") - throw e - } - } - - def prepareCarbonLoadModel(table: CarbonTable, newCarbonLoadModel: CarbonLoadModel): Unit = { - newCarbonLoadModel.setTableName(table.getFactTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName) - newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName) - newCarbonLoadModel.setStorePath(table.getStorePath) - CommonUtil.readLoadMetadataDetails(newCarbonLoadModel) - val loadStartTime = CarbonUpdateUtil.readCurrentTime(); - newCarbonLoadModel.setFactTimeStamp(loadStartTime) - } - def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = { // Deleting the any partially loaded data if present. // in some case the segment folder which is present in store will not have entry in @@ -397,39 +217,4 @@ object DataManagementFunc { } } - def cleanFiles( - dbName: String, - tableName: String, - storePath: String, - carbonTable: CarbonTable, - forceTableClean: Boolean): Unit = { - val identifier = new CarbonTableIdentifier(dbName, tableName, "") - val carbonCleanFilesLock = - CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK) - try { - if (carbonCleanFilesLock.lockWithRetries()) { - LOGGER.info("Clean files lock has been successfully acquired.") - if (forceTableClean) { - val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName) - FileFactory.deleteAllCarbonFilesOfDir( - FileFactory.getCarbonFile(absIdent.getTablePath, - FileFactory.getFileType(absIdent.getTablePath))) - } else { - deleteLoadsAndUpdateMetadata(dbName, tableName, storePath, - isForceDeletion = true, carbonTable) - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) - } - } else { - val errorMsg = "Clean files request is failed for " + - s"$dbName.$tableName" + - ". Not able to acquire the clean files lock due to another clean files " + - "operation is running in the background." - LOGGER.audit(errorMsg) - LOGGER.error(errorMsg) - throw new Exception(errorMsg + " Please try after some time.") - } - } finally { - CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) - } - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala index 4d782c9..eb07240 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala @@ -35,7 +35,8 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil */ object UpdateDataLoad { - def DataLoadForUpdate(segId: String, + def DataLoadForUpdate( + segId: String, index: Int, iter: Iterator[Row], carbonLoadModel: CarbonLoadModel,