Apache Spark reassigned SPARK-35106:
------------------------------------

    Assignee: Apache Spark

HadoopMapReduceCommitProtocol performs bad rename when dynamic partition overwrite is used
-------------------------------------------------------------------------------------------

                 Key: SPARK-35106
                 URL: https://issues.apache.org/jira/browse/SPARK-35106
             Project: Spark
          Issue Type: Bug
          Components: Input/Output, Spark Core
    Affects Versions: 3.1.1
            Reporter: Erik Krogen
            Assignee: Apache Spark
            Priority: Major

Recently, while evaluating the code in {{HadoopMapReduceCommitProtocol#commitJob}}, I found a bad code path in the {{dynamicPartitionOverwrite == true}} scenario:

{code:language=scala}
// BLOCK 1
if (dynamicPartitionOverwrite) {
  val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
  logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
  absPartitionPaths.foreach(fs.delete(_, true))
}

// BLOCK 2
for ((src, dst) <- filesToMove) {
  fs.rename(new Path(src), new Path(dst))
}

// BLOCK 3
if (dynamicPartitionOverwrite) {
  val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
  logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
  for (part <- partitionPaths) {
    val finalPartPath = new Path(path, part)
    if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
      // According to the official Hadoop FileSystem API spec, the delete op should assume
      // the destination is no longer present regardless of the return value, thus we do
      // not need to double-check that finalPartPath exists before the rename.
      // Also in our case, based on the spec, delete returns false only when finalPartPath
      // does not exist. When this happens, we need to take action if the parent of
      // finalPartPath also does not exist (e.g. the scenario described in SPARK-23815),
      // because the FileSystem API spec on the rename op says the rename dest
      // (finalPartPath) must have a parent that exists, otherwise we may get an
      // unexpected result from the rename.
      fs.mkdirs(finalPartPath.getParent)
    }
    fs.rename(new Path(stagingDir, part), finalPartPath)
  }
}
{code}

Assuming {{dynamicPartitionOverwrite == true}}, we have the following sequence of events:

# Block 1 deletes all parent directories of {{filesToMove.values}}.
# Block 2 attempts to rename all {{filesToMove.keys}} to {{filesToMove.values}}.
# Block 3 does directory-level renames to place files into their final locations.

All renames in Block 2 will always fail, since all parent directories of {{filesToMove.values}} were just deleted in Block 1. Under normal HDFS semantics, the contract of {{fs.rename}} is to return {{false}} on such a failure, as opposed to throwing an exception. There is a separate issue here: Block 2 should probably be checking for those {{false}} return values (see the first sketch below). But the non-throwing behavior does allow {{dynamicPartitionOverwrite}} to "work", albeit with a batch of failed renames in the middle. Really, we should only run Block 2 in the {{dynamicPartitionOverwrite == false}} case, and consolidate Blocks 1 and 3 to run in the {{true}} case (see the second sketch below).

We discovered this issue when testing against a {{FileSystem}} implementation which throws an exception in this failed-rename scenario instead of returning {{false}}, escalating the silent/ignored rename failures into actual failures.
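On the return-value point, here is a minimal sketch of what checking the Block 2 renames could look like. The {{renameOrFail}} helper name and the choice to raise {{IOException}} are illustrative assumptions, not code from Spark:

{code:language=scala}
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: surface a false return from FileSystem#rename as an
// error instead of silently dropping it. Per the FileSystem API spec, a
// failed rename on HDFS reports failure via the return value rather than an
// exception, so the caller has to check it explicitly.
def renameOrFail(fs: FileSystem, src: Path, dst: Path): Unit = {
  if (!fs.rename(src, dst)) {
    throw new IOException(s"Failed to rename $src to $dst")
  }
}
{code}

Block 2 would then become {{for ((src, dst) <- filesToMove) renameOrFail(fs, new Path(src), new Path(dst))}}, turning the silent failures described above into a loud job failure.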
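And a sketch of the proposed restructuring, reusing the names from the snippet above. The {{else}} branch is the original Block 2, now guarded on {{dynamicPartitionOverwrite == false}}; the overwrite branch folds Blocks 1 and 3 together. Since the consolidated branch still has to commit the {{filesToMove}} entries somehow, this sketch recreates the deleted parents before renaming, which is an assumption of mine rather than part of the proposal, and it reuses the hypothetical {{renameOrFail}} helper from above. This is a sketch of the control flow, not an actual patch:

{code:language=scala}
if (dynamicPartitionOverwrite) {
  // Consolidated Blocks 1 and 3: clean up each destination and recreate its
  // parent before renaming, so no rename ever targets a missing parent.
  val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
  absPartitionPaths.foreach(fs.delete(_, true))
  absPartitionPaths.foreach(fs.mkdirs(_))  // assumption: recreate deleted parents
  for ((src, dst) <- filesToMove) {
    renameOrFail(fs, new Path(src), new Path(dst))
  }
  val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
  for (part <- partitionPaths) {
    val finalPartPath = new Path(path, part)
    if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
      fs.mkdirs(finalPartPath.getParent)
    }
    renameOrFail(fs, new Path(stagingDir, part), finalPartPath)
  }
} else {
  // Original Block 2, now guarded: in this branch no parent directory of
  // filesToMove.values has been deleted, so the renames can succeed.
  for ((src, dst) <- filesToMove) {
    renameOrFail(fs, new Path(src), new Path(dst))
  }
}
{code}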