Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591992

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    --- End diff --

    Provide a variable name to improve readability.
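For illustration, a minimal sketch of one way to act on this comment: bind the bare boolean passed to CarbonDropPartitionCommitRDD to a descriptively named value, so the call site documents what the flag means. The name commitDroppedPartitions is assumed for the sketch, not taken from the PR, and the surrounding identifiers (sparkSession, table, segments, uniqueId) are the ones from the diff above:

    // Sketch only: "commitDroppedPartitions" is a hypothetical name chosen
    // for readability; the call itself is unchanged from the diff above.
    val commitDroppedPartitions = true
    new CarbonDropPartitionCommitRDD(
      sparkSession.sparkContext,
      table.getTablePath,
      segments.asScala,
      commitDroppedPartitions,
      uniqueId).collect()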
---