Ryan Blue created SPARK-26873: --------------------------------- Summary: FileFormatWriter creates inconsistent MR job IDs Key: SPARK-26873 URL: https://issues.apache.org/jira/browse/SPARK-26873 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0, 2.3.2, 2.2.3 Reporter: Ryan Blue
FileFormatWriter uses the current time to create a Job ID that is used when calling Hadoop committers. This ID is used to produce task and task attempt IDs used in commits. The problem is that Spark [generates this Job ID|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L209] in {{executeTask}} for every task: {code:lang=scala} /** Writes data out in a single Spark task. */ private def executeTask( description: WriteJobDescription, sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, committer: FileCommitProtocol, iterator: Iterator[InternalRow]): WriteTaskResult = { val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) ... {code} Because this is called in each task, the Job ID used is not consistent across tasks, which violates the contract expected by Hadoop committers. If a committer expects identical task IDs across attempts for correctness, this breaks correctness. For example, a Hadoop committer should be able to rename an output file to a path based on the task ID to ensure that only one copy is committed. We hit this issue when preemption caused a task to die just after the commit operation. The commit coordinator authorized a second task commit because the first did not complete due to preemption. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org