[ 
https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16941467#comment-16941467
 ] 

feiwang commented on SPARK-29302:
---------------------------------

You can add the code below into FileFormatWriterSuite.
{code:java}
  // Reproduction for SPARK-29302: two attempts of the same task (for example an original attempt
  // and its speculative copy) are handed the same temp file path by the commit protocol.
  test("SPARK-29302: for dynamic partition overwrite, a task will concurrent write a same file" +
    " with its relative speculation task") {
    withTempDir { f =>
      val jobId = SparkHadoopWriterUtils.createJobID(new Date(), 1)
      val taskId = new TaskID(jobId, TaskType.MAP, 1)
      // Same task, different attempt numbers.
      val taskAttemptId0 = new TaskAttemptID(taskId, 0)
      val taskAttemptId1 = new TaskAttemptID(taskId, 1)

      // Builds a Hadoop task attempt context for the given attempt. Both attempts share every
      // setting except the attempt id itself.
      def newAttemptContext(attemptId: TaskAttemptID): TaskAttemptContext = {
        val hadoopConf = new Configuration()
        hadoopConf.set("mapreduce.job.id", jobId.toString)
        hadoopConf.set("mapreduce.task.id", attemptId.getTaskID.toString)
        hadoopConf.set("mapreduce.task.attempt.id", attemptId.toString)
        hadoopConf.setBoolean("mapreduce.task.ismap", true)
        hadoopConf.setInt("mapreduce.task.partition", 0)

        new TaskAttemptContextImpl(hadoopConf, attemptId)
      }

      val taskAttemptContext0 = newAttemptContext(taskAttemptId0)
      val taskAttemptContext1 = newAttemptContext(taskAttemptId1)

      // NOTE(review): the committer is created with only a job id and an output path; confirm
      // that this exercises the dynamic-partition-overwrite code path named in the test title.
      val committer = new HadoopMapReduceCommitProtocol(jobId.toString, f.getAbsolutePath)
      val tf0 = committer.newTaskTempFile(taskAttemptContext0, Some(f.getAbsolutePath), "ext")
      val tf1 = committer.newTaskTempFile(taskAttemptContext1, Some(f.getAbsolutePath), "ext")

      // Both attempts resolve to the same path, so they would write to the same file.
      assert(tf0 == tf1)
    }
  }
{code}


> dynamic partition overwrite with speculation enabled
> ----------------------------------------------------
>
>                 Key: SPARK-29302
>                 URL: https://issues.apache.org/jira/browse/SPARK-29302
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4
>            Reporter: feiwang
>            Priority: Major
>
> Currently, for a dynamic partition overwrite operation, the filename of a 
> task's output is deterministic.
> So, if speculation is enabled, would a task conflict with its corresponding 
> speculative task?
> Would the two tasks concurrently write to the same file?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to