peter-toth opened a new pull request, #55622:
URL: https://github.com/apache/spark/pull/55622
### What changes were proposed in this pull request?

Fix `PathOutputCommitProtocol` to correctly handle `dynamicPartitionOverwrite=true` when the underlying committer is a `FileOutputCommitter`.

Two bugs were introduced by #37468 ([SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)):

1. `FileOutputCommitter` was not redirected to the staging directory (`stagingDir`) in `setupCommitter`. As a result, the committer wrote task output directly to the final destination path instead of staging it. The rename-and-overwrite logic in `commitJob` therefore never ran, and `INSERT OVERWRITE` appended to existing partition directories instead of replacing them.
2. `newTaskTempFile` did not populate `partitionPaths`. With no recorded partitions, `commitJob` skipped the delete-before-rename step entirely, again leaving old partition data in place.

The fix mirrors the approach already used in `SQLHadoopMapReduceCommitProtocol`:

- In `setupCommitter`, reinitialize `FileOutputCommitter` with `stagingDir` as its output path when `dynamicPartitionOverwrite=true`.
- In `newTaskTempFile`, record the partition (`partitionPaths += dir.get`) when the committer is a `FileOutputCommitter`, as the parent class `HadoopMapReduceCommitProtocol` does. Cloud committers that handle dynamic partition overwrite natively (advertised via `StreamCapabilities`) are intentionally excluded, because they manage the overwrite themselves.
- Widen `partitionPaths` in `HadoopMapReduceCommitProtocol` from `private` to `protected` so that the subclass can add to it.

### Why are the changes needed?

`INSERT OVERWRITE` on a partitioned table with `partitionOverwriteMode=dynamic` silently appends data instead of replacing the written partitions when `PathOutputCommitProtocol` is in use.
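The append-versus-replace difference can be illustrated with a toy model. The sketch below is a hypothetical, in-memory simplification and not Spark code: a map from partition directory to rows stands in for the file system, and `DynamicOverwriteModel`, `ToyCommitProtocol`, `writeTask` and `run` are made-up names. With `fixed = false`, a task writes straight into the destination and records nothing, so `commitJob` has nothing to do. With `fixed = true`, a task writes into a `.spark-staging-<jobId>` area and records its partition, and `commitJob` deletes each recorded destination partition before moving the staged one into place.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of dynamic partition overwrite (NOT Spark's code).
// The "file system" is an in-memory map from a partition dir (e.g. "p=0") to its rows.
object DynamicOverwriteModel {

  type FileSystem = mutable.Map[String, Vector[Long]]

  final class ToyCommitProtocol(fs: FileSystem, jobId: String, fixed: Boolean) {
    // Plays the role of `partitionPaths`: partitions written by this job.
    val partitionPaths = mutable.Set.empty[String]
    // Staged output keyed by partition dir; stands in for the staging directory.
    private val staging = mutable.Map.empty[String, Vector[Long]]
    val stagingDir = s".spark-staging-$jobId"

    // Returns the (modelled) location the task wrote to.
    def writeTask(partition: String, rows: Seq[Long]): String =
      if (fixed) {
        // Fixed behavior: stage the output and remember the partition.
        staging(partition) = staging.getOrElse(partition, Vector.empty) ++ rows
        partitionPaths += partition
        s"$stagingDir/$partition"
      } else {
        // Buggy behavior: write straight into the destination, track nothing.
        fs(partition) = fs.getOrElse(partition, Vector.empty) ++ rows
        partition
      }

    def commitJob(): Unit =
      for (p <- partitionPaths) {
        fs.remove(p)       // delete the old destination partition...
        fs(p) = staging(p) // ...then "rename" the staged partition into place
      }
  }

  def run(fixed: Boolean): Map[String, Vector[Long]] = {
    // Initial data: p=0 holds 0, 2, 4 and p=1 holds 1, 3, 5.
    val fs: FileSystem =
      mutable.Map("p=0" -> Vector(0L, 2L, 4L), "p=1" -> Vector(1L, 3L, 5L))
    val protocol = new ToyCommitProtocol(fs, jobId = "job-1", fixed = fixed)
    protocol.writeTask("p=0", Seq(99L)) // overwrite only p=0
    protocol.commitJob()
    fs.toMap
  }
}
```

Under these assumptions, `DynamicOverwriteModel.run(fixed = false)` leaves `p=0` holding 0, 2, 4 and 99, while `run(fixed = true)` leaves only 99 in `p=0`; in both cases `p=1` keeps 1, 3 and 5, matching the manual `spark-shell` check.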
The bug was introduced by #37468 ([SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)), which enabled `FileOutputCommitter`-backed dynamic partition overwrite in `PathOutputCommitProtocol` without wiring up the staging mechanism that `HadoopMapReduceCommitProtocol` and `SQLHadoopMapReduceCommitProtocol` rely on to delete old partition directories and atomically move staged output into place.

The problem became more visible after #32518 ([SPARK-35383](https://issues.apache.org/jira/browse/SPARK-35383)) changed `SparkContext` to activate `PathOutputCommitProtocol` whenever hadoop-cloud is on the classpath (via `fillMissingMagicCommitterConfsIfNeeded()`), not only when a magic-committer bucket is explicitly configured.

### Does this PR introduce _any_ user-facing change?

Yes. With `spark.sql.sources.partitionOverwriteMode=dynamic` and `PathOutputCommitProtocol` active (the default when hadoop-cloud is available), `INSERT OVERWRITE` now correctly replaces the written partition directories instead of appending to them.

### How was this patch tested?

Three unit tests were added to `CommitterBindingSuite`:

- `SPARK-56588: FileOutputCommitter dynamic partition overwrite stages output and tracks partitions` verifies that the temp file path goes through `.spark-staging-` and that `partitionPaths` is populated for `FileOutputCommitter`.
- `SPARK-56588: Cloud committer with dynamic partition support does not track partitions in partitionPaths` verifies that partitions written through cloud committers implementing `CAPABILITY_DYNAMIC_PARTITIONING` via `StreamCapabilities` are not tracked in Spark's `partitionPaths`, because those committers manage the overwrite themselves.
- `SPARK-56588: FileOutputCommitter without dynamicPartitionOverwrite does not track partitions` serves as a baseline regression guard.
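The per-task decision exercised by these three tests can be sketched as follows. This is a hypothetical model, not `PathOutputCommitProtocol` itself: the `Committer` ADT, `TaskBookkeeping` and `newTaskTempDir` are illustrative names, and the cloud committer is reduced to a marker object. Only a `FileOutputCommitter` with dynamic partition overwrite routes output through `.spark-staging-<jobId>` and records its partition; in the other two cases the sketch returns `None` (no Spark-side staging; where the committer actually writes is not modelled) and leaves `partitionPaths` empty.

```scala
import scala.collection.mutable

// Hypothetical sketch mirroring the three CommitterBindingSuite cases.
// Committer, TaskBookkeeping and newTaskTempDir are illustrative, not Spark APIs.
object PartitionTrackingModel {
  sealed trait Committer
  case object FileOutputCommitterLike extends Committer
  // Stands in for a cloud committer that reports dynamic-partitioning support
  // via StreamCapabilities and performs the overwrite itself.
  case object DynamicPartitioningCloudCommitter extends Committer

  final class TaskBookkeeping(
      committer: Committer,
      dynamicPartitionOverwrite: Boolean,
      path: String,
      jobId: String) {
    // Plays the role of HadoopMapReduceCommitProtocol.partitionPaths.
    val partitionPaths: mutable.Set[String] = mutable.Set.empty

    // Only a FileOutputCommitter with dynamic overwrite uses Spark's staging dir.
    private val usesSparkStaging =
      dynamicPartitionOverwrite && committer == FileOutputCommitterLike

    /** Returns the Spark staging directory for `dir` if output is staged, else None. */
    def newTaskTempDir(dir: Option[String]): Option[String] =
      if (usesSparkStaging) {
        require(dir.isDefined, "dynamic partition overwrite needs a partitioned write")
        partitionPaths += dir.get
        Some(s"$path/.spark-staging-$jobId/${dir.get}")
      } else {
        // Plain FileOutputCommitter write, or a cloud committer that owns the
        // overwrite: no Spark-side staging and nothing recorded here.
        None
      }
  }
}
```

Keying the decision on both the committer kind and the flag keeps the cloud-committer path untouched while restoring the staging-and-tracking behavior for `FileOutputCommitter`.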
Manually verified with a Spark shell session (hadoop-cloud on the classpath):

```
➜ bin/spark-shell
scala> // Write initial data to two partitions
scala> spark.range(6).selectExpr("id % 2 as p", "id as v").write.partitionBy("p").mode("overwrite").parquet("/tmp/repro")
scala> // p=0: rows 0,2,4   p=1: rows 1,3,5
scala> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
scala> // Overwrite only p=0
scala> spark.createDataFrame(Seq((0L, 99L))).toDF("p", "v").write.partitionBy("p").mode("overwrite").parquet("/tmp/repro")
scala> spark.read.parquet("/tmp/repro").orderBy("p", "v").show()

// Before fix: p=0 still shows rows 0,2,4,99 (appended, not replaced)
+---+---+
|  v|  p|
+---+---+
|  0|  0|
|  2|  0|
|  4|  0|
| 99|  0|
|  1|  1|
|  3|  1|
|  5|  1|
+---+---+

// After fix:
+---+---+
|  v|  p|
+---+---+
| 99|  0|   <- p=0 correctly replaced
|  1|  1|   <- p=1 untouched
|  3|  1|
|  5|  1|
+---+---+
```

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
