comphead opened a new issue, #3015:
URL: https://github.com/apache/datafusion-comet/issues/3015

   ### What is the problem the feature request solves?
   
   **Spark uses a committer and staging paths for writes.** Apache Spark uses the `FileCommitProtocol` with staging paths to ensure atomic, fault-tolerant, and consistent distributed writes. Here's why:
   
   **1. The problem: distributed writing is complex.** When multiple tasks write data in parallel across a cluster:
   
   - Task failures can leave partial or corrupted data
   - Speculative execution may create duplicate files
   - Network failures can interrupt writes mid-stream
   - Concurrent writes to the same location can conflict
   
   **2. The solution: a two-phase commit protocol.** Spark implements a two-phase commit pattern borrowed from database systems:
   
   - **Phase 1: write to staging (task commit).** Each task writes to a temporary staging location instead of the final destination.
   - **Phase 2: atomic move (job commit).** Only after ALL tasks succeed does the driver atomically move files from staging to the final location.
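   As a minimal illustration of the pattern (hypothetical helper code, not Spark's implementation; the task-writing step is elided):
   
   ```
   import java.io.IOException
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path
   
   // Minimal sketch of the two-phase pattern. Hypothetical helper, not Spark code.
   def twoPhaseWrite(finalDir: String, jobId: String, conf: Configuration): Unit = {
     val staging = new Path(finalDir, s".staging-$jobId")
     val fs = staging.getFileSystem(conf)
     try {
       // Phase 1: all tasks write only under `staging`.
       // runWriteTasks(staging)  // hypothetical; any task failure throws
   
       // Phase 2: only after every task succeeds, publish with renames.
       for (status <- fs.listStatus(staging)) {
         val dst = new Path(finalDir, status.getPath.getName)
         if (!fs.rename(status.getPath, dst)) {
           throw new IOException(s"Failed to rename ${status.getPath} to $dst")
         }
       }
     } finally {
       fs.delete(staging, true) // on failure this is the abort; on success, cleanup
     }
   }
   ```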
   
   Currently Comet just does naive writes, without staging or a two-phase commit.
   
   
   ### Spark implementation
   
   #### The Staging Directory From `FileCommitProtocol.getStagingDir()`
   
   ```
   def getStagingDir(path: String, jobId: String): Path = {
     new Path(path, ".spark-staging-" + jobId)
   }
   ```
   
   Example: if writing to `/data/output`, the staging directory is `/data/output/.spark-staging-{jobId}`.
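   For illustration (`someJobId` is a hypothetical placeholder for the real Spark job ID):
   
   ```
   import org.apache.hadoop.fs.Path
   
   val staging = new Path("/data/output", ".spark-staging-someJobId")
   println(staging) // /data/output/.spark-staging-someJobId
   ```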
   
   #### How It Works: `HadoopMapReduceCommitProtocol`
   Here's the complete flow:
   
   ##### Task Execution
   Each task writes to staging:
   
   ```
   override def newTaskTempFile(
       taskContext: TaskAttemptContext,
       dir: Option[String],
       spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
     // With a FileOutputCommitter, write under its task work path (the
     // per-task-attempt _temporary directory); otherwise write directly
     // under the destination path.
     val stagingDir: Path = committer match {
       case f: FileOutputCommitter =>
         new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
       case _ => new Path(path)
     }
     
     dir.map { d =>
       // Partitioned write: nest the file under its partition subdirectory.
       new Path(new Path(stagingDir, d), filename).toString
     }.getOrElse {
       new Path(stagingDir, filename).toString
     }
   }
   ```
   File path example:
   
   `/data/output/.spark-staging-{jobId}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/part-00000.parquet`
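   Breaking that path down, the outer staging level comes from Spark while the nested `_temporary` levels come from Hadoop's `FileOutputCommitter`:
   
   ```
   /data/output                       <- final destination
     .spark-staging-{jobId}           <- job-level staging dir (Spark)
       _temporary/{appAttemptId}      <- application-attempt dir (FileOutputCommitter)
         _temporary/{taskAttemptId}   <- task-attempt work dir (FileOutputCommitter)
           part-00000.parquet         <- file the task actually writes
   ```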
   
   ##### Task Commit
   When a task completes successfully:
   
   ```
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     // Delegates to the Hadoop committer; also coordinates with the driver so
     // that only one attempt per partition commits under speculative execution.
     SparkHadoopMapRedUtil.commitTask(committer, taskContext, ...)
     new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
   ```
   
   Files move from task-specific temp to job-level staging:
   
   `/data/output/.spark-staging-{jobId}/part-00000.parquet`
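   Conceptually, a successful task commit promotes the files from the task-attempt work directory into the job-level staging root via renames, after the driver's `OutputCommitCoordinator` has confirmed this attempt may commit (so speculative duplicates of the same partition cannot both publish). A simplified sketch, not the actual implementation; `canCommit` stands in for that coordination check:
   
   ```
   import java.io.IOException
   import org.apache.hadoop.fs.{FileSystem, Path}
   
   // Simplified sketch of a v2-style task commit. `canCommit` is a stand-in
   // for the driver-side OutputCommitCoordinator check.
   def commitTaskSketch(
       fs: FileSystem,
       taskAttemptDir: Path,
       stagingRoot: Path,
       canCommit: () => Boolean): Unit = {
     if (!canCommit()) {
       throw new IOException(s"Commit denied for $taskAttemptDir")
     }
     // Promote each file from the task-attempt work dir into job staging.
     for (status <- fs.listStatus(taskAttemptDir)) {
       val dst = new Path(stagingRoot, status.getPath.getName)
       if (!fs.rename(status.getPath, dst)) {
         throw new IOException(s"Failed to promote ${status.getPath} to $dst")
       }
     }
   }
   ```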
   
   ##### Job Commit
   Only after ALL tasks succeed, the driver commits:
   
   ```
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
     
     if (hasValidPath) {
       val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
       
       // Move files from staging to final location
       for ((src, dst) <- filesToMove) {
         if (!fs.rename(new Path(src), new Path(dst))) {
           throw new IOException(s"Failed to rename $src to $dst")
         }
       }
       
       // Clean up staging directory
       fs.delete(stagingDir, true)
     }
   }
   ```
   
   Files move to final location:
   
   `/data/output/part-00000.parquet`
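   Note that this relies on `FileSystem.rename` being atomic and cheap, which holds on HDFS. On object stores such as S3, rename is typically implemented as copy-plus-delete and is not atomic, which is why cloud-specific committers exist for those targets.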
   
   ##### Job Abort 
   If ANY task fails, the driver aborts:
   
   ```
   override def abortJob(jobContext: JobContext): Unit = {
     try {
       committer.abortJob(jobContext, JobStatus.State.FAILED)
     } catch { case e: IOException => logWarning(...) }
     
     try {
       if (hasValidPath) {
         val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
         fs.delete(stagingDir, true)  // Delete ALL staging data
       }
     } catch { case e: IOException => logWarning(...) }
   }
   ```
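   Tying the pieces together, the driver-side lifecycle that Comet would need to honor looks roughly like this (a hedged sketch of the call sequence; in Spark it lives in `FileFormatWriter`, and `runTasks` is a hypothetical stand-in for scheduling the write tasks):
   
   ```
   import org.apache.hadoop.mapreduce.JobContext
   import org.apache.spark.internal.io.FileCommitProtocol
   import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
   
   // Hedged sketch of the driver-side sequence around FileCommitProtocol.
   def writeJobSketch(
       committer: FileCommitProtocol,
       jobContext: JobContext,
       runTasks: () => Seq[TaskCommitMessage]): Unit = {
     committer.setupJob(jobContext)
     try {
       // Phase 1 happens inside the tasks; each success yields a commit message.
       val taskCommits = runTasks()
       // Phase 2: publish everything, only if every task succeeded.
       committer.commitJob(jobContext, taskCommits)
     } catch {
       case t: Throwable =>
         committer.abortJob(jobContext) // delete staging; final dir stays untouched
         throw t
     }
   }
   ```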
   
   ### Describe the potential solution
   
   _No response_
   
   ### Additional context
   
   _No response_

