steveloughran commented on code in PR #55622:
URL: https://github.com/apache/spark/pull/55622#discussion_r3167452171


##########
hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -115,6 +115,17 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit 
failures")
       }
+
+      if (dynamicPartitionOverwrite) {
+        // FileOutputCommitter must be initialized with the staging directory 
so that task output
+        // lands under stagingDir/_temporary/... and commitJob can later 
delete the old partition
+        // directories and move staged files to final dest. Without this, the 
committer writes
+        // directly to the final path and the dynamic-overwrite cleanup in 
commitJob never sees any
+        // partitionPaths.
+        val ctor =
+          committer.getClass.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
+        committer = ctor.newInstance(stagingDir, context)

Review Comment:
   This staging directory should be unique per job.
   
   We've had escalations where two scheduled jobs overlapped (one committing 
while the other was starting up). They never conflicted in write operations, but 
their use of shared temporary directories broke things.
   
   Please review this, and include the overlapping-jobs condition in the test. Thanks.



##########
hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala:
##########
@@ -264,5 +277,101 @@ class CommitterBindingSuite extends SparkFunSuite {
       "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
   }
 
-}
+  /*
+   * With dynamicPartitionOverwrite=true and a FileOutputCommitter, 
newTaskTempFile must route
+   * output through the staging directory (not the final output path) and must 
record the partition
+   * in partitionPaths so that commitJob can delete the old partition 
directory and rename the
+   * staged one into place.
+   */
+  test("SPARK-56588: FileOutputCommitter dynamic partition overwrite stages 
output and tracks " +
+      "partitions") {
+    val jobCommitDir = File.createTempFile("dyn-part-overwrite-staging", "")
+    try {
+      jobCommitDir.delete()
+      val jobUri = jobCommitDir.toURI
+      val path = new Path(jobUri)
+      val job = newJob(path)
+      val conf = job.getConfiguration
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+      conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+      bindToFileOutputCommitterFactory(conf, "file")
+      val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+      val committer = new PathOutputCommitProtocolForTest(jobId, 
jobUri.toString, true)
+      committer.setupJob(tContext)
+      committer.setupTask(tContext)
+
+      val spec = FileNameSpec("", ".parquet")
+      val partition = "a=1/b=2"
+      val tempPath = committer.newTaskTempFile(tContext, Some(partition), spec)
+
+      // The temp file must be under the staging directory, not the final 
output path.
+      assert(tempPath.contains(".spark-staging-"),
+        s"Expected temp path under staging dir, got: $tempPath")
+      assert(!tempPath.startsWith(path.toUri.toString.stripSuffix("/") + "/" + 
partition),
+        s"Temp path must not point directly to the final output location: 
$tempPath")
+
+      // The partition must have been recorded so commitJob can overwrite it.
+      assert(committer.capturedPartitionPaths === Set(partition),
+        s"Expected partitionPaths = {$partition}, got: 
${committer.capturedPartitionPaths}")
+    } finally {
+      jobCommitDir.delete()
+    }
+  }
 
+  /*
+   * A cloud committer that handles dynamic partitioning natively (via 
StreamCapabilities) must NOT
+   * have its partitions tracked in Spark's partitionPaths set: the committer 
takes care of
+   * overwriting itself, and the commitJob rename loop must not interfere.
+   */
+  test("SPARK-56588: Cloud committer with dynamic partition support does not 
track partitions in " +
+      "partitionPaths") {
+    val path = new Path("http://example/data";)
+    val job = newJob(path)
+    val conf = job.getConfiguration
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+    StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
+    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+    val committer = new PathOutputCommitProtocolForTest(jobId, 
path.toUri.toString, true)
+    committer.setupJob(tContext)
+    committer.setupTask(tContext)
+
+    committer.newTaskTempFile(tContext, Some("a=1"), FileNameSpec("", 
".parquet"))

Review Comment:
   You can add an assert on the returned path here too; it should be under the 
staging dir.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to