This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2153b31  [SPARK-26892][CORE] Fix saveAsTextFile throws 
NullPointerException when null row present
2153b31 is described below

commit 2153b316bda119ede8c80ceda522027a6581031b
Author: liupengcheng <liupengch...@xiaomi.com>
AuthorDate: Wed Feb 20 16:42:55 2019 -0600

    [SPARK-26892][CORE] Fix saveAsTextFile throws NullPointerException when 
null row present
    
    ## What changes were proposed in this pull request?
    
    Currently, RDD.saveAsTextFile may throw a NullPointerException when a null 
row is present.
    ```
    scala> sc.parallelize(Seq(1,null),1).saveAsTextFile("/tmp/foobar.dat")
    19/02/15 21:39:17 ERROR Utils: Aborting task
    java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$3(RDD.scala:1510)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at 
org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:129)
    at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1352)
    at 
org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:127)
    at 
org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1318)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    ```
    
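    This PR makes saveAsTextFile reject a null row with an explicit error
    ("text files do not allow null rows") instead of failing with an opaque NPE.
    It also makes saveAsTextFile(path) delegate to saveAsTextFile(path, codec)
    with a null codec, which PairRDDFunctions now wraps with Option(codec).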
    
    ## How was this patch tested?
    
    Added a unit test to FileSuite ("text files do not allow null rows").
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.
    
    Closes #23799 from 
liupc/Fix-saveAsTextFile-throws-NullPointerException-when-null-row-present.
    
    Lead-authored-by: liupengcheng <liupengch...@xiaomi.com>
    Co-authored-by: Liupengcheng <liupengch...@xiaomi.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/rdd/PairRDDFunctions.scala    |  2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 +++-------------------
 .../test/scala/org/apache/spark/FileSuite.scala    |  8 ++++++
 3 files changed, 13 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 8b5f9bb..7f8064f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1012,7 +1012,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
     saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
-      new JobConf(self.context.hadoopConfiguration), Some(codec))
+      new JobConf(self.context.hadoopConfiguration), Option(codec))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d39f418..1b67e99 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1492,45 +1492,21 @@ abstract class RDD[T: ClassTag](
    * Save this RDD as a text file, using string representations of elements.
    */
   def saveAsTextFile(path: String): Unit = withScope {
-    // https://issues.apache.org/jira/browse/SPARK-2075
-    //
-    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot 
find an implicit
-    // Ordering for it and will use the default `null`. However, it's a 
`Comparable[NullWritable]`
-    // in Hadoop 2.+, so the compiler will call the implicit 
`Ordering.ordered` method to create an
-    // Ordering for `NullWritable`. That's why the compiler will generate 
different anonymous
-    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
-    //
-    // Therefore, here we provide an explicit Ordering `null` to make sure the 
compiler generate
-    // same bytecodes for `saveAsTextFile`.
-    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
-    val textClassTag = implicitly[ClassTag[Text]]
-    val r = this.mapPartitions { iter =>
-      val text = new Text()
-      iter.map { x =>
-        text.set(x.toString)
-        (NullWritable.get(), text)
-      }
-    }
-    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
-      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
+    saveAsTextFile(path, null)
   }
 
   /**
    * Save this RDD as a compressed text file, using string representations of 
elements.
    */
   def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit 
= withScope {
-    // https://issues.apache.org/jira/browse/SPARK-2075
-    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
-    val textClassTag = implicitly[ClassTag[Text]]
-    val r = this.mapPartitions { iter =>
+    this.mapPartitions { iter =>
       val text = new Text()
       iter.map { x =>
+        require(x != null, "text files do not allow null rows")
         text.set(x.toString)
         (NullWritable.get(), text)
       }
-    }
-    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
-      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
+    }.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala 
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 983a791..c540d7b 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -94,6 +94,14 @@ class FileSuite extends SparkFunSuite with LocalSparkContext 
{
     assert(compressedFile.length < normalFile.length)
   }
 
+  test("text files do not allow null rows") {
+    sc = new SparkContext("local", "test")
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD((1 to 100) ++ Seq(null))
+    val exception = intercept[SparkException](nums.saveAsTextFile(outputDir))
+    assert(Utils.exceptionString(exception).contains("text files do not allow 
null rows"))
+  }
+
   test("SequenceFiles") {
     sc = new SparkContext("local", "test")
     val outputDir = new File(tempDir, "output").getAbsolutePath


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to