steveloughran commented on code in PR #55622:
URL: https://github.com/apache/spark/pull/55622#discussion_r3167452171
##########
hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -115,6 +115,17 @@ class PathOutputCommitProtocol(
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit
failures")
}
+
+ if (dynamicPartitionOverwrite) {
+ // FileOutputCommitter must be initialized with the staging directory
so that task output
+ // lands under stagingDir/_temporary/... and commitJob can later
delete the old partition
+ // directories and move staged files to final dest. Without this, the
committer writes
+ // directly to the final path and the dynamic-overwrite cleanup in
commitJob never sees any
+ // partitionPaths.
+ val ctor =
+ committer.getClass.getDeclaredConstructor(classOf[Path],
classOf[TaskAttemptContext])
+ committer = ctor.newInstance(stagingDir, context)
Review Comment:
This should be unique per job.
We've had escalations where two scheduled jobs overlapped (one committing
while the other was starting up), and while they never conflicted in write ops, their use
of shared temp dirs broke things.
Please review this, and include the overlap condition in the test. Thanks.
##########
hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala:
##########
@@ -264,5 +277,101 @@ class CommitterBindingSuite extends SparkFunSuite {
"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
}
-}
+ /*
+ * With dynamicPartitionOverwrite=true and a FileOutputCommitter,
newTaskTempFile must route
+ * output through the staging directory (not the final output path) and must
record the partition
+ * in partitionPaths so that commitJob can delete the old partition
directory and rename the
+ * staged one into place.
+ */
+ test("SPARK-56588: FileOutputCommitter dynamic partition overwrite stages
output and tracks " +
+ "partitions") {
+ val jobCommitDir = File.createTempFile("dyn-part-overwrite-staging", "")
+ try {
+ jobCommitDir.delete()
+ val jobUri = jobCommitDir.toURI
+ val path = new Path(jobUri)
+ val job = newJob(path)
+ val conf = job.getConfiguration
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+ bindToFileOutputCommitterFactory(conf, "file")
+ val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+ val committer = new PathOutputCommitProtocolForTest(jobId,
jobUri.toString, true)
+ committer.setupJob(tContext)
+ committer.setupTask(tContext)
+
+ val spec = FileNameSpec("", ".parquet")
+ val partition = "a=1/b=2"
+ val tempPath = committer.newTaskTempFile(tContext, Some(partition), spec)
+
+ // The temp file must be under the staging directory, not the final
output path.
+ assert(tempPath.contains(".spark-staging-"),
+ s"Expected temp path under staging dir, got: $tempPath")
+ assert(!tempPath.startsWith(path.toUri.toString.stripSuffix("/") + "/" +
partition),
+ s"Temp path must not point directly to the final output location:
$tempPath")
+
+ // The partition must have been recorded so commitJob can overwrite it.
+ assert(committer.capturedPartitionPaths === Set(partition),
+ s"Expected partitionPaths = {$partition}, got:
${committer.capturedPartitionPaths}")
+ } finally {
+ jobCommitDir.delete()
+ }
+ }
+ /*
+ * A cloud committer that handles dynamic partitioning natively (via
StreamCapabilities) must NOT
+ * have its partitions tracked in Spark's partitionPaths set: the committer
takes care of
+ * overwriting itself, and the commitJob rename loop must not interfere.
+ */
+ test("SPARK-56588: Cloud committer with dynamic partition support does not
track partitions in " +
+ "partitionPaths") {
+ val path = new Path("http://example/data")
+ val job = newJob(path)
+ val conf = job.getConfiguration
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+ StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
+ val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+ val committer = new PathOutputCommitProtocolForTest(jobId,
path.toUri.toString, true)
+ committer.setupJob(tContext)
+ committer.setupTask(tContext)
+
+ committer.newTaskTempFile(tContext, Some("a=1"), FileNameSpec("",
".parquet"))
Review Comment:
You could also add an assert on the path returned here; it should be under the staging
dir.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]