comphead opened a new issue, #3015:
URL: https://github.com/apache/datafusion-comet/issues/3015
### What is the problem the feature request solves?
Spark uses the `FileCommitProtocol` with staging paths to ensure atomic, fault-tolerant, and consistent distributed writes. Here's why:

**1. The problem: distributed writing is complex.** When multiple tasks write data in parallel across a cluster:
- Task failures can leave partial/corrupted data
- Speculative execution may create duplicate files
- Network failures can interrupt writes mid-stream
- Concurrent writes to the same location can conflict

**2. The solution: a two-phase commit protocol.** Spark implements a two-phase commit pattern borrowed from database systems (sketched below):
- **Phase 1: write to staging (task commit).** Each task writes to a temporary staging location instead of the final destination.
- **Phase 2: atomic move (job commit).** Only after ALL tasks succeed does the driver atomically move files from staging to the final location.
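To make the pattern concrete, here is a minimal, self-contained sketch of the two phases using only the Hadoop `FileSystem` API; `TwoPhaseWriteSketch` and the literal paths are made up for illustration:
```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TwoPhaseWriteSketch extends App {
  val out = new Path("/data/output")
  val jobId = UUID.randomUUID().toString
  val staging = new Path(out, ".spark-staging-" + jobId)
  val fs = FileSystem.get(new Configuration())

  // Phase 1: tasks write only under the hidden staging directory.
  val taskFile = new Path(staging, "part-00000.parquet")
  val outStream = fs.create(taskFile)
  try outStream.writeBytes("task output goes here") finally outStream.close()

  // Phase 2: once ALL tasks have succeeded, the driver publishes the files
  // with renames and removes the staging directory. A reader of /data/output
  // never observes a half-written file.
  fs.mkdirs(out)
  if (!fs.rename(taskFile, new Path(out, taskFile.getName))) {
    fs.delete(staging, true) // abort: drop all staged data
    throw new java.io.IOException(s"Failed to publish $taskFile")
  }
  fs.delete(staging, true)
}
```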
Currently Comet just does naive writes to the final destination, without this two-phase commit.
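As a rough sketch of the direction, this is what routing a Comet native write through Spark's `FileCommitProtocol` on the task side could look like; `nativeParquetWrite` and `CometCommitterSketch` are placeholders, not existing Comet APIs:
```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

object CometCommitterSketch {
  // Placeholder for Comet's native Parquet writer; not a real API.
  def nativeParquetWrite(path: String): Unit = ???

  def writeTask(
      committer: FileCommitProtocol,
      taskContext: TaskAttemptContext): TaskCommitMessage = {
    committer.setupTask(taskContext)
    try {
      // Let the committer pick a temp path under staging instead of
      // writing straight to the destination.
      val tmpPath = committer.newTaskTempFile(taskContext, None, FileNameSpec("", ".parquet"))
      nativeParquetWrite(tmpPath)
      committer.commitTask(taskContext)
    } catch {
      case t: Throwable =>
        committer.abortTask(taskContext)
        throw t
    }
  }
}
```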
### Spark implementation
#### The Staging Directory: `FileCommitProtocol.getStagingDir()`
```scala
def getStagingDir(path: String, jobId: String): Path = {
  new Path(path, ".spark-staging-" + jobId)
}
```
Example: if writing to `/data/output`, the staging directory is
`/data/output/.spark-staging-{jobId}`.
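A quick sanity check of the resulting layout (`job-1234` is a made-up job id):
```scala
import org.apache.hadoop.fs.Path

def getStagingDir(path: String, jobId: String): Path =
  new Path(path, ".spark-staging-" + jobId)

println(getStagingDir("/data/output", "job-1234"))
// -> /data/output/.spark-staging-job-1234
```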
#### How It Works: HadoopMapReduceCommitProtocol
Here's the complete flow:
##### Task Execution
Each task writes to staging:
```scala
override def newTaskTempFile(
    taskContext: TaskAttemptContext,
    dir: Option[String],
    spec: FileNameSpec): String = {
  val filename = getFilename(taskContext, spec)
  val stagingDir: Path = committer match {
    // FileOutputCommitter manages its own per-attempt staging ("work path").
    case f: FileOutputCommitter =>
      new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
    case _ => new Path(path)
  }
  dir.map { d =>
    new Path(new Path(stagingDir, d), filename).toString
  }.getOrElse {
    new Path(stagingDir, filename).toString
  }
}
```
File path example:
`/data/output/.spark-staging-{jobId}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/part-00000.parquet`
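For a sense of how the `dir` parameter shapes the result, here is a small illustration with made-up values (the `_temporary/...` segments in the example above come from `FileOutputCommitter`'s work path and are omitted here for brevity):
```scala
import org.apache.hadoop.fs.Path

object TempFilePathSketch extends App {
  val stagingDir = new Path("/data/output/.spark-staging-job-1234")
  val filename = "part-00000.parquet"

  // Unpartitioned write (dir = None):
  println(new Path(stagingDir, filename))
  // -> /data/output/.spark-staging-job-1234/part-00000.parquet

  // Partitioned write (dir = Some("date=2024-01-01")):
  println(new Path(new Path(stagingDir, "date=2024-01-01"), filename))
  // -> /data/output/.spark-staging-job-1234/date=2024-01-01/part-00000.parquet
}
```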
##### Task Commit
When a task completes successfully:
```scala
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  // Coordinates with the driver so that only one (possibly speculative)
  // attempt of each task is allowed to commit.
  SparkHadoopMapRedUtil.commitTask(committer, taskContext, ...)
  new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
```
Files move from the task-specific temp directory to the job-level staging directory:
`/data/output/.spark-staging-{jobId}/part-00000.parquet`
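Conceptually, the promotion is a per-file rename out of the task attempt's `_temporary` directory; a rough illustration with made-up paths (not the actual committer code):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TaskCommitSketch extends App {
  val staging = new Path("/data/output/.spark-staging-job-1234")
  // Made-up task attempt directory, mirroring the path example above.
  val attemptDir = new Path(staging, "_temporary/0/_temporary/attempt_00000")
  val fs = FileSystem.get(new Configuration())

  // Promote each file written by this (successful) attempt up into the
  // job-level staging directory. Failed or speculative duplicate attempts
  // are simply never promoted.
  for (status <- fs.listStatus(attemptDir)) {
    fs.rename(status.getPath, new Path(staging, status.getPath.getName))
  }
}
```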
##### Job Commit
Only after ALL tasks succeed does the driver commit:
```scala
override def commitJob(
    jobContext: JobContext,
    taskCommits: Seq[TaskCommitMessage]): Unit = {
  committer.commitJob(jobContext)
  if (hasValidPath) {
    val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
    // Move files from staging to the final location
    for ((src, dst) <- filesToMove) {
      if (!fs.rename(new Path(src), new Path(dst))) {
        throw new IOException(s"Failed to rename $src to $dst")
      }
    }
    // Clean up the staging directory
    fs.delete(stagingDir, true)
  }
}
```
Files move to the final location:
`/data/output/part-00000.parquet`
##### Job Abort
If ANY task fails, the driver aborts:
```scala
override def abortJob(jobContext: JobContext): Unit = {
  try {
    committer.abortJob(jobContext, JobStatus.State.FAILED)
  } catch {
    case e: IOException => logWarning(...)
  }
  try {
    if (hasValidPath) {
      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
      fs.delete(stagingDir, true) // Delete ALL staging data
    }
  } catch {
    case e: IOException => logWarning(...)
  }
}
```
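Tying commit and abort together, the driver-side flow (in the spirit of what Spark's `FileFormatWriter` does) looks roughly like this; `runTasks` and `DriverCommitSketch` are placeholders for distributed task execution, not Spark code:
```scala
import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

object DriverCommitSketch {
  // Placeholder: run all tasks and collect one TaskCommitMessage per task.
  def runTasks(): Seq[TaskCommitMessage] = ???

  def writeJob(committer: FileCommitProtocol, jobContext: JobContext): Unit = {
    committer.setupJob(jobContext)
    try {
      val taskCommits = runTasks()
      // Phase 2: publish atomically, only after ALL tasks succeeded.
      committer.commitJob(jobContext, taskCommits)
    } catch {
      case t: Throwable =>
        // Any failure: clean up staging, leave the destination untouched.
        committer.abortJob(jobContext)
        throw t
    }
  }
}
```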
### Describe the potential solution
_No response_
### Additional context
_No response_