This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 70f3d2fec92 [SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter
70f3d2fec92 is described below

commit 70f3d2fec92f4f30a3c9395f822717cf65a744ba
Author: Hui An <hui...@shopee.com>
AuthorDate: Mon Dec 12 18:17:49 2022 +0800

    [SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter
    
    ### What changes were proposed in this pull request?
    Make the MR job IDs used by FileBatchWriter and FileFormatWriter consistent across task attempts.
    
    ### Why are the changes needed?
    
    [SPARK-26873](https://issues.apache.org/jira/browse/SPARK-26873) fixed the consistency issue for FileFormatWriter, but [SPARK-33402](https://issues.apache.org/jira/browse/SPARK-33402) broke the requirement again by introducing a random long. We need to address this so that task IDs are identical across attempts, which is required for correctness.
    
    FileBatchWriter doesn't follow this requirement either, so it needs to be fixed as well.
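    
    A minimal Scala sketch (for illustration only, not part of this patch) of how sharing one job tracker ID string lets every attempt of the same task rebuild identical MR job and task IDs; the `taskAttemptIdFor` helper is hypothetical:
    
        import java.util.Date
        import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
        import org.apache.spark.internal.io.SparkHadoopWriterUtils
    
        // Driver side: derive the job tracker ID string once per write job.
        val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
    
        // Executor side: every attempt of a task rebuilds the same job and task IDs
        // from the shared string, so retries differ only in the attempt number.
        def taskAttemptIdFor(stageId: Int, partitionId: Int, attemptNumber: Int): TaskAttemptID = {
          val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, stageId)
          val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
          new TaskAttemptID(taskId, attemptNumber)
        }
    
    Previously, FileWriterFactory called `createJobID(new Date, 0)` inside `createTaskAttemptContext`, so different tasks (and retried attempts) could observe different job IDs.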
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Closes #38980 from boneanxs/SPARK-41448.
    
    Authored-by: Hui An <hui...@shopee.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 7801666f3b5ea3bfa0f95571c1d68147ce5240ec)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/internal/io/SparkHadoopWriterUtils.scala | 15 +++++++++++++--
 .../sql/execution/datasources/FileFormatWriter.scala      |  8 ++++----
 .../sql/execution/datasources/v2/FileWriterFactory.scala  |  4 +++-
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index 657842c620f..6ba6713b699 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -47,11 +47,22 @@ object SparkHadoopWriterUtils {
    * @return a job ID
    */
   def createJobID(time: Date, id: Int): JobID = {
+    val jobTrackerID = createJobTrackerID(time)
+    createJobID(jobTrackerID, id)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
     if (id < 0) {
       throw new IllegalArgumentException("Job number is negative")
     }
-    val jobtrackerID = createJobTrackerID(time)
-    new JobID(jobtrackerID, id)
+    new JobID(jobTrackerID, id)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 02af79ea2bb..84f3fd360b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -250,14 +250,14 @@ object FileFormatWriter extends Logging {
         rdd
       }
 
-      val jobIdInstant = new Date().getTime
+      val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
       val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
       sparkSession.sparkContext.runJob(
         rddWithNonEmptyPartitions,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
-            jobIdInstant = jobIdInstant,
+            jobTrackerID = jobTrackerID,
             sparkStageId = taskContext.stageId(),
             sparkPartitionId = taskContext.partitionId(),
            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -292,7 +292,7 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
-      jobIdInstant: Long,
+      jobTrackerID: String,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
@@ -300,7 +300,7 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow],
      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index d827e836235..ea13e2deac8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
     committer.setupTask(taskAttemptContext)
@@ -40,7 +43,6 @@ case class FileWriterFactory (
   }
 
   private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
     val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
     // Set up the configuration object


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
