Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178784636
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 ---
    @@ -39,6 +41,212 @@ import 
org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, 
LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, 
LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends 
OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = 
event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = 
operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && 
carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        
childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      
childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = 
SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + 
"_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if 
detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, 
newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + 
backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + 
backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends 
OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = 
event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = 
operationContext.getProperty("dropPartitionCommands")
    +    val uuid = 
Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && 
carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        
childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = 
CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: 
tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, 
newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = 
renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus i.e tablestatus_backup_UUID 
to tablestatus
    +            val backupTableSchemaPath =
    +              
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + 
uuid
    +            val tableSchemaPath = 
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, 
tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with 
UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    +        operationContext,
    +        uuid)
    +      if (commitFailed) {
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +
    +      }
    +    }
    +}
    +
    +object AlterTableDropPartitionMetaListener extends OperationEventListener{
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val dropPartitionEvent = 
event.asInstanceOf[AlterTableDropPartitionMetaEvent]
    +    val parentCarbonTable = dropPartitionEvent.parentCarbonTable
    +    val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
    +    val sparkSession = SparkSession.getActiveSession.get
    +    if (parentCarbonTable.hasAggregationDataMap) {
    +      // used as a flag to block direct drop partition on aggregate tables 
fired by the user
    +      operationContext.setProperty("isInternalDropCall", "true")
    +      // Filter out all the tables which dont have the partition being 
dropped.
    +      val childTablesWithoutPartitionColumns =
    +        parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter 
{ dataMapSchema =>
    +          val childColumns = 
dataMapSchema.getChildSchema.getListOfColumns.asScala
    +          val partitionColExists = partitionsToBeDropped.forall {
    +            partition =>
    +              childColumns.exists { childColumn =>
    +                  childColumn.getAggFunction.isEmpty &&
    +                  
childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
    +                    equals(partition)
    +              }
    +          }
    +          !partitionColExists
    +      }
    +      if (childTablesWithoutPartitionColumns.nonEmpty) {
    +        throw new MetadataProcessException(s"Cannot drop partition as one 
of the partition is not" +
    +                                           s" participating in the 
following datamaps ${
    +          
childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
    --- End diff --
    
    Make sure this list of datamap table names is actually printed in the console output when the exception is thrown.


---

Reply via email to