This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d46b15d2b23  [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

d46b15d2b23 is described below

commit d46b15d2b23f13b65d781bb364ccde3be6679b99
Author: Yikf <yikai...@apache.org>
AuthorDate: Mon Feb 27 16:56:04 2023 +0800

    [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

    ### What changes were proposed in this pull request?

    Make FileWriterFactory keep a serializable jobTrackerId instead of a non-serializable JobID.

    ### Why are the changes needed?

    [SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) made the MR job IDs consistent between FileBatchWriter and FileFormatWriter, but it introduced a serialization issue: JobID is not serializable.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    GA

    Closes #40064 from Yikf/write-job-id.

    Authored-by: Yikf <yikai...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/datasources/v2/FileWriterFactory.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8..4b1a099d3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
 
-  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: the jobId must be consistent across tasks to meet the contract
+  // expected by Hadoop committers, but `JobID` cannot be serialized. Thus, persist
+  // the serializable jobTrackerID in the class and recreate the JobID on demand via
+  // a transient lazy val; deriving it from the same jobTrackerID keeps it consistent.
+  private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
 
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
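For illustration, the pattern this patch relies on can be sketched outside Spark. The snippet below is a minimal, hypothetical model of the fix, not the actual Spark code: `NonSerializableId` stands in for Hadoop's `JobID`, and a plain date-formatted string stands in for `SparkHadoopWriterUtils.createJobTrackerID`. It shows that a `@transient lazy val` derived from a serialized field survives a Java serialization round trip and is rebuilt consistently.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.text.SimpleDateFormat
import java.util.Date

// Stand-in for Hadoop's JobID: deliberately NOT java.io.Serializable.
class NonSerializableId(val trackerId: String, val id: Int) {
  override def toString: String = s"job_${trackerId}_$id"
}

// Sketch of the fix: persist only the serializable String; the @transient
// lazy val is skipped during serialization and rebuilt on first access.
case class SketchWriterFactory(description: String) {
  private[this] val jobTrackerID: String =
    new SimpleDateFormat("yyyyMMddHHmmss").format(new Date)

  // Derived deterministically from jobTrackerID, so every deserialized
  // copy (e.g. on each executor) recreates the same job id.
  @transient private lazy val jobId = new NonSerializableId(jobTrackerID, 0)

  def currentJobId: NonSerializableId = jobId
}

object RoundTripDemo {
  def main(args: Array[String]): Unit = {
    val factory = SketchWriterFactory("demo")

    // Java-serialize the factory, as Spark does when shipping a
    // DataWriterFactory to executors. A plain (non-transient) JobID-like
    // field here would throw NotSerializableException.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(factory)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = in.readObject().asInstanceOf[SketchWriterFactory]

    // Both print the same id, rebuilt from the serialized jobTrackerID.
    println(factory.currentJobId)
    println(copy.currentJobId)
  }
}
```

The key design choice is that the recreated jobId is derived deterministically from jobTrackerID, so every executor that deserializes the factory reconstructs the same job ID, which is the consistency contract Hadoop committers expect.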