[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20931


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-07 Thread fangshil
Github user fangshil commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179912635
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

added


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179681849
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

Ah, makes sense. Let's add some comments to summarize these discussions.


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread fangshil
Github user fangshil commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179672722
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

The FileSystem API spec on delete says "Code SHOULD just call delete(path, 
recursive) and assume the destination is no longer present". Per its 
detailed spec, the only case in which delete returns false is when 
finalPartPath does not exist; other failures should result in an 
exception. When finalPartPath does not exist, which is an expected case, 
we only need to check whether the parent of finalPartPath exists, because 
a missing parent would break the subsequent rename according to the 
rename spec. Please advise if you think we should still double-check 
finalPartPath before the rename.
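
A minimal sketch of the cleanup logic this comment describes, assuming the
delete semantics quoted above (delete returns false, without throwing, only
when the path does not exist); `fs`, `path`, and `part` stand in for the
corresponding values in HadoopMapReduceCommitProtocol:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanUpPartitionDir(fs: FileSystem, path: String, part: String): Unit = {
  val finalPartPath = new Path(path, part)
  if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
    // finalPartPath did not exist, so its parent may be missing as well;
    // the rename spec requires the destination's parent to exist.
    fs.mkdirs(finalPartPath.getParent)
  }
}
```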


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179651241
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

+1 on adding comments.


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179651085
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

BTW, we should add comments around here to explain all of this.


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179651049
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

I feel the code here is not safe. The delete may fail because `finalPartPath` 
doesn't exist, or because of some real failure. We should make sure 
`finalPartPath` doesn't exist before renaming.


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179651037
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

I think the problem here is that we didn't check whether `finalPartPath` 
exists, and we should actually check that before the rename.


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179650482
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

Sorry, I don't get it. If the delete succeeds, your patch doesn't change 
anything. Do you mean HDFS will always fail to delete `finalPartPath`?


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179647871
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

Isn't it clearer to just do `assert(fs.delete(finalPartPath, true))`?


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread fangshil
Github user fangshil commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179517200
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

@cloud-fan this is to follow the behavior of the HDFS rename spec: it requires 
the parent of the destination to be present. If we created finalPartPath 
directly, it would trigger another weird rename behavior when the dst path 
already exists. From the HDFS spec I shared above: "If the destination exists 
and is a directory, the final destination of the rename becomes the 
destination + the filename of the source path". We have confirmed this on our 
production cluster, which led to the current solution of creating only the 
parent dir, exactly following the HDFS spec.
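
For illustration, a hypothetical standalone snippet (not code from the patch)
of the pitfall described above, reusing the example paths from this PR's
description:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

def renameOntoExistingDir(fs: FileSystem): Unit = {
  val staging = new Path("/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2")
  val dest    = new Path("/my/hdfs/location/col1=1/col2=2")

  fs.mkdirs(dest)          // suppose we had created finalPartPath itself
  fs.rename(staging, dest) // per the spec, data lands *inside* dest:
                           // /my/hdfs/location/col1=1/col2=2/col2=2
}
```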


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179495367
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
--- End diff --

Why do we only create the parent dir when we fail to delete `finalPartPath`?


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-04 Thread fangshil
Github user fangshil commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179346911
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+          fs.mkdirs(finalPartPath.getParent)
--- End diff --

@cloud-fan yes, in the official HDFS documentation 
(https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html),
 the rename operation has the precondition "dest must be root, or have a parent 
that exists".


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-04-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20931#discussion_r179036894
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -186,7 +186,9 @@ class HadoopMapReduceCommitProtocol(
       logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
       for (part <- partitionPaths) {
         val finalPartPath = new Path(path, part)
-        fs.delete(finalPartPath, true)
+        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+          fs.mkdirs(finalPartPath.getParent)
--- End diff --

Do you have an official HDFS document to support this change?


---




[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...

2018-03-28 Thread fangshil
GitHub user fangshil opened a pull request:

https://github.com/apache/spark/pull/20931

[SPARK-23815][Core]Spark writer dynamic partition overwrite mode may fail 
to write output on multi level partition


## What changes were proposed in this pull request?

Spark introduced a new writer mode in SPARK-20236 that overwrites only the 
affected partitions. While using this feature on our production cluster, we 
found a bug when writing multi-level partitions on HDFS.

A simple test case to reproduce this issue (this assumes dynamic partition 
overwrite from SPARK-20236 is enabled via the config shown):

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")
df.write.partitionBy("col1", "col2").mode("overwrite").save("/my/hdfs/location")

If the HDFS location "/my/hdfs/location" does not exist, there will be no 
output.

This seems to be caused by the job commit change that SPARK-20236 made in 
HadoopMapReduceCommitProtocol.

In the commit-job step, the output has been written into the staging dir 
/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2, and the code then calls 
fs.rename to rename /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to 
/my/hdfs/location/col1=1/col2=2. However, in our case this operation fails 
on HDFS because /my/hdfs/location/col1=1 does not exist: HDFS rename cannot 
create more than one level of missing directories.

This does not happen in the unit tests added with SPARK-20236, which run 
against the local file system.

We are proposing a fix: when cleaning the current partition dir 
/my/hdfs/location/col1=1/col2=2 before the rename op, if the delete op fails 
(because /my/hdfs/location/col1=1/col2=2 may not exist), we call mkdirs to 
create the parent dir /my/hdfs/location/col1=1 (if it does not exist) so 
that the following rename op can succeed.
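
A minimal sketch of the fixed commit step for one partition, using the 
example paths above (the actual change lives in 
HadoopMapReduceCommitProtocol; `fs` is assumed to be the destination 
FileSystem):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

def commitPartition(fs: FileSystem): Unit = {
  val stagingPart   = new Path("/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2")
  val finalPartPath = new Path("/my/hdfs/location/col1=1/col2=2")

  // delete returns false when finalPartPath does not exist; in that case
  // the parent dir /my/hdfs/location/col1=1 may be missing as well.
  if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
    // Create only the parent, never finalPartPath itself (see the review
    // discussion about renaming onto an existing directory).
    fs.mkdirs(finalPartPath.getParent)
  }
  fs.rename(stagingPart, finalPartPath)
}
```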





## How was this patch tested?

We have tested this patch on our production cluster, and it fixed the problem.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fangshil/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20931


commit da63c17d7ae7fbf04cc474d946d61a098b3e1ade
Author: Fangshi Li 
Date:   2018-03-28T04:25:54Z

Spark writer dynamic partition overwrite mode may fail to write output on 
multi level partition




---
