Repository: spark
Updated Branches:
  refs/heads/branch-2.0 83b957e6a -> b177e082f


[SPARK-16613][CORE] RDD.pipe returns values for empty partitions

## What changes were proposed in this pull request?

Document RDD.pipe semantics: a process is executed once per partition, even for empty input partitions.

Note this includes the fix in https://github.com/apache/spark/pull/14256 
because it's necessary to even test this. One or the other will merge the fix.

## How was this patch tested?

Jenkins tests including new test.

Author: Sean Owen <so...@cloudera.com>

Closes #14260 from srowen/SPARK-16613.

(cherry picked from commit 4b079dc3964dbe0f4d7839d39512d0400122b520)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b177e082
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b177e082
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b177e082

Branch: refs/heads/branch-2.0
Commit: b177e082f196d6d06247d801a8441929a2ff93bf
Parents: 83b957e
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Jul 20 09:48:52 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jul 20 09:49:03 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala           | 8 +++++++-
 core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 8 ++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b177e082/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0804cde..a4905dd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -714,7 +714,13 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Return an RDD created by piping elements to a forked external process.
+   * Return an RDD created by piping elements to a forked external process. The resulting RDD
+   * is computed by executing the given process once per partition. All elements
+   * of each input partition are written to a process's stdin as lines of input separated
+   * by a newline. The resulting partition consists of the process's stdout output, with
+   * each line of stdout resulting in one element of the output partition. A process is invoked
+   * even for empty partitions.
+   *
    * The print behavior can be customized by providing two functions.
    *
    * @param command command to run in forked process.

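The per-partition semantics documented above can be illustrated outside Spark. Below is a minimal Python sketch (not Spark code; `run_partition` is an illustrative helper, not part of any Spark API) that mimics what `pipe` does for a single partition: launch the command once, write each element to the child's stdin followed by a newline, and yield one output element per line of the child's stdout. Launching a process even for an empty partition is what makes `wc -c` report `0` there.

```python
import subprocess

def run_partition(command, partition):
    """Mimic RDD.pipe for a single partition: write each element to the
    child process's stdin followed by a newline, and return one output
    element per line of the child's stdout."""
    proc = subprocess.run(
        command,
        input="".join(elem + "\n" for elem in partition),
        capture_output=True,
        text=True,
        check=True,
    )
    return proc.stdout.splitlines()

# A process is launched even for an empty partition, so "wc -c" reports 0,
# matching the Set(0, 4, 5) assertion in the test added below.
full = run_partition(["wc", "-c"], ["foo"])   # "foo\n" is 4 bytes
empty = run_partition(["wc", "-c"], [])       # empty stdin, wc still runs
print([int(s) for s in full], [int(s) for s in empty])
```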
http://git-wip-us.apache.org/repos/asf/spark/blob/b177e082/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 5d56fc1..f8d523f 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -138,6 +138,14 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("pipe with empty partition") {
+    val data = sc.parallelize(Seq("foo", "bing"), 8)
+    val piped = data.pipe("wc -c")
+    assert(piped.count == 8)
+    val charCounts = piped.map(_.trim.toInt).collect().toSet
+    assert(Set(0, 4, 5) == charCounts)
+  }
+
   test("pipe with env variable") {
     if (testCommandAvailable("printenv")) {
       val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

