Repository: spark
Updated Branches:
  refs/heads/branch-2.2 8a4e7dd89 -> 0d3f1667e


[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2, Spark does not respect OutputFormats for which no output 
path is provided.
The examples of such formats are [Cassandra 
OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java),
 [Aerospike 
OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java),
 etc., which do not have the ability to roll back results written to 
external systems on job failure.

Spark requires an output directory so that files can be committed to an 
absolute output location; that is not the case for output formats which 
write data to external systems.

This pull request prevents access to the `absPathStagingDir` method, which 
causes the error described in SPARK-21549, unless there are files to rename in 
`addedAbsPathFiles`.

## How was this patch tested?

Unit tests

Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>

Closes #19294 from szhem/SPARK-21549-abs-output-commits.

(cherry picked from commit 2030f19511f656e9534f3fd692e622e45f9a074e)
Signed-off-by: Mridul Muralidharan <mri...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3f1667
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3f1667
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3f1667

Branch: refs/heads/branch-2.2
Commit: 0d3f1667e92debbe2c3d412ea8e9c6eba752ef53
Parents: 8a4e7dd
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Authored: Fri Oct 6 20:43:53 2017 -0700
Committer: Mridul Muralidharan <mri...@gmail.com>
Committed: Fri Oct 6 20:44:47 2017 -0700

----------------------------------------------------------------------
 .../io/HadoopMapReduceCommitProtocol.scala      | 28 +++++++++++++----
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 33 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d3f1667/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 22e2679..4d42a66 100644
--- 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -35,6 +35,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  * (from the newer mapreduce API, not the old mapred API).
  *
  * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ *
+ * @param jobId the job's or stage's id
+ * @param path the job's output path, or null if committer acts as a noop
  */
 class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   extends FileCommitProtocol with Serializable with Logging {
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
    */
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output 
location.
+   *
+   * As committing and aborting a job occurs on driver, where 
`addedAbsPathFiles` is always null,
+   * it is necessary to check whether the output path is specified. Output 
path may not be required
+   * for committers not writing to distributed file systems.
+   */
+  private def hasAbsPathFiles: Boolean = path != null
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = 
{
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -129,17 +141,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      for ((src, dst) <- filesToMove) {
+        fs.rename(new Path(src), new Path(dst))
+      }
+      fs.delete(absPathStagingDir, true)
     }
-    fs.delete(absPathStagingDir, true)
   }
 
   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    fs.delete(absPathStagingDir, true)
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      fs.delete(absPathStagingDir, true)
+    }
   }
 
   override def setupTask(taskContext: TaskAttemptContext): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0d3f1667/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 02df157..b0175e6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -26,7 +26,7 @@ import 
org.apache.commons.math3.distribution.{BinomialDistribution, PoissonDistr
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.mapred._
-import org.apache.hadoop.mapreduce.{JobContext => NewJobContext,
+import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
@@ -568,6 +568,37 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
     assert(FakeWriterWithCallback.exception.getMessage contains "failed to 
write")
   }
 
+  test("saveAsNewAPIHadoopDataset should respect empty output directory when " 
+
+    "there are no files to be committed to an absolute output location") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+    val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+    job.setOutputKeyClass(classOf[Integer])
+    job.setOutputValueClass(classOf[Integer])
+    job.setOutputFormatClass(classOf[NewFakeFormat])
+    val jobConfiguration = job.getConfiguration
+
+    // just test that the job does not fail with
+    // java.lang.IllegalArgumentException: Can not create a Path from a null 
string
+    pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+  }
+
+  test("saveAsHadoopDataset should respect empty output directory when " +
+    "there are no files to be committed to an absolute output location") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+    val conf = new JobConf()
+    conf.setOutputKeyClass(classOf[Integer])
+    conf.setOutputValueClass(classOf[Integer])
+    conf.setOutputFormat(classOf[FakeOutputFormat])
+    conf.setOutputCommitter(classOf[FakeOutputCommitter])
+
+    FakeOutputCommitter.ran = false
+    pairs.saveAsHadoopDataset(conf)
+
+    assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
+  }
+
   test("lookup") {
     val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to