Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2109#discussion_r178784636 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { + "" + } + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}") + if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) { + LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName") + oldCarbonFile.renameForce(destinationFileName) + } else { + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed") + false + } + } else { + false + } + } + + /** + * Used to remove table status files with UUID and segment folders. + */ + protected def cleanUpStaleTableStatusFiles( + childTables: Seq[CarbonTable], + operationContext: OperationContext, + uuid: String): Unit = { + childTables.foreach { childTable => + val metaDataDir = FileFactory.getCarbonFile( + CarbonTablePath.getMetadataPath(childTable.getTablePath)) + val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(uuid) || file.getName.contains("backup") + } + }) + tableStatusFiles.foreach(_.delete()) + } + } +} + +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent] + val carbonTable = postStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + val renamedDataMaps = childCommands.takeWhile { + childCommand => + val childCarbonTable = childCommand.table + val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID( + childCarbonTable.getTablePath, uuid) + // Generate table status file name without UUID, forExample: tablestatus + val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath( + childCarbonTable.getTablePath) + renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid) + } + // if true then the commit for one of the child tables has failed + val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0 + if (commitFailed) { + LOGGER.warn("Reverting table status file to original state") + renamedDataMaps.foreach { + command => + val carbonTable = command.table + // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus + val backupTableSchemaPath = + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid + val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "") + } + } + // after success/failure of commit delete all tablestatus files with UUID in their names. + // if commit failed then remove the segment directory + cleanUpStaleTableStatusFiles(childCommands.map(_.table), + operationContext, + uuid) + if (commitFailed) { + sys.error("Failed to update table status for pre-aggregate table") + } + + } + } +} + +object AlterTableDropPartitionMetaListener extends OperationEventListener{ + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent] + val parentCarbonTable = dropPartitionEvent.parentCarbonTable + val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys) + val sparkSession = SparkSession.getActiveSession.get + if (parentCarbonTable.hasAggregationDataMap) { + // used as a flag to block direct drop partition on aggregate tables fired by the user + operationContext.setProperty("isInternalDropCall", "true") + // Filter out all the tables which dont have the partition being dropped. + val childTablesWithoutPartitionColumns = + parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema => + val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala + val partitionColExists = partitionsToBeDropped.forall { + partition => + childColumns.exists { childColumn => + childColumn.getAggFunction.isEmpty && + childColumn.getParentColumnTableRelations.asScala.head.getColumnName. + equals(partition) + } + } + !partitionColExists + } + if (childTablesWithoutPartitionColumns.nonEmpty) { + throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" + + s" participating in the following datamaps ${ + childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName) --- End diff -- Make sure this list is printed to console output
---