Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2c74b6d73 -> 6ca1d941b


[SPARK-16620][CORE] Add back the tokenization process in `RDD.pipe(command: String)`

## What changes were proposed in this pull request?

Currently `RDD.pipe(command: String)`:
- works only when the command is specified without any options, such as 
`RDD.pipe("wc")`
- does NOT work when the command is specified with some options, such as 
`RDD.pipe("wc -l")`

This is a regression from Spark 1.6.

This patch adds back the tokenization process in `RDD.pipe(command: String)` to 
fix this regression.
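
For illustration only (not part of the patch), the two cases above look like this in practice; this sketch assumes a running `SparkContext` named `sc` and a POSIX `wc` available to the executors:

```scala
val nums = sc.parallelize(Seq(1, 2, 3, 4), 2)

// Bare command, no options: unaffected by the regression.
nums.pipe("wc").collect()

// Command with options: before this patch the string was not split into
// words, so this call failed on 2.0; with tokenization restored it is
// turned into Seq("wc", "-l") first, as Runtime.exec() would do.
nums.pipe("wc -l").collect()
```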

## How was this patch tested?
Added a test which:
- would pass in `1.6`
- _[prior to this patch]_ would fail in `master`
- _[after this patch]_ would pass in `master`

Author: Liwei Lin <lwl...@gmail.com>

Closes #14256 from lw-lin/rdd-pipe.

(cherry picked from commit 0bd76e872b60cb80295fc12654e370cf22390056)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ca1d941
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ca1d941
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ca1d941

Branch: refs/heads/branch-2.0
Commit: 6ca1d941b0b417f10533ab3506a9f3cf60e6a7fe
Parents: 2c74b6d
Author: Liwei Lin <lwl...@gmail.com>
Authored: Tue Jul 19 10:24:48 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Jul 19 10:25:24 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala  |  8 ++++++--
 .../scala/org/apache/spark/rdd/PipedRDDSuite.scala  | 16 ++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ca1d941/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b7a5b22..0804cde 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -699,14 +699,18 @@ abstract class RDD[T: ClassTag](
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String): RDD[String] = withScope {
-    pipe(command)
+    // Similar to Runtime.exec(), if we are given a single string, split it into words
+    // using a standard StringTokenizer (i.e. by spaces)
+    pipe(PipedRDD.tokenize(command))
   }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
-    pipe(command, env)
+    // Similar to Runtime.exec(), if we are given a single string, split it into words
+    // using a standard StringTokenizer (i.e. by spaces)
+    pipe(PipedRDD.tokenize(command), env)
   }
 
   /**
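
The `PipedRDD.tokenize` helper called above performs the plain `StringTokenizer` whitespace split described in the comment. A minimal, self-contained sketch of such a tokenizer (illustrative only; the actual helper lives in `PipedRDD.scala` and may differ in detail) looks like:

```scala
import java.util.StringTokenizer

import scala.collection.mutable.ArrayBuffer

// Split a command string into whitespace-separated words, the way
// Runtime.exec(String) does. StringTokenizer performs no quote or
// escape handling.
def tokenize(command: String): Seq[String] = {
  val buf = new ArrayBuffer[String]
  val tok = new StringTokenizer(command)
  while (tok.hasMoreElements) {
    buf += tok.nextToken()
  }
  buf.toSeq
}

// tokenize("wc -l") == Seq("wc", "-l")
```

Because the split ignores quoting, commands whose arguments contain spaces still need the `Seq[String]` overload of `pipe`.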

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca1d941/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 27cfdc7..5d56fc1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -51,6 +51,22 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("basic pipe with tokenization") {
+    if (testCommandAvailable("wc")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+      // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work well
+      for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
+        val c = piped.collect()
+        assert(c.size === 2)
+        assert(c(0).trim === "2")
+        assert(c(1).trim === "2")
+      }
+    } else {
+      assert(true)
+    }
+  }
+
   test("failure in iterating over pipe input") {
     if (testCommandAvailable("cat")) {
       val nums =
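
A note on the expected values in the new test: `pipe` runs the command once per partition, so with four elements spread over two partitions each `wc -l` process receives two lines and prints 2 (possibly with leading whitespace, hence the `trim`). Outside the suite, roughly the same check could be written as follows, again assuming `sc` and an available `wc`:

```scala
val counts = sc.makeRDD(Array(1, 2, 3, 4), 2)
  .pipe("wc -l")       // one `wc -l` process per partition of two elements
  .collect()
  .map(_.trim)
assert(counts.sameElements(Array("2", "2")))
```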

