http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index bc84e04..20d3032 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -29,8 +29,10 @@ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.CarbonTableIdentifier -import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper +import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema._ @@ -109,15 +111,14 @@ case class CarbonMergerMapping( var mergedLoadName: String, databaseName: String, factTableName: String, - validSegments: Array[String], + validSegments: Array[Segment], tableId: String, campactionType: CompactionType, // maxSegmentColCardinality is Cardinality of last segment of compaction var maxSegmentColCardinality: Array[Int], // maxSegmentColumnSchemaList is list of column schema of last segment of compaction var maxSegmentColumnSchemaList: List[ColumnSchema], - currentPartitions: Seq[String], - @transient partitionMapper: PartitionMapper) + currentPartitions: Option[Seq[PartitionSpec]]) case class NodeInfo(TaskId: String, noOfBlocks: Int) @@ -133,20 +134,20 @@ case class UpdateTableModel( isUpdate: Boolean, updatedTimeStamp: Long, var executorErrors: ExecutionErrors, - deletedSegments: Seq[String]) + deletedSegments: Seq[Segment]) case class CompactionModel(compactionSize: Long, compactionType: CompactionType, carbonTable: CarbonTable, isDDLTrigger: Boolean, - currentPartitions: Seq[String]) + currentPartitions: Option[Seq[PartitionSpec]]) case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel, carbonTable: CarbonTable, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext, compactionType: CompactionType, - currentPartitions: Seq[String]) + currentPartitions: Option[Seq[PartitionSpec]]) case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel, segmentId: String, @@ -161,7 +162,7 @@ case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel, sqlContext: SQLContext) case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel, - segmentId: String, + segmentId: Segment, partitionId: String, oldPartitionIds: List[Int], dropWithData: Boolean,
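The hunks above replace raw segment-id strings and partition-name strings with the typed Segment and PartitionSpec abstractions that the rest of this patch builds on. A minimal sketch of how calling code adapts to the new signatures is given below; it reuses only constructors that appear elsewhere in this diff, and the segment ids, partition values and location path are illustrative placeholders, not values from the patch.

    import java.util
    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.core.indexstore.PartitionSpec

    // A plain load name such as "0" is now wrapped in a Segment; the second constructor
    // argument is the associated segment file name, passed as null when none exists yet.
    val validSegments: Array[Segment] = Array("0", "1").map(id => new Segment(id, null))

    // currentPartitions is now Option[Seq[PartitionSpec]]: None for a non-partitioned
    // table, Some(specs) for a Hive partition table.
    val spec = new PartitionSpec(
      new util.ArrayList(List("country=IN", "year=2017").asJava),
      "/tmp/carbon/store/db/tbl/country=IN/year=2017")
    val currentPartitions: Option[Seq[PartitionSpec]] = Some(Seq(spec))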
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 8ed7623..1695a13 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -38,8 +38,8 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD} import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel} -import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException @@ -47,6 +47,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} @@ -207,7 +208,9 @@ object CarbonDataRDDFactory { compactionType, table, compactionModel.isDDLTrigger, - CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, table)) + CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, + TableIdentifier(table.getTableName, + Some(table.getDatabaseName)))) // proceed for compaction try { CompactionFactory.getCompactor( @@ -395,26 +398,9 @@ object CarbonDataRDDFactory { } catch { case ex: Throwable => loadStatus = SegmentStatus.LOAD_FAILURE - ex match { - case sparkException: SparkException => - if (sparkException.getCause.isInstanceOf[DataLoadingException] || - sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { - executorMessage = sparkException.getCause.getMessage - errorMessage = errorMessage + ": " + executorMessage - } else if (sparkException.getCause.isInstanceOf[TextParsingException]) { - executorMessage = CarbonDataProcessorUtil - .trimErrorMessage(sparkException.getCause.getMessage) - errorMessage = errorMessage + " : " + executorMessage - } - case aex: AnalysisException => - LOGGER.error(aex.getMessage()) - throw aex - case _ => - if (ex.getCause != null) { - executorMessage = ex.getCause.getMessage - errorMessage = errorMessage + ": " + executorMessage - } - } + val (extrMsgLocal, errorMsgLocal) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER) + executorMessage = extrMsgLocal + errorMessage = errorMsgLocal LOGGER.info(errorMessage) LOGGER.error(ex) } finally { @@ -423,14 +409,7 @@ object CarbonDataRDDFactory { // handle the status file updation for the update cmd. 
if (updateModel.isDefined) { if (loadStatus == SegmentStatus.LOAD_FAILURE) { - if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) { - updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - if (null != executorMessage && !executorMessage.isEmpty) { - updateModel.get.executorErrors.errorMsg = executorMessage - } else { - updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed." - } - } + CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage) return } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS && updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS && @@ -441,12 +420,12 @@ object CarbonDataRDDFactory { // success case. // write the dictionary file in case of single_pass true writeDictionary(carbonLoadModel, result, false) - val segmentDetails = new util.HashSet[String]() + val segmentDetails = new util.HashSet[Segment]() var resultSize = 0 res.foreach { resultOfSeg => resultSize = resultSize + resultOfSeg.size resultOfSeg.foreach { resultOfBlock => - segmentDetails.add(resultOfBlock._2._1.getLoadName) + segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null)) } } @@ -462,7 +441,7 @@ object CarbonDataRDDFactory { carbonTable, updateModel.get.updatedTimeStamp + "", true, - new util.ArrayList[String](0))) { + new util.ArrayList[Segment](0))) { LOGGER.audit("Data update is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } else { @@ -744,7 +723,9 @@ object CarbonDataRDDFactory { CompactionType.MINOR, carbonTable, isCompactionTriggerByDDl, - CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable)) + CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, + Some(carbonTable.getDatabaseName)))) var storeLocation = "" val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf) if (null != configuredStore && configuredStore.nonEmpty) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index e7bdff8..07acaa5 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -18,26 +18,23 @@ package org.apache.carbondata.spark.rdd import java.util -import java.util.{List, Map} +import java.util.List import java.util.concurrent.ExecutorService import scala.collection.JavaConverters._ -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel} -import org.apache.carbondata.core.metadata.PartitionMapFileStore -import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper -import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.metadata.SegmentFileStore import 
org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.MergeResultImpl -import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER import org.apache.carbondata.spark.util.CommonUtil /** @@ -136,39 +133,19 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, var finalMergeStatus = false val databaseName: String = carbonLoadModel.getDatabaseName val factTableName = carbonLoadModel.getTableName - val validSegments: Array[String] = CarbonDataMergerUtil - .getValidSegments(loadsToMerge).split(',') - val partitionMapper = if (carbonTable.isHivePartitionTable) { - var partitionMap: util.Map[String, util.List[String]] = null - validSegments.foreach { segmentId => - val localMapper = new PartitionMapFileStore() - localMapper.readAllPartitionsOfSegment( - CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath, segmentId)) - if (partitionMap == null) { - partitionMap = localMapper.getPartitionMap - } else { - partitionMap.putAll(localMapper.getPartitionMap) - } - } - val mapper = new PartitionMapper() - mapper.setPartitionMap(partitionMap) - mapper - } else { - null - } + val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge) val carbonMergerMapping = CarbonMergerMapping( tablePath, carbonTable.getMetaDataFilepath, mergedLoadName, databaseName, factTableName, - validSegments, + validSegments.asScala.toArray, carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, compactionType, maxSegmentColCardinality = null, maxSegmentColumnSchemaList = null, - currentPartitions = partitions, - partitionMapper) + currentPartitions = partitions) carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation) carbonLoadModel.setLoadMetadataDetails( SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava) @@ -221,11 +198,28 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, if (finalMergeStatus) { val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) - new PartitionMapFileStore().mergePartitionMapFiles( - CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber), - carbonLoadModel.getFactTimeStamp + "") + var segmentFileName: String = null + if (carbonTable.isHivePartitionTable) { + val readPath = + CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp" + // Merge all partition files into a single file. 
+ segmentFileName = + mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp + val segmentFile = SegmentFileStore + .mergeSegmentFiles(readPath, + segmentFileName, + CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath)) + if (segmentFile != null) { + SegmentFileStore + .moveFromTempFolder(segmentFile, + carbonLoadModel.getFactTimeStamp + ".tmp", + carbonLoadModel.getTablePath) + } + segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT + } // trigger event for compaction - val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent = + val alterTableCompactionPreStatusUpdateEvent = AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession, carbonTable, carbonMergerMapping, @@ -242,9 +236,13 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath, carbonLoadModel)) || - CarbonDataMergerUtil - .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath, - mergedLoadNumber, carbonLoadModel, compactionType) + CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus( + loadsToMerge, + carbonTable.getMetaDataFilepath, + mergedLoadNumber, + carbonLoadModel, + compactionType, + segmentFileName) val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable, carbonMergerMapping, carbonLoadModel, http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index 833c6fe..c286c50 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -55,7 +55,9 @@ case class CarbonCountStar( CarbonFilters.getPartitions( Seq.empty, sparkSession, - TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))).asJava), + TableIdentifier( + carbonTable.getTableName, + Some(carbonTable.getDatabaseName))).map(_.asJava).orNull), absoluteTableIdentifier) val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]) val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 0978fab..46905b8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import 
org.apache.carbondata.core.scan.expression.Expression @@ -67,7 +68,7 @@ case class CarbonDatasourceHadoopRelation( def buildScan(requiredColumns: Array[String], filters: Array[Filter], - partitions: Seq[String]): RDD[InternalRow] = { + partitions: Seq[PartitionSpec]): RDD[InternalRow] = { val filterExpression: Option[Expression] = filters.flatMap { filter => CarbonFilters.createCarbonFilter(schema, filter) }.reduceOption(new AndExpression(_, _)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 667d550..7e3b699 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -206,7 +206,9 @@ case class CarbonAlterTableCompactionCommand( compactionType, carbonTable, isCompactionTriggerByDDl, - CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable) + CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, + Some(carbonTable.getDatabaseName))) ) val isConcurrentCompactionAllowed = CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index 4f90fb5..d2adc57 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.spark.exception.ConcurrentOperationException @@ -89,14 +90,10 @@ case class CarbonCleanFilesCommand( private def cleanGarbageData(sparkSession: SparkSession, databaseNameOp: Option[String], tableName: String): Unit = { val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - val partitions: Option[Seq[String]] = if (carbonTable.isHivePartitionTable) { - Some(CarbonFilters.getPartitions( - Seq.empty[Expression], - sparkSession, - TableIdentifier(tableName, databaseNameOp))) - } else { - None - } + val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions( + Seq.empty[Expression], + sparkSession, + TableIdentifier(tableName, databaseNameOp)) CarbonStore.cleanFiles( dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName = tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 9bdaddb..7800d3e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFile import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.types._ +import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} @@ -51,11 +52,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider -import org.apache.carbondata.core.metadata.PartitionMapFileStore +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.statusmanager.SegmentStatus import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} @@ -72,7 +75,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServicePr import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} case class CarbonLoadDataCommand( @@ -97,6 +100,8 @@ case class CarbonLoadDataCommand( var sizeInBytes: Long = _ + var currPartitions: util.List[PartitionSpec] = _ + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = 
CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) @@ -123,6 +128,12 @@ case class CarbonLoadDataCommand( case l: LogicalRelation => l }.head sizeInBytes = logicalPartitionRelation.relation.sizeInBytes + currPartitions = CarbonFilters.getCurrentPartitions( + sparkSession, + TableIdentifier(tableName, databaseNameOp)) match { + case Some(parts) => new util.ArrayList(parts.toList.asJava) + case _ => null + } } operationContext.setProperty("isOverwrite", isOverwriteTable) if(CarbonUtil.hasAggregationDataMap(table)) { @@ -182,8 +193,9 @@ case class CarbonLoadDataCommand( options, optionsFinal, carbonLoadModel, - hadoopConf - ) + hadoopConf, + partition, + dataFrame.isDefined) // Delete stale segment folders that are not in table status but are physically present in // the Fact folder LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") @@ -215,7 +227,10 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") // Clean up the old invalid segment data before creating a new entry for new load. - DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table) + DataLoadingUtil.deleteLoadsAndUpdateMetadata( + isForceDeletion = false, + table, + currPartitions) // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( @@ -259,7 +274,8 @@ case class CarbonLoadDataCommand( columnar, partitionStatus, hadoopConf, - operationContext) + operationContext, + LOGGER) } else { loadData( sparkSession, @@ -267,7 +283,8 @@ case class CarbonLoadDataCommand( columnar, partitionStatus, hadoopConf, - operationContext) + operationContext, + LOGGER) } val loadTablePostExecutionEvent: LoadTablePostExecutionEvent = new LoadTablePostExecutionEvent( @@ -331,7 +348,9 @@ case class CarbonLoadDataCommand( columnar: Boolean, partitionStatus: SegmentStatus, hadoopConf: Configuration, - operationContext: OperationContext): Unit = { + operationContext: OperationContext, + LOGGER: LogService): Seq[Row] = { + var rows = Seq.empty[Row] val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier .getCarbonTableIdentifier @@ -418,12 +437,13 @@ case class CarbonLoadDataCommand( if (carbonTable.isHivePartitionTable) { try { - loadDataWithPartition( + rows = loadDataWithPartition( sparkSession, carbonLoadModel, hadoopConf, loadDataFrame, - operationContext) + operationContext, + LOGGER) } finally { server match { case Some(dictServer) => @@ -450,6 +470,7 @@ case class CarbonLoadDataCommand( updateModel, operationContext) } + rows } private def loadData( @@ -458,7 +479,9 @@ case class CarbonLoadDataCommand( columnar: Boolean, partitionStatus: SegmentStatus, hadoopConf: Configuration, - operationContext: OperationContext): Unit = { + operationContext: OperationContext, + LOGGER: LogService): Seq[Row] = { + var rows = Seq.empty[Row] val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID() // getting all fields except tupleId field as it is not required in the value @@ -478,12 +501,12 @@ case class CarbonLoadDataCommand( dictionaryDataFrame) } if (table.isHivePartitionTable) { - loadDataWithPartition( + rows = loadDataWithPartition( sparkSession, carbonLoadModel, hadoopConf, loadDataFrame, - 
operationContext) + operationContext, LOGGER) } else { CarbonDataRDDFactory.loadCarbonData( sparkSession.sqlContext, @@ -497,6 +520,7 @@ case class CarbonLoadDataCommand( updateModel, operationContext) } + rows } /** @@ -504,24 +528,16 @@ case class CarbonLoadDataCommand( * into partitoned data. The table relation would be converted to HadoopFSRelation to let spark * handling the partitioning. */ - private def loadDataWithPartition(sparkSession: SparkSession, + private def loadDataWithPartition( + sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel, hadoopConf: Configuration, dataFrame: Option[DataFrame], - operationContext: OperationContext): Unit = { + operationContext: OperationContext, + LOGGER: LogService): Seq[Row] = { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get - val currentPartitions = - CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier) - // Clean up the alreday dropped partitioned data - new PartitionMapFileStore().cleanSegments(table, currentPartitions.asJava, false) - // Converts the data to carbon understandable format. The timestamp/date format data needs to - // converted to hive standard fomat to let spark understand the data to partition. - val serializationNullFormat = - carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val badRecordAction = - carbonLoadModel.getBadRecordsAction.split(",")(1) var timeStampformatString = carbonLoadModel.getTimestampformat if (timeStampformatString.isEmpty) { timeStampformatString = carbonLoadModel.getDefaultTimestampFormat @@ -532,127 +548,114 @@ case class CarbonLoadDataCommand( dateFormatString = carbonLoadModel.getDefaultDateFormat } val dateFormat = new SimpleDateFormat(dateFormatString) - CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString) - CarbonSession.threadSet( - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, - timeStampformatString) - CarbonSession.threadSet( - CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT, - serializationNullFormat) - CarbonSession.threadSet( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - badRecordAction) - val isEmptyBadRecord = carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1) - CarbonSession.threadSet( - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - isEmptyBadRecord) + // Clean up the alreday dropped partitioned data + SegmentFileStore.cleanSegments(table, null, false) CarbonSession.threadSet("partition.operationcontext", operationContext) // input data from csv files. 
Convert to logical plan val allCols = new ArrayBuffer[String]() allCols ++= table.getAllDimensions.asScala.map(_.getColName) allCols ++= table.getAllMeasures.asScala.map(_.getColName) var attributes = - StructType(allCols.map(StructField(_, StringType))).toAttributes + StructType( + allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map( + StructField(_, StringType))).toAttributes var partitionsLen = 0 val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope) - def transformQuery(rdd: RDD[Row], isDataFrame: Boolean) = { - val updatedRdd = convertData(rdd, sparkSession, carbonLoadModel, isDataFrame) - val catalogAttributes = catalogTable.schema.toAttributes - attributes = attributes.map(a => { - catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get - }) - attributes = attributes.map { attr => - val column = table.getColumnByName(table.getTableName, attr.name) - if (column.hasEncoding(Encoding.DICTIONARY)) { - AttributeReference( - attr.name, - IntegerType, - attr.nullable, - attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) - } else if (attr.dataType == TimestampType || attr.dataType == DateType) { - AttributeReference( - attr.name, - LongType, - attr.nullable, - attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) - } else { - attr - } - } - // Only select the required columns - val output = if (partition.nonEmpty) { - val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) } - catalogTable.schema.map { attr => - attributes.find(_.name.equalsIgnoreCase(attr.name)).get - }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty) - } else { - catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) - } - partitionsLen = rdd.partitions.length - val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)) - if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) { - val sortColumns = table.getSortColumns(table.getTableName) - Sort(output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)), - true, - child) - } else { - child - } + val partitionValues = if (partition.nonEmpty) { + partition.filter(_._2.nonEmpty).map{ case(col, value) => + val field = catalogTable.schema.find(_.name.equalsIgnoreCase(col)).get + CarbonScalaUtil.convertToDateAndTimeFormats( + value.get, + field.dataType, + timeStampFormat, + dateFormat) + }.toArray + } else { + Array[String]() } - + var persistedRDD: Option[RDD[InternalRow]] = None try { val query: LogicalPlan = if (dataFrame.isDefined) { - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val dfAttributes = - StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes - val partitionValues = if (partition.nonEmpty) { - partition.values.filter(_.nonEmpty).map(_.get).toArray + val (rdd, dfAttributes) = if (updateModel.isDefined) { + // Get the updated query plan in case of update scenario + val updatedFrame = Dataset.ofRows( + sparkSession, + getLogicalQueryForUpdate( + sparkSession, + catalogTable, + dataFrame.get, + carbonLoadModel)) + (updatedFrame.rdd, updatedFrame.schema) } else { - Array[String]() + if (partition.nonEmpty) { + val headers = carbonLoadModel.getCsvHeaderColumns.dropRight(partition.size) + val updatedHeader = headers ++ partition.keys.map(_.toLowerCase) + carbonLoadModel.setCsvHeader(updatedHeader.mkString(",")) + 
carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(",")) + } + (dataFrame.get.rdd, dataFrame.get.schema) + } + + val expectedColumns = { + val staticPartCols = partition.filter(_._2.isDefined).keySet + attributes.filterNot(a => staticPartCols.contains(a.name)) + } + if (expectedColumns.length != dfAttributes.length) { + throw new AnalysisException( + s"Cannot insert into table $tableName because the number of columns are different: " + + s"need ${expectedColumns.length} columns, " + + s"but query has ${dfAttributes.length} columns.") + } + val nonPartitionBounds = expectedColumns.zipWithIndex.map(_._2).toArray + val partitionBounds = new Array[Int](partitionValues.length) + if (partition.nonEmpty) { + val nonPartitionSchemaLen = attributes.length - partition.size + var i = nonPartitionSchemaLen + var index = 0 + var partIndex = 0 + partition.values.foreach { p => + if (p.isDefined) { + partitionBounds(partIndex) = nonPartitionSchemaLen + index + partIndex = partIndex + 1 + } else { + nonPartitionBounds(i) = nonPartitionSchemaLen + index + i = i + 1 + } + index = index + 1 + } } - val len = dfAttributes.length - val rdd = dataFrame.get.rdd.map { f => + + val len = dfAttributes.length + partitionValues.length + val transRdd = rdd.map { f => val data = new Array[Any](len) var i = 0 while (i < f.length) { - data(i) = - UTF8String.fromString( - CarbonScalaUtil.getString(f.get(i), - serializationNullFormat, - delimiterLevel1, - delimiterLevel2, - timeStampFormat, - dateFormat)) + data(nonPartitionBounds(i)) = f.get(i) i = i + 1 } - if (partitionValues.length > 0) { - var j = 0 - while (i < len) { - data(i) = UTF8String.fromString(partitionValues(j)) - j = j + 1 - i = i + 1 - } + var j = 0 + while (j < partitionBounds.length) { + data(partitionBounds(j)) = UTF8String.fromString(partitionValues(j)) + j = j + 1 } Row.fromSeq(data) } - val transRdd = if (updateModel.isDefined) { - // Get the updated query plan in case of update scenario - Dataset.ofRows( + + val (transformedPlan, partitions, persistedRDDLocal) = + transformQuery( + transRdd, sparkSession, - getLogicalQueryForUpdate( - sparkSession, - catalogTable, - dfAttributes, - rdd.map(row => InternalRow.fromSeq(row.toSeq)), - carbonLoadModel)).rdd - } else { - rdd - } - transformQuery(transRdd, true) + carbonLoadModel, + partitionValues, + catalogTable, + attributes, + sortScope, + isDataFrame = true) + partitionsLen = partitions + persistedRDD = persistedRDDLocal + transformedPlan } else { - val rowDataTypes = attributes.map { attribute => catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match { case Some(attr) => attr.dataType @@ -667,12 +670,29 @@ case class CarbonLoadDataCommand( } } val columnCount = carbonLoadModel.getCsvHeaderColumns.length - var rdd = DataLoadingUtil.csvFileScanRDD( + val rdd = DataLoadingUtil.csvFileScanRDD( sparkSession, model = carbonLoadModel, hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) - transformQuery(rdd.asInstanceOf[RDD[Row]], false) + val (transformedPlan, partitions, persistedRDDLocal) = + transformQuery( + rdd.asInstanceOf[RDD[Row]], + sparkSession, + carbonLoadModel, + partitionValues, + catalogTable, + attributes, + sortScope, + isDataFrame = false) + partitionsLen = partitions + persistedRDD = persistedRDDLocal + transformedPlan + } + if (updateModel.isDefined) { + carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp) } + // Create and ddd the segment to the tablestatus. 
+ CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable) val convertRelation = convertToLogicalRelation( catalogTable, sizeInBytes, @@ -703,23 +723,37 @@ case class CarbonLoadDataCommand( overwrite = false, ifPartitionNotExists = false) Dataset.ofRows(sparkSession, convertedPlan) + } catch { + case ex: Throwable => + val (executorMessage, errorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER) + if (updateModel.isDefined) { + CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage) + } + LOGGER.info(errorMessage) + LOGGER.error(ex) + throw new Exception(errorMessage) } finally { - CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) - CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT) - CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT) - CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) - CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) CarbonSession.threadUnset("partition.operationcontext") if (isOverwriteTable) { DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) // Clean the overwriting segments if any. - new PartitionMapFileStore().cleanSegments( + SegmentFileStore.cleanSegments( table, - CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava, + null, false) } + if (partitionsLen > 1) { + // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call + // will not have any functional impact as spark automatically monitors the cache usage on + // each node and drops out old data partitions in a least-recently used (LRU) fashion. + persistedRDD match { + case Some(rdd) => rdd.unpersist(false) + case _ => + } + } } try { + carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) // Trigger auto compaction CarbonDataRDDFactory.handleSegmentMerging( sparkSession.sqlContext, @@ -732,37 +766,140 @@ case class CarbonLoadDataCommand( "Dataload is success. Auto-Compaction has failed. Please check logs.", e) } + val specs = + SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath) + if (specs != null) { + specs.asScala.map{ spec => + Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid) + } + } else { + Seq.empty[Row] + } } + /** + * Transform the rdd to logical plan as per the sortscope. If it is global sort scope then it + * will convert to sort logical plan otherwise project plan. 
+ */ + private def transformQuery(rdd: RDD[Row], + sparkSession: SparkSession, + loadModel: CarbonLoadModel, + partitionValues: Array[String], + catalogTable: CatalogTable, + curAttributes: Seq[AttributeReference], + sortScope: SortScopeOptions.SortScope, + isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = { + // Converts the data as per the loading steps before give it to writer or sorter + val updatedRdd = convertData( + rdd, + sparkSession, + loadModel, + isDataFrame, + partitionValues) + val catalogAttributes = catalogTable.schema.toAttributes + var attributes = curAttributes.map(a => { + catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get + }) + attributes = attributes.map { attr => + // Update attribute datatypes in case of dictionary columns, in case of dictionary columns + // datatype is always int + val column = table.getColumnByName(table.getTableName, attr.name) + if (column.hasEncoding(Encoding.DICTIONARY)) { + AttributeReference( + attr.name, + IntegerType, + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + } else if (attr.dataType == TimestampType || attr.dataType == DateType) { + AttributeReference( + attr.name, + LongType, + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + } else { + attr + } + } + // Only select the required columns + val output = if (partition.nonEmpty) { + val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) } + catalogTable.schema.map { attr => + attributes.find(_.name.equalsIgnoreCase(attr.name)).get + }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty) + } else { + catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) + } + val partitionsLen = rdd.partitions.length + + // If it is global sort scope then appl sort logical plan on the sort columns + if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) { + // Because if the number of partitions greater than 1, there will be action operator(sample) + // in sortBy operator. So here we cache the rdd to avoid do input and convert again. 
+ if (partitionsLen > 1) { + updatedRdd.persist(StorageLevel.fromString( + CarbonProperties.getInstance().getGlobalSortRddStorageLevel)) + } + val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)) + val sortColumns = table.getSortColumns(table.getTableName) + val sortPlan = + Sort( + output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)), + global = true, + child) + (sortPlan, partitionsLen, Some(updatedRdd)) + } else { + (Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)), partitionsLen, None) + } + } + + /** + * Convert the rdd as per steps of data loading inputprocessor step and coverter step + * @param originRDD + * @param sparkSession + * @param model + * @param isDataFrame + * @param partitionValues + * @return + */ private def convertData( originRDD: RDD[Row], sparkSession: SparkSession, model: CarbonLoadModel, - isDataFrame: Boolean): RDD[InternalRow] = { + isDataFrame: Boolean, + partitionValues: Array[String]): RDD[InternalRow] = { model.setPartitionId("0") val sc = sparkSession.sparkContext + val info = + model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo + info.setColumnSchemaList(new util.ArrayList[ColumnSchema](info.getColumnSchemaList)) val modelBroadcast = sc.broadcast(model) val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") // 1. Input - var convertRDD = + val convertRDD = if (isDataFrame) { originRDD.mapPartitions{rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast) } } else { - originRDD.map{row => - val array = new Array[AnyRef](row.length) + // Append the partition columns in case of static partition scenario + val partitionLen = partitionValues.length + val len = model.getCsvHeaderColumns.length - partitionLen + originRDD.map{ row => + val array = new Array[AnyRef](len + partitionLen) var i = 0 - while (i < array.length) { + while (i < len) { array(i) = row.get(i).asInstanceOf[AnyRef] i = i + 1 } + if (partitionLen > 0) { + System.arraycopy(partitionValues, 0, array, i, partitionLen) + } array } } - val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) => + val finalRDD = convertRDD.mapPartitionsWithIndex {case(index, rows) => DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) DataLoadProcessorStepOnSpark.inputAndconvertFunc( rows, @@ -783,12 +920,11 @@ case class CarbonLoadDataCommand( private def getLogicalQueryForUpdate( sparkSession: SparkSession, catalogTable: CatalogTable, - attributes: Seq[AttributeReference], - rdd: RDD[InternalRow], + df: DataFrame, carbonLoadModel: CarbonLoadModel): LogicalPlan = { sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) // In case of update, we don't need the segmrntid column in case of partitioning - val dropAttributes = attributes.dropRight(1) + val dropAttributes = df.logicalPlan.output.dropRight(1) val finalOutput = catalogTable.schema.map { attr => dropAttributes.find { d => val index = d.name.lastIndexOf("-updatedColumn") @@ -801,7 +937,7 @@ case class CarbonLoadDataCommand( } carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(",")) carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(",")) - Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) + Project(finalOutput, df.logicalPlan) } private def convertToLogicalRelation( @@ -855,9 +991,19 @@ case class CarbonLoadDataCommand( if (updateModel.isDefined) { 
options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString)) if (updateModel.get.deletedSegments.nonEmpty) { - options += (("segmentsToBeDeleted", updateModel.get.deletedSegments.mkString(","))) + options += (("segmentsToBeDeleted", + updateModel.get.deletedSegments.map(_.getSegmentNo).mkString(","))) } } + if (currPartitions != null) { + val currPartStr = ObjectSerializationUtil.convertObjectToString(currPartitions) + options += (("currentpartition", currPartStr)) + } + if (loadModel.getSegmentId != null) { + val currLoadEntry = + ObjectSerializationUtil.convertObjectToString(loadModel.getCurrentLoadMetadataDetail) + options += (("currentloadentry", currLoadEntry)) + } val hdfsRelation = HadoopFsRelation( location = catalog, partitionSchema = partitionSchema, http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index 72ed051..e3e4c7a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -28,8 +28,10 @@ import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, Me import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, PartitionMapFileStore} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema @@ -228,10 +230,17 @@ case class RefreshCarbonTableCommand( val allpartitions = metadataDetails.map{ metadata => if (metadata.getSegmentStatus == SegmentStatus.SUCCESS || metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) { - val mapper = new PartitionMapFileStore() - mapper.readAllPartitionsOfSegment( - CarbonTablePath.getSegmentPath(absIdentifier.getTablePath, metadata.getLoadName)) - Some(mapper.getPartitionMap.values().asScala) + val mapper = new SegmentFileStore(absIdentifier.getTablePath, metadata.getSegmentFile) + val specs = mapper.getLocationMap.asScala.map { case(location, fd) => + var updatedLoc = + if (fd.isRelative) { + absIdentifier.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + location + } else { + location + } + new PartitionSpec(fd.getPartitions, updatedLoc) + } + Some(specs) } else { None } @@ -240,14 +249,14 @@ case class RefreshCarbonTableCommand( TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName)) // Register the partition information to the hive metastore allpartitions.foreach { segPartitions => - val 
specs: Seq[TablePartitionSpec] = segPartitions.map { indexPartitions => - indexPartitions.asScala.map{ p => + val specs: Seq[(TablePartitionSpec, Option[String])] = segPartitions.map { indexPartitions => + (indexPartitions.getPartitions.asScala.map{ p => val spec = p.split("=") (spec(0), spec(1)) - }.toMap + }.toMap, Some(indexPartitions.getLocation.toString)) }.toSeq // Add partition information - AlterTableAddPartitionCommand(identifier, specs.map((_, None)), true).run(sparkSession) + AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 756d120..4886676 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager @@ -173,7 +174,7 @@ private[sql] case class CarbonProjectForUpdateCommand( sparkSession: SparkSession, currentTime: Long, executorErrors: ExecutionErrors, - deletedSegments: Seq[String]): Unit = { + deletedSegments: Seq[Segment]): Unit = { def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = { val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index 1ac0b34..25d5e91 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} @@ -61,15 +62,20 @@ object DeleteExecution { dataRdd: RDD[Row], timestamp: String, isUpdateOperation: Boolean, - executorErrors: 
ExecutionErrors): Seq[String] = { + executorErrors: ExecutionErrors): Seq[Segment] = { var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier) - val factPath = carbonTablePath.getFactDir - var segmentsTobeDeleted = Seq.empty[String] + val isPartitionTable = carbonTable.isHivePartitionTable + val factPath = if (isPartitionTable) { + carbonTablePath.getPath + } else { + carbonTablePath.getFactDir + } + var segmentsTobeDeleted = Seq.empty[Segment] val deleteRdd = if (isUpdateOperation) { val schema = @@ -104,7 +110,7 @@ object DeleteExecution { CarbonFilters.getPartitions( Seq.empty, sparkSession, - TableIdentifier(tableName, databaseNameOp)).asJava) + TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull) val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier) CarbonUpdateUtil .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) @@ -144,12 +150,12 @@ object DeleteExecution { // all or none : update status file, only if complete delete opeartion is successfull. def checkAndUpdateStatusFiles(): Unit = { val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() - val segmentDetails = new util.HashSet[String]() + val segmentDetails = new util.HashSet[Segment]() res.foreach(resultOfSeg => resultOfSeg.foreach( resultOfBlock => { if (resultOfBlock._1 == SegmentStatus.SUCCESS) { blockUpdateDetailsList.add(resultOfBlock._2._1) - segmentDetails.add(resultOfBlock._2._1.getSegmentName) + segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null)) // if this block is invalid then decrement block count in map. 
if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) { CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, @@ -250,7 +256,7 @@ object DeleteExecution { countOfRows = countOfRows + 1 } - val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath) + val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable) val completeBlockName = CarbonTablePath .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) + CarbonCommonConstants.FACT_FILE_EXT) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala index bdecac1..f88e767 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager @@ -113,7 +114,7 @@ object HorizontalCompaction { absTableIdentifier: AbsoluteTableIdentifier, segmentUpdateStatusManager: SegmentUpdateStatusManager, factTimeStamp: Long, - segLists: util.List[String]): Unit = { + segLists: util.List[Segment]): Unit = { val db = carbonTable.getDatabaseName val table = carbonTable.getTableName // get the valid segments qualified for update compaction. @@ -163,7 +164,7 @@ object HorizontalCompaction { absTableIdentifier: AbsoluteTableIdentifier, segmentUpdateStatusManager: SegmentUpdateStatusManager, factTimeStamp: Long, - segLists: util.List[String]): Unit = { + segLists: util.List[Segment]): Unit = { val db = carbonTable.getDatabaseName val table = carbonTable.getTableName http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala new file mode 100644 index 0000000..2aaecc7 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
new file mode 100644
index 0000000..2aaecc7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * Adds the partition to hive and creates a new segment if the location has data.
+ *
+ */
+case class CarbonAlterTableAddHivePartitionCommand(
+    tableName: TableIdentifier,
+    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    ifNotExists: Boolean)
+  extends AtomicRunnableCommand {
+
+  var partitionSpecsAndLocsTobeAdded : util.List[PartitionSpec] = _
+  var table: CarbonTable = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    if (table.isHivePartitionTable) {
+      val partitionWithLoc = partitionSpecsAndLocs.filter(_._2.isDefined)
+      if (partitionWithLoc.nonEmpty) {
+        val partitionSpecs = partitionWithLoc.map{ case (part, location) =>
+          new PartitionSpec(
+            new util.ArrayList(part.map(p => p._1 + "=" + p._2).toList.asJava),
+            location.get)
+        }
+        // Get all the partitions which are not already present in hive.
+        val currParts = CarbonFilters.getCurrentPartitions(sparkSession, tableName).get
+        partitionSpecsAndLocsTobeAdded =
+          new util.ArrayList(partitionSpecs.filterNot { part =>
+            currParts.exists(p => part.equals(p))
+          }.asJava)
+      }
+      AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
+    }
+    Seq.empty[Row]
+  }
+
+
+  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+    AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+    val msg = s"Got exception $exception when processing data of add partition." +
+      "Dropping partitions from the metadata"
+    LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+    Seq.empty[Row]
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // Partitions with physical data should be registered as a new segment.
+    if (partitionSpecsAndLocsTobeAdded != null && partitionSpecsAndLocsTobeAdded.size() > 0) {
+      val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
+        partitionSpecsAndLocsTobeAdded)
+      if (segmentFile != null) {
+        val loadModel = new CarbonLoadModel
+        loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
+        // Create new entry in tablestatus file
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
+        val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
+        val segmentFileName =
+          loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+        newMetaEntry.setSegmentFile(segmentFileName)
+        val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+        CarbonUtil.checkAndCreateFolder(segmentsLoc)
+        val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+        SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
+        CarbonLoaderUtil.populateNewLoadMetaEntry(
+          newMetaEntry,
+          SegmentStatus.SUCCESS,
+          loadModel.getFactTimeStamp,
+          true)
+        // Add size to the entry
+        CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
+        // Make the load as success in table status
+        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+      }
+    }
+    Seq.empty[Row]
+  }
+
+}
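
As a usage note, the new command above targets the case where a partition location already contains carbon data. A statement along the following lines (the table name, partition column and path are made-up values for illustration) would exercise it on a Hive-partitioned carbon table, registering the partition in hive and, when the location holds data, recording it as a new segment:

    import org.apache.spark.sql.SparkSession

    object AddHivePartitionExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("add-hive-partition-example")
          .enableHiveSupport()
          .getOrCreate()

        // Hypothetical table "sales" partitioned by "country"; the LOCATION points at a
        // directory that may already contain carbon data files.
        spark.sql(
          """
            |ALTER TABLE sales ADD IF NOT EXISTS PARTITION (country='US')
            |LOCATION 'hdfs://namenode/warehouse/external/sales/country=US'
          """.stripMargin)
      }
    }
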
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index cb4dece..407057e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -25,17 +25,17 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
 /**
  * Drop the partitions from hive and carbon store. It drops the partitions in following steps
@@ -59,16 +59,43 @@ case class CarbonAlterTableDropHivePartitionCommand(
     retainData: Boolean)
   extends AtomicRunnableCommand {
+
+  var carbonPartitionsTobeDropped : util.List[PartitionSpec] = _
+  var table: CarbonTable = _
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
     if (CarbonUtil.hasAggregationDataMap(table)) {
       throw new AnalysisException(
         "Partition can not be dropped as it is mapped to Pre Aggregate table")
     }
     if (table.isHivePartitionTable) {
+      var locks = List.empty[ICarbonLock]
       try {
-        specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+        val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+          LockUsage.COMPACTION_LOCK,
+          LockUsage.DELETE_SEGMENT_LOCK,
+          LockUsage.DROP_TABLE_LOCK,
+          LockUsage.CLEAN_FILES_LOCK,
+          LockUsage.ALTER_PARTITION_LOCK)
+        locks = AlterTableUtil.validateTableAndAcquireLock(
+          table.getDatabaseName,
+          table.getTableName,
+          locksToBeAcquired)(sparkSession)
+        val partitions =
+          specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+        val carbonPartitions = partitions.map { partition =>
+          new PartitionSpec(new util.ArrayList[String](
+            partition.spec.seq.map { case (column, value) => column + "=" + value }.toList.asJava),
+            partition.location)
+        }
+        carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
+        // Drop the partitions from hive.
+        AlterTableDropPartitionCommand(
+          tableName,
+          specs,
+          ifExists,
+          purge,
+          retainData).run(sparkSession)
       } catch {
         case e: Exception =>
           if (!ifExists) {
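
The processMetadata change above converts each catalog partition into carbon's PartitionSpec by joining the key/value pairs as column=value strings and keeping the partition location. A stand-alone rehearsal of that mapping, with simplified stand-in types instead of Spark's CatalogTablePartition and carbon's PartitionSpec:

    import java.util
    import scala.collection.JavaConverters._

    object CatalogPartitionToSpecSketch {
      // Simplified stand-ins, just to illustrate the mapping done in processMetadata above.
      case class CatalogPartitionLike(spec: Map[String, String], location: String)
      case class PartitionSpecLike(partitions: util.List[String], location: String)

      def toCarbonSpec(partition: CatalogPartitionLike): PartitionSpecLike = {
        // Each hive partition key/value pair becomes a "column=value" entry, and the
        // partition location is carried along so the files underneath can be handled later.
        val entries = partition.spec.map { case (column, value) => column + "=" + value }.toList
        PartitionSpecLike(new util.ArrayList[String](entries.asJava), partition.location)
      }

      def main(args: Array[String]): Unit = {
        val part = CatalogPartitionLike(Map("country" -> "US", "year" -> "2017"),
          "/warehouse/sales/country=US/year=2017")
        println(toCarbonSpec(part))
      }
    }
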
@@ -77,15 +104,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
             log.warn(e.getMessage)
             return Seq.empty[Row]
           }
+      } finally {
+        AlterTableUtil.releaseLocks(locks)
       }
-      // Drop the partitions from hive.
-      AlterTableDropPartitionCommand(
-        tableName,
-        specs,
-        ifExists,
-        purge,
-        retainData).run(sparkSession)
     }
     Seq.empty[Row]
   }
@@ -100,7 +122,6 @@ case class CarbonAlterTableDropHivePartitionCommand(
   }
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
     var locks = List.empty[ICarbonLock]
     val uniqueId = System.currentTimeMillis().toString
     try {
@@ -119,48 +140,27 @@ case class CarbonAlterTableDropHivePartitionCommand(
       }.toSet
       val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
         .getValidAndInvalidSegments.getValidSegments
-      try {
-        // First drop the partitions from partition mapper files of each segment
-        new CarbonDropPartitionRDD(sparkSession.sparkContext,
-          table.getTablePath,
-          segments.asScala,
-          partitionNames.toSeq,
-          uniqueId,
-          partialMatch = true).collect()
-      } catch {
-        case e: Exception =>
-          // roll back the drop partitions from carbon store
-          new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
-            table.getTablePath,
-            segments.asScala,
-            false,
-            uniqueId,
-            partitionNames.toSeq).collect()
-          throw e
-      }
-      // commit the drop partitions from carbon store
-      new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
+      // First drop the partitions from partition mapper files of each segment
+      val tuples = new CarbonDropPartitionRDD(sparkSession.sparkContext,
         table.getTablePath,
         segments.asScala,
-        true,
-        uniqueId,
-        partitionNames.toSeq).collect()
-      // Update the loadstatus with update time to clear cache from driver.
-      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
-        .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
-      CarbonUpdateUtil.updateTableMetadataStatus(
-        segmentSet,
-        table,
-        uniqueId,
-        true,
-        new util.ArrayList[String])
+        carbonPartitionsTobeDropped,
+        uniqueId).collect()
+      val tobeUpdatedSegs = new util.ArrayList[String]
+      val tobeDeletedSegs = new util.ArrayList[String]
+      tuples.foreach{case (tobeUpdated, tobeDeleted) =>
+        if (tobeUpdated.split(",").length > 0) {
+          tobeUpdatedSegs.add(tobeUpdated.split(",")(0))
+        }
+        if (tobeDeleted.split(",").length > 0) {
+          tobeDeletedSegs.add(tobeDeleted.split(",")(0))
+        }
+      }
+      SegmentFileStore.commitDropPartitions(table, uniqueId, tobeUpdatedSegs, tobeDeletedSegs)
       DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
     } finally {
       AlterTableUtil.releaseLocks(locks)
-      new PartitionMapFileStore().cleanSegments(
-        table,
-        new util.ArrayList(CarbonFilters.getPartitions(Seq.empty, sparkSession, tableName).asJava),
-        false)
+      SegmentFileStore.cleanSegments(table, null, false)
     }
     Seq.empty[Row]
   }
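
The reworked processData above replaces the two-RDD drop/commit sequence with a single CarbonDropPartitionRDD whose output is a pair of comma-separated strings per task, the first naming a segment to update and the second a segment to delete, which are then handed to SegmentFileStore.commitDropPartitions. The sketch below rehearses only the parsing step with made-up values, and it uses a nonEmpty guard where the patch checks the split length:

    import java.util

    object DropPartitionResultSketch {
      // tuples mimics the (tobeUpdated, tobeDeleted) pairs collected from the RDD; each
      // non-empty string is comma separated and starts with the segment id.
      def splitResults(tuples: Seq[(String, String)]): (util.List[String], util.List[String]) = {
        val tobeUpdatedSegs = new util.ArrayList[String]
        val tobeDeletedSegs = new util.ArrayList[String]
        tuples.foreach { case (tobeUpdated, tobeDeleted) =>
          if (tobeUpdated.nonEmpty) {
            tobeUpdatedSegs.add(tobeUpdated.split(",")(0))
          }
          if (tobeDeleted.nonEmpty) {
            tobeDeletedSegs.add(tobeDeleted.split(",")(0))
          }
        }
        (tobeUpdatedSegs, tobeDeletedSegs)
      }

      def main(args: Array[String]): Unit = {
        // Sample values only; real entries come from CarbonDropPartitionRDD.collect().
        val (update, delete) = splitResults(Seq(("2,1513578479000", ""), ("", "3,1513578479000")))
        println(s"to update: $update, to delete: $delete")
      }
    }
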
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 7fe2658..38ac58e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -204,7 +204,7 @@ case class CarbonAlterTableDropPartitionCommand(
       val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
       val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
       var i = 0
-      for (segmentId: String <- validSegments) {
+      for (segmentId: Segment <- validSegments) {
         threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
           segmentId, partitionId, dropWithData, oldPartitionIds)
         threadArray(i).start()
@@ -216,7 +216,7 @@ case class CarbonAlterTableDropPartitionCommand(
       val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
+      refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
@@ -238,7 +238,7 @@ case class CarbonAlterTableDropPartitionCommand(
 case class dropPartitionThread(sqlContext: SQLContext,
     carbonLoadModel: CarbonLoadModel,
     executor: ExecutorService,
-    segmentId: String,
+    segmentId: Segment,
     partitionId: String,
     dropWithData: Boolean,
     oldPartitionIds: List[Int]) extends Thread {
@@ -259,7 +259,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
   private def executeDroppingPartition(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       executor: ExecutorService,
-      segmentId: String,
+      segmentId: Segment,
       partitionId: String,
       dropWithData: Boolean,
      oldPartitionIds: List[Int]): Unit = {
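
The drop-partition command above (and the split-partition command that follows) keeps passing plain segment numbers to the datamap segment refresher even though validSegments is now a Seq[Segment], hence the map(_.getSegmentNo) before asJava. A small sketch of that projection, again with a simplified Segment stand-in rather than the real class:

    import java.util
    import scala.collection.JavaConverters._

    object RefreshSegmentsSketch {
      // Minimal stand-in for the Segment type used above; only getSegmentNo matters here.
      case class Segment(segmentNo: String, segmentFileName: String) {
        def getSegmentNo: String = segmentNo
      }

      // Project Segment objects down to the segment-number strings expected by the
      // Java-side refresher, as in refreshSegments(validSegments.map(_.getSegmentNo).asJava).
      def toSegmentNos(validSegments: Seq[Segment]): util.List[String] = {
        validSegments.map(_.getSegmentNo).asJava
      }

      def main(args: Array[String]): Unit = {
        println(toSegmentNos(Seq(Segment("0", null), Segment("1", null)))) // [0, 1]
      }
    }
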
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 020a72c..7aefbbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -215,7 +215,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       var i = 0
       validSegments.foreach { segmentId =>
         threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
-          segmentId, partitionId, oldPartitionIdList)
+          segmentId.getSegmentNo, partitionId, oldPartitionIdList)
         threadArray(i).start()
         i += 1
       }
@@ -225,7 +225,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
+      refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index d2acb00..59c43aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -178,7 +179,11 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
+    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false,
+      parentTable,
+      CarbonFilters.getCurrentPartitions(sparkSession,
+        TableIdentifier(parentTable.getTableName,
+          Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
     if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index ed6be97..657e0c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompac
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
@@ -227,11 +228,17 @@ object LoadPostAggregateListener extends OperationEventListener {
           operationContext.getProperty(
             s"${parentTableDatabase}_${parentTableName}_Segment").toString)
       } else {
-        (TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
-          carbonLoadModel.getSegmentId)
+        val currentSegmentFile = operationContext.getProperty("current.segmentfile")
+        val segment = if (currentSegmentFile != null) {
+          new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
+        } else {
+          Segment.toSegment(carbonLoadModel.getSegmentId)
+        }
+        (TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
       }
+
     PreAggregateUtil.startDataLoadForDataMap(
-      parentTableIdentifier,
+      parentTableIdentifier, segmentToLoad,
       validateSegments = false,
       childLoadCommand,