[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20931

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user fangshil commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179912635

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

added
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179681849

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

ah, makes sense. Let's add some comments to summarize these discussions.
Github user fangshil commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179672722

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

The FileSystem API spec on delete says "Code SHOULD just call delete(path, recursive) and assume the destination is no longer present". Referring to the detailed spec, the only case where delete may return false is when finalPartPath does not exist; other failures should result in an exception. When finalPartPath does not exist, which is an expected case, we only need to check whether the parent of finalPartPath exists, because otherwise the rename will fail according to the spec. Please advise if you think we should still double-check finalPartPath before the rename.
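The delete contract described above can be illustrated against the local file system, whose `java.nio` API exposes the same "false means the path was already absent" semantics. This is a hypothetical local-filesystem analogue, not the Hadoop `FileSystem` API itself:

```scala
import java.nio.file.Files

// Files.deleteIfExists mirrors the FileSystem.delete contract quoted above:
// it returns false only when the path is already absent; real failures
// (e.g. permission errors) surface as exceptions instead of `false`.
val dir = Files.createTempDirectory("delete-demo")
val missing = dir.resolve("col1=1")

// Deleting a non-existent path: no exception, just `false`.
val deletedMissing = Files.deleteIfExists(missing)

// Deleting a path that does exist returns `true`.
Files.createDirectory(missing)
val deletedExisting = Files.deleteIfExists(missing)
```

So, as the comment argues, a `false` return is the expected "nothing to overwrite" case rather than an error, and the only remaining concern before the rename is whether the parent directory exists.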
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179651241

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

+1 on adding comments.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179651085

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

BTW, we should add comments around here to explain all of this.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179651049

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

I feel the code here is not safe. The delete may fail because `finalPartPath` doesn't exist, or because of some real failure. We should make sure `finalPartPath` doesn't exist before renaming.
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179651037

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

I think the problem here is that we don't check whether `finalPartPath` exists; we should actually check that before the rename.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179650482

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

sorry, I don't get it. If the delete succeeds, your patch doesn't change anything. Do you mean HDFS will always fail to delete `finalPartPath`?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179647871

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

isn't it clearer to just do `assert(fs.delete(finalPartPath, true))`?
Github user fangshil commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179517200

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

@cloud-fan this follows the behavior of the HDFS rename spec: it requires the parent to be present. If we created finalPartPath directly, it would trigger another weird rename behavior when the dst path already exists. From the HDFS spec I shared above: "If the destination exists and is a directory, the final destination of the rename becomes the destination + the filename of the source path". We have confirmed this on our production cluster, which led to the current solution of creating only the parent dir, following the HDFS spec exactly.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179495367

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

why do we only create the parent dir if we fail to delete `finalPartPath`?
Github user fangshil commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179346911

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+          fs.mkdirs(finalPartPath.getParent)
--- End diff --

@cloud-fan yes, in the official HDFS documentation (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html), the rename command has the precondition "dest must be root, or have a parent that exists".
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20931#discussion_r179036894

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
      for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+          fs.mkdirs(finalPartPath.getParent)
--- End diff --

do you have an official HDFS document to support this change?
GitHub user fangshil opened a pull request: https://github.com/apache/spark/pull/20931

[SPARK-23815][Core] Spark writer dynamic partition overwrite mode may fail to write output on multi-level partitions

## What changes were proposed in this pull request?

Spark introduced a new writer mode in SPARK-20236 that overwrites only the related partitions. While using this feature on our production cluster, we found a bug when writing multi-level partitions on HDFS.

A simple test case to reproduce the issue:

    val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")
    df.write.partitionBy("col1", "col2").mode("overwrite").save("/my/hdfs/location")

If the HDFS location "/my/hdfs/location" does not exist, there will be no output.

This appears to be caused by the job-commit change of SPARK-20236 in HadoopMapReduceCommitProtocol. During job commit, the output has been written into the staging dir /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2, and the code then calls fs.rename to rename /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to /my/hdfs/location/col1=1/col2=2. In our case, however, the operation fails on HDFS because /my/hdfs/location/col1=1 does not exist: HDFS rename cannot create more than one level of missing directories. This does not happen in the unit tests added with SPARK-20236, which run against the local file system.

We are proposing a fix: when cleaning the current partition dir /my/hdfs/location/col1=1/col2=2 before the rename, if the delete fails (because /my/hdfs/location/col1=1/col2=2 may not exist), we call mkdirs to create the parent dir /my/hdfs/location/col1=1 (if the parent dir does not exist) so that the following rename can succeed.

## How was this patch tested?
We have tested this patch on our production cluster and it fixed the problem.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fangshil/spark master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20931.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20931

commit da63c17d7ae7fbf04cc474d946d61a098b3e1ade
Author: Fangshi Li
Date: 2018-03-28T04:25:54Z

    Spark writer dynamic partition overwrite mode may fail to write output on multi level partition
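The commit-time logic the fix targets can be sketched against the local file system, which enforces the same rename precondition (the destination's parent must exist). This is a simplified simulation using `java.nio`, not the actual Spark/Hadoop code; `deleteRecursively` and `prepareFinalPartPath` are illustrative names:

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

// Local-FS stand-in for fs.delete(path, recursive = true): returns false
// when the path is already absent, true after deleting it and its contents.
def deleteRecursively(p: Path): Boolean = {
  if (!Files.exists(p)) return false
  Files.walk(p).sorted(Comparator.reverseOrder[Path]()).forEach(Files.delete(_))
  true
}

// The proposed fix: if the delete was a no-op (nothing to overwrite) and the
// parent dir is absent, create the parent so the subsequent rename succeeds.
def prepareFinalPartPath(finalPartPath: Path): Unit = {
  if (!deleteRecursively(finalPartPath) && !Files.exists(finalPartPath.getParent)) {
    Files.createDirectories(finalPartPath.getParent)
  }
}

// Reproduce the multi-level layout from the bug report under a temp dir.
val base = Files.createTempDirectory("overwrite-demo")
val staging = base.resolve(".spark-staging/col1=1/col2=2")
Files.createDirectories(staging)
val finalPart = base.resolve("out/col1=1/col2=2") // out/col1=1 does not exist yet

prepareFinalPartPath(finalPart)
// Without the mkdirs above, this move would throw because the parent
// directory of finalPart is missing -- the analogue of the failed HDFS rename.
Files.move(staging, finalPart)
```

After the move, `finalPart` exists and the staging directory has been renamed away, which is the behavior the patch restores for multi-level partitions on HDFS.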