Repository: spark
Updated Branches:
  refs/heads/branch-2.2 acbad83ec -> 6b6761e8f
[SPARK-21549][CORE] Respect OutputFormats with no/invalid output directory provided

## What changes were proposed in this pull request?

PR #19294 added support for nulls, but Spark 2.1 also handled other error cases where the path argument can be invalid. Namely:
* empty string
* URI parse exception while creating Path

This is a resubmission of PR #19487, which I messed up while updating my repo.

## How was this patch tested?

Enhanced the test to cover the newly added support.

Author: Mridul Muralidharan <mri...@gmail.com>

Closes #19497 from mridulm/master.

(cherry picked from commit 13c1559587d0eb533c94f5a492390f81b048b347)
Signed-off-by: Mridul Muralidharan <mri...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b6761e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b6761e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b6761e8

Branch: refs/heads/branch-2.2
Commit: 6b6761e8fe1e8d4c7d0a3c3b35c55fe388220234
Parents: acbad83
Author: Mridul Muralidharan <mri...@gmail.com>
Authored: Sun Oct 15 18:40:53 2017 -0700
Committer: Mridul Muralidharan <mri...@gmail.com>
Committed: Sun Oct 15 18:41:47 2017 -0700

----------------------------------------------------------------------
 .../io/HadoopMapReduceCommitProtocol.scala      | 24 ++++++++-------
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 31 ++++++++++++++------
 2 files changed, 35 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
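For context on the diff below: Hadoop's Path constructor throws IllegalArgumentException for a null string, an empty string, and a string that fails URI parsing, so a single Try around it covers all three invalid-path cases at once. A minimal standalone sketch of the check the patch introduces (isValidOutputPath is a hypothetical helper used only for illustration, not part of the patch):

    import scala.util.Try
    import org.apache.hadoop.fs.Path

    // Hypothetical helper mirroring the hasValidPath check added below:
    // wrapping the Path constructor in Try covers null, empty, and
    // unparseable path strings in one place.
    def isValidOutputPath(path: String): Boolean = Try { new Path(path) }.isSuccess

    isValidOutputPath(null)          // false: "Can not create a Path from a null string"
    isValidOutputPath("")            // false: "Can not create a Path from an empty string"
    isValidOutputPath("::invalid::") // false: URI parse failure
    isValidOutputPath("/tmp/output") // true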
http://git-wip-us.apache.org/repos/asf/spark/blob/6b6761e8/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 4d42a66..bc777eb 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
@@ -48,6 +49,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var committer: OutputCommitter = _
 
   /**
+   * Checks whether there are files to be committed to a valid output location.
+   *
+   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+   * it is necessary to check whether a valid output path is specified.
+   * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
+   * committers not writing to distributed file systems.
+   */
+  private val hasValidPath = Try { new Path(path) }.isSuccess
+
+  /**
    * Tracks files staged by this task for absolute output paths. These outputs are not managed by
    * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
    *
@@ -60,15 +71,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
    */
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
 
-  /**
-   * Checks whether there are files to be committed to an absolute output location.
-   *
-   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
-   * it is necessary to check whether the output path is specified. Output path may not be required
-   * for committers not writing to distributed file systems.
-   */
-  private def hasAbsPathFiles: Boolean = path != null
-
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -141,7 +143,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    if (hasAbsPathFiles) {
+    if (hasValidPath) {
       val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
       for ((src, dst) <- filesToMove) {
         fs.rename(new Path(src), new Path(dst))
@@ -152,7 +154,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
-    if (hasAbsPathFiles) {
+    if (hasValidPath) {
       val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
       fs.delete(absPathStagingDir, true)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6b6761e8/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index b0175e6..2820c15 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -568,21 +568,34 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(FakeWriterWithCallback.exception.getMessage contains "failed to write")
   }
 
-  test("saveAsNewAPIHadoopDataset should respect empty output directory when " +
+  test("saveAsNewAPIHadoopDataset should support invalid output paths when " +
     "there are no files to be committed to an absolute output location") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
 
-    val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
-    job.setOutputKeyClass(classOf[Integer])
-    job.setOutputValueClass(classOf[Integer])
-    job.setOutputFormatClass(classOf[NewFakeFormat])
-    val jobConfiguration = job.getConfiguration
+    def saveRddWithPath(path: String): Unit = {
+      val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+      job.setOutputKeyClass(classOf[Integer])
+      job.setOutputValueClass(classOf[Integer])
+      job.setOutputFormatClass(classOf[NewFakeFormat])
+      if (null != path) {
+        job.getConfiguration.set("mapred.output.dir", path)
+      } else {
+        job.getConfiguration.unset("mapred.output.dir")
+      }
+      val jobConfiguration = job.getConfiguration
+
+      // just test that the job does not fail with java.lang.IllegalArgumentException.
+      pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+    }
 
-    // just test that the job does not fail with
-    // java.lang.IllegalArgumentException: Can not create a Path from a null string
-    pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+    saveRddWithPath(null)
+    saveRddWithPath("")
+    saveRddWithPath("::invalid::")
   }
 
+  // In spark 2.1, only null was supported - not other invalid paths.
+  // org.apache.hadoop.mapred.FileOutputFormat.getOutputPath fails with IllegalArgumentException
+  // for non-null invalid paths.
   test("saveAsHadoopDataset should respect empty output directory when " +
     "there are no files to be committed to an absolute output location") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
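For reference, the Spark 2.1 failure mode described in the comment above can be sketched directly against the old-API FileOutputFormat; a minimal sketch, assuming the Hadoop mapred classes are on the classpath (this snippet is illustrative only and not part of the patch):

    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

    val conf = new JobConf()
    conf.set("mapred.output.dir", "::invalid::")

    // getOutputPath builds a Path from "mapred.output.dir", so a non-null but
    // unparseable value makes it throw IllegalArgumentException, the case that
    // the old null-only hasAbsPathFiles check did not cover.
    // FileOutputFormat.getOutputPath(conf)  // throws IllegalArgumentException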