This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 18d141250c8 [SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter
18d141250c8 is described below

commit 18d141250c8ffcf4f30d2b0edb77b57e3945f3f1
Author: Hui An <hui...@shopee.com>
AuthorDate: Mon Dec 12 18:17:49 2022 +0800

    [SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter

    ### What changes were proposed in this pull request?

    Make MR job IDs consistent in FileBatchWriter and FileFormatWriter.

    ### Why are the changes needed?

    [SPARK-26873](https://issues.apache.org/jira/browse/SPARK-26873) fixed this consistency issue for
    FileFormatWriter, but [SPARK-33402](https://issues.apache.org/jira/browse/SPARK-33402) broke the
    requirement again by introducing a random long. We need to address this because identical task IDs
    are expected across attempts for correctness. FileBatchWriter also does not follow this requirement
    and needs to be fixed as well.

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    Closes #38980 from boneanxs/SPARK-41448.

    Authored-by: Hui An <hui...@shopee.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 7801666f3b5ea3bfa0f95571c1d68147ce5240ec)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/internal/io/SparkHadoopWriterUtils.scala | 15 +++++++++++++--
 .../sql/execution/datasources/FileFormatWriter.scala      |  8 ++++----
 .../sql/execution/datasources/v2/FileWriterFactory.scala  |  4 +++-
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index 657842c620f..6ba6713b699 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -47,11 +47,22 @@ object SparkHadoopWriterUtils {
    * @return a job ID
    */
   def createJobID(time: Date, id: Int): JobID = {
+    val jobTrackerID = createJobTrackerID(time)
+    createJobID(jobTrackerID, id)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
     if (id < 0) {
       throw new IllegalArgumentException("Job number is negative")
     }
-    val jobtrackerID = createJobTrackerID(time)
-    new JobID(jobtrackerID, id)
+    new JobID(jobTrackerID, id)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a9d4d4208f3..4f7d5069be1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -223,14 +223,14 @@ object FileFormatWriter extends Logging {
       rdd
     }
 
-    val jobIdInstant = new Date().getTime
+    val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
     val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
     sparkSession.sparkContext.runJob(
       rddWithNonEmptyPartitions,
       (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
         executeTask(
           description = description,
-          jobIdInstant = jobIdInstant,
+          jobTrackerID = jobTrackerID,
           sparkStageId = taskContext.stageId(),
           sparkPartitionId = taskContext.partitionId(),
           sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -265,7 +265,7 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
-      jobIdInstant: Long,
+      jobTrackerID: String,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
@@ -273,7 +273,7 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow],
       concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index d827e836235..ea13e2deac8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
     committer.setupTask(taskAttemptContext)
@@ -40,7 +43,6 @@ case class FileWriterFactory (
   }
 
   private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
     val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
     // Set up the configuration object
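The core idea of the patch, as visible in the diff: the job tracker ID is derived once on the driver and the resulting string, rather than a fresh timestamp read inside each task, is used to build the Hadoop JobID, so retried attempts see identical IDs. A minimal sketch of that effect is below; it is not Spark code, and the trackerId helper with its timestamp format is an illustrative assumption, only the JobID constructor is the real Hadoop API.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapreduce.JobID

object JobIdConsistencySketch {

  // Illustrative stand-in for a timestamp-based tracker ID; the exact
  // format used by SparkHadoopWriterUtils.createJobTrackerID may differ.
  def trackerId(time: Date): String =
    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)

  def main(args: Array[String]): Unit = {
    // Computed once (conceptually on the driver) and shipped to tasks as a string.
    val sharedTrackerId = trackerId(new Date())

    // Two task attempts building the JobID from the shared string get equal IDs.
    val attempt1JobId = new JobID(sharedTrackerId, 0)
    val attempt2JobId = new JobID(sharedTrackerId, 0)
    println(s"shared tracker id -> equal: ${attempt1JobId == attempt2JobId}") // true

    // If each attempt read the clock itself, a retry landing in a different
    // second would get a different JobID, which breaks committers that expect
    // identical job/task IDs across attempts.
    val firstAttemptJobId = new JobID(trackerId(new Date()), 0)
    Thread.sleep(1100)
    val retryJobId = new JobID(trackerId(new Date()), 0)
    println(s"per-attempt clock -> equal: ${firstAttemptJobId == retryJobId}") // likely false
  }
}
```

The FileWriterFactory change follows the same principle: jobId is hoisted into a val on the factory, so every writer it creates shares one JobID instead of re-deriving it from new Date inside createTaskAttemptContext.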
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org