Repository: spark Updated Branches: refs/heads/master 95abbe537 -> 4b079dc39
[SPARK-16613][CORE] RDD.pipe returns values for empty partitions ## What changes were proposed in this pull request? Document RDD.pipe semantics: the external process is executed once per partition, even for empty input partitions, so empty partitions can still produce output; add a test that pins this behavior. Note this includes the fix in https://github.com/apache/spark/pull/14256 because it's necessary to even test this. One or the other will merge the fix. ## How was this patch tested? Jenkins tests including new test. Author: Sean Owen <so...@cloudera.com> Closes #14260 from srowen/SPARK-16613. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b079dc3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b079dc3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b079dc3 Branch: refs/heads/master Commit: 4b079dc3964dbe0f4d7839d39512d0400122b520 Parents: 95abbe5 Author: Sean Owen <so...@cloudera.com> Authored: Wed Jul 20 09:48:52 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Jul 20 09:48:52 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 +++++++- core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4b079dc3/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0804cde..a4905dd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -714,7 +714,13 @@ abstract class RDD[T: ClassTag]( } /** - * Return an RDD created by piping elements to a forked external process. + * Return an RDD created by piping elements to a forked external process. 
The resulting RDD + * is computed by executing the given process once per partition. All elements + * of each input partition are written to a process's stdin as lines of input separated + * by a newline. The resulting partition consists of the process's stdout output, with + * each line of stdout resulting in one element of the output partition. A process is invoked + * even for empty partitions. + * * The print behavior can be customized by providing two functions. * * @param command command to run in forked process. http://git-wip-us.apache.org/repos/asf/spark/blob/4b079dc3/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 5d56fc1..f8d523f 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -138,6 +138,14 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("pipe with empty partition") { + val data = sc.parallelize(Seq("foo", "bing"), 8) + val piped = data.pipe("wc -c") + assert(piped.count == 8) + val charCounts = piped.map(_.trim.toInt).collect().toSet + assert(Set(0, 4, 5) == charCounts) + } + test("pipe with env variable") { if (testCommandAvailable("printenv")) { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org