Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591992
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from the partition map
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    --- End diff --
    
    Provide a variable name for the boolean argument to improve readability.
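
    For instance, the bare boolean could be bound to a named value before the
    call; a minimal sketch of the suggestion (the name `dropSuccess` is
    illustrative, not from the PR):

        // Hypothetical name for the boolean flag; the actual identifier is up
        // to the PR author.
        val dropSuccess = true
        // Commit the removed partitions in carbon store.
        new CarbonDropPartitionCommitRDD(
          sparkSession.sparkContext,
          table.getTablePath,
          segments.asScala,
          dropSuccess,
          uniqueId).collect()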

