peter-toth commented on code in PR #55622:
URL: https://github.com/apache/spark/pull/55622#discussion_r3167925468


##########
hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala:
##########
@@ -264,5 +277,101 @@ class CommitterBindingSuite extends SparkFunSuite {
       "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
   }
 
-}
+  /*
+   * With dynamicPartitionOverwrite=true and a FileOutputCommitter, newTaskTempFile must route
+   * output through the staging directory (not the final output path) and must record the partition
+   * in partitionPaths so that commitJob can delete the old partition directory and rename the
+   * staged one into place.
+   */
+  test("SPARK-56588: FileOutputCommitter dynamic partition overwrite stages 
output and tracks " +
+      "partitions") {
+    val jobCommitDir = File.createTempFile("dyn-part-overwrite-staging", "")
+    try {
+      jobCommitDir.delete()
+      val jobUri = jobCommitDir.toURI
+      val path = new Path(jobUri)
+      val job = newJob(path)
+      val conf = job.getConfiguration
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+      conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+      bindToFileOutputCommitterFactory(conf, "file")
+      val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+      val committer = new PathOutputCommitProtocolForTest(jobId, 
jobUri.toString, true)
+      committer.setupJob(tContext)
+      committer.setupTask(tContext)
+
+      val spec = FileNameSpec("", ".parquet")
+      val partition = "a=1/b=2"
+      val tempPath = committer.newTaskTempFile(tContext, Some(partition), spec)
+
+      // The temp file must be under the staging directory, not the final 
output path.
+      assert(tempPath.contains(".spark-staging-"),
+        s"Expected temp path under staging dir, got: $tempPath")
+      assert(!tempPath.startsWith(path.toUri.toString.stripSuffix("/") + "/" + 
partition),
+        s"Temp path must not point directly to the final output location: 
$tempPath")
+
+      // The partition must have been recorded so commitJob can overwrite it.
+      assert(committer.capturedPartitionPaths === Set(partition),
+        s"Expected partitionPaths = {$partition}, got: 
${committer.capturedPartitionPaths}")
+    } finally {
+      jobCommitDir.delete()
+    }
+  }
 
+  /*
+   * A cloud committer that handles dynamic partitioning natively (via StreamCapabilities) must NOT
+   * have its partitions tracked in Spark's partitionPaths set: the committer takes care of
+   * overwriting itself, and the commitJob rename loop must not interfere.
+   */
+  test("SPARK-56588: Cloud committer with dynamic partition support does not 
track partitions in " +
+      "partitionPaths") {
+    val path = new Path("http://example/data")
+    val job = newJob(path)
+    val conf = job.getConfiguration
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+    StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
+    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+    val committer = new PathOutputCommitProtocolForTest(jobId, 
path.toUri.toString, true)
+    committer.setupJob(tContext)
+    committer.setupTask(tContext)
+
+    committer.newTaskTempFile(tContext, Some("a=1"), FileNameSpec("", 
".parquet"))

Review Comment:
   Added assert in 
https://github.com/apache/spark/pull/55622/commits/031b850fc673bd408a84a8027359b078ff15cc63.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to