advancedxy commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329335511
 
 

 ##########
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##########
 @@ -91,7 +96,62 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Whether this is an InsertIntoHadoopFsRelation operation; defaults to false.
+   */
+  private def isInsertIntoHadoopFsRelation =
+    fileSourceWriteDesc.map(_.isInsertIntoHadoopFsRelation).getOrElse(false)
+
+  /**
+   * The escaped static partition key/value pairs; defaults to empty.
+   */
+  private def escapedStaticPartitionKVs =
+    fileSourceWriteDesc.map(_.escapedStaticPartitionKVs).getOrElse(Seq.empty)
+
+  /**
+   * The staging root directory for InsertIntoHadoopFsRelation operation.
+   */
+  @transient private var insertStagingDir: Path = null
+
+  /**
+   * The staging output path for InsertIntoHadoopFsRelation operation.
+   */
+  @transient private var stagingOutputPath: Path = null
+
+  /**
+   * Get the desired output path for the job. When the current operation is not an
+   * InsertIntoHadoopFsRelation operation, the output path is simply [[path]]. Otherwise,
+   * we use a sub-path under [[insertStagingDir]], composed from
+   * [[escapedStaticPartitionKVs]], instead of [[path]]. This marks the operation so
+   * that a conflicting concurrent operation can be detected by checking whether the
+   * corresponding output path already exists.
+   *
+   * @return Path the desired output path.
+   */
+  protected def getOutputPath(context: TaskAttemptContext): Path = {
+    if (isInsertIntoHadoopFsRelation) {
+      val insertStagingPath = ".spark-staging-" + escapedStaticPartitionKVs.size
+      insertStagingDir = new Path(path, insertStagingPath)
+      val appId = SparkEnv.get.conf.getAppId
+      val outputPath = new Path(path, Array(insertStagingPath,
+        getEscapedStaticPartitionPath(escapedStaticPartitionKVs), appId, jobId)
+        .mkString(File.separator))
+      // makeQualified returns a new Path; return it rather than discarding it.
+      insertStagingDir.getFileSystem(context.getConfiguration).makeQualified(outputPath)
+    } else {
+      new Path(path)
+    }
+  }
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    if (isInsertIntoHadoopFsRelation) {
+      stagingOutputPath = getOutputPath(context)
+      context.getConfiguration.set(FileOutputFormat.OUTDIR, stagingOutputPath.toString)
+      logWarning("Implicitly setting the file output committer algorithm version to 2," +
+        " since task output is committed to the staging output path first," +
+        " which makes it equivalent to algorithm 1.")
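For context on the layout produced above, here is a minimal standalone sketch (outside Spark, with hypothetical values for `path`, `appId`, and `jobId`) of the staging output path that `getOutputPath` composes; `getEscapedStaticPartitionPath` is approximated here by a simple `k=v` join:

```scala
import java.io.File

// Hypothetical values for illustration only; the real ones come from the
// job configuration and SparkEnv at runtime.
val path = "/warehouse/tbl"
val escapedStaticPartitionKVs = Seq("p1" -> "v1", "p2" -> "v2")
val appId = "app-20190927"
val jobId = "job-0001"

val insertStagingPath = ".spark-staging-" + escapedStaticPartitionKVs.size
// Approximation of getEscapedStaticPartitionPath: join the static KVs as k=v segments.
val staticPartitionPath =
  escapedStaticPartitionKVs.map { case (k, v) => s"$k=$v" }.mkString(File.separator)
// Compose the full staging output path the same way getOutputPath does.
val outputPath = Seq(path, insertStagingPath, staticPartitionPath, appId, jobId)
  .mkString(File.separator)
// On a POSIX filesystem:
// outputPath == "/warehouse/tbl/.spark-staging-2/p1=v1/p2=v2/app-20190927/job-0001"
```

Concurrent writes with different static partitions thus land under distinct sub-paths, while a clash on the same sub-path signals a conflicting operation.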
 
 Review comment:
   15843 is a lot; within a single Spark application, however, it would not be that many.
   One way to solve this is to use an object-level counter so that only the first warning (or the first few) is logged.
   But I am not sure that's worth it, and the head of the logs may get rotated and discarded anyway.
   
   Using logDebug would also be fine, but users normally don't set the log level to DEBUG.
   
   I am not sure which option is better, so it's up to you.
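   To make the "object-level counter" suggestion concrete, a minimal sketch (the object and method names here are hypothetical, not from the PR) using an `AtomicBoolean` held in a singleton object, so the warning fires at most once per JVM:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: JVM-wide flag shared by all committer instances,
// so the warning is logged at most once per application.
object CommitterWarningOnce {
  private val warned = new AtomicBoolean(false)

  // Returns true only for the first caller; every later call returns false.
  def shouldLog(): Boolean = warned.compareAndSet(false, true)
}

// At the logWarning call site this would become something like:
// if (CommitterWarningOnce.shouldLog()) {
//   logWarning("Implicitly setting the file output committer algorithm version to 2 ...")
// }
```

   `compareAndSet` makes the check-and-flip atomic, so concurrent tasks cannot both log; note this still does not help if the single early warning is lost to log rotation.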

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

