Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16254#discussion_r92137332
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala ---
    @@ -32,177 +32,172 @@ import org.apache.spark._
     import org.apache.spark.util.Utils
     
     class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
    +  val envCommand = if (Utils.isWindows) {
    +    "cmd.exe /C set"
    +  } else {
    +    "printenv"
    +  }
     
       test("basic pipe") {
    -    if (testCommandAvailable("cat")) {
    -      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    +    assume(testCommandAvailable("cat"))
    +    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     
    -      val piped = nums.pipe(Seq("cat"))
    +    val piped = nums.pipe(Seq("cat"))
     
    -      val c = piped.collect()
    -      assert(c.size === 4)
    -      assert(c(0) === "1")
    -      assert(c(1) === "2")
    -      assert(c(2) === "3")
    -      assert(c(3) === "4")
    -    } else {
    -      assert(true)
    -    }
    +    val c = piped.collect()
    +    assert(c.size === 4)
    +    assert(c(0) === "1")
    +    assert(c(1) === "2")
    +    assert(c(2) === "3")
    +    assert(c(3) === "4")
       }
     
       test("basic pipe with tokenization") {
    -    if (testCommandAvailable("wc")) {
    -      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    -
    -      // verify that both RDD.pipe(command: String) and RDD.pipe(command: 
String, env) work good
    -      for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, 
String]()))) {
    -        val c = piped.collect()
    -        assert(c.size === 2)
    -        assert(c(0).trim === "2")
    -        assert(c(1).trim === "2")
    -      }
    -    } else {
    -      assert(true)
    +    assume(testCommandAvailable("wc"))
    +    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    +
    +    // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work correctly
    +    for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, 
String]()))) {
    +      val c = piped.collect()
    +      assert(c.size === 2)
    +      assert(c(0).trim === "2")
    +      assert(c(1).trim === "2")
         }
       }
     
       test("failure in iterating over pipe input") {
    -    if (testCommandAvailable("cat")) {
    -      val nums =
    -        sc.makeRDD(Array(1, 2, 3, 4), 2)
    -          .mapPartitionsWithIndex((index, iterator) => {
    -            new Iterator[Int] {
    -              def hasNext = true
    -              def next() = {
    -                throw new SparkException("Exception to simulate bad 
scenario")
    -              }
    -            }
    -          })
    -
    -      val piped = nums.pipe(Seq("cat"))
    -
    -      intercept[SparkException] {
    -        piped.collect()
    -      }
    +    assume(testCommandAvailable("cat"))
    +    val nums =
    +      sc.makeRDD(Array(1, 2, 3, 4), 2)
    +        .mapPartitionsWithIndex((index, iterator) => {
    +        new Iterator[Int] {
    +          def hasNext = true
    +          def next() = {
    +            throw new SparkException("Exception to simulate bad scenario")
    +          }
    +        }
    +      })
    +
    +    val piped = nums.pipe(Seq("cat"))
    +
    +    intercept[SparkException] {
    +      piped.collect()
         }
       }
     
       test("advanced pipe") {
    -    if (testCommandAvailable("cat")) {
    -      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    -      val bl = sc.broadcast(List("0"))
    -
    -      val piped = nums.pipe(Seq("cat"),
    +    assume(testCommandAvailable("cat"))
    +    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    +    val bl = sc.broadcast(List("0"))
    +
    +    val piped = nums.pipe(Seq("cat"),
    +      Map[String, String](),
    +      (f: String => Unit) => {
    +        bl.value.foreach(f); f("\u0001")
    +      },
    +      (i: Int, f: String => Unit) => f(i + "_"))
    +
    +    val c = piped.collect()
    +
    +    assert(c.size === 8)
    +    assert(c(0) === "0")
    +    assert(c(1) === "\u0001")
    +    assert(c(2) === "1_")
    +    assert(c(3) === "2_")
    +    assert(c(4) === "0")
    +    assert(c(5) === "\u0001")
    +    assert(c(6) === "3_")
    +    assert(c(7) === "4_")
    +
    +    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
    +    val d = nums1.groupBy(str => str.split("\t")(0)).
    +      pipe(Seq("cat"),
             Map[String, String](),
             (f: String => Unit) => {
               bl.value.foreach(f); f("\u0001")
             },
    -        (i: Int, f: String => Unit) => f(i + "_"))
    -
    -      val c = piped.collect()
    -
    -      assert(c.size === 8)
    -      assert(c(0) === "0")
    -      assert(c(1) === "\u0001")
    -      assert(c(2) === "1_")
    -      assert(c(3) === "2_")
    -      assert(c(4) === "0")
    -      assert(c(5) === "\u0001")
    -      assert(c(6) === "3_")
    -      assert(c(7) === "4_")
    -
    -      val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
    -      val d = nums1.groupBy(str => str.split("\t")(0)).
    -        pipe(Seq("cat"),
    -          Map[String, String](),
    -          (f: String => Unit) => {
    -            bl.value.foreach(f); f("\u0001")
    -          },
    -          (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
    -            for (e <- i._2) {
    -              f(e + "_")
    -            }
    -          }).collect()
    -      assert(d.size === 8)
    -      assert(d(0) === "0")
    -      assert(d(1) === "\u0001")
    -      assert(d(2) === "b\t2_")
    -      assert(d(3) === "b\t4_")
    -      assert(d(4) === "0")
    -      assert(d(5) === "\u0001")
    -      assert(d(6) === "a\t1_")
    -      assert(d(7) === "a\t3_")
    -    } else {
    -      assert(true)
    -    }
    +        (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
    +          for (e <- i._2) {
    +            f(e + "_")
    +          }
    +        }).collect()
    +    assert(d.size === 8)
    +    assert(d(0) === "0")
    +    assert(d(1) === "\u0001")
    +    assert(d(2) === "b\t2_")
    +    assert(d(3) === "b\t4_")
    +    assert(d(4) === "0")
    +    assert(d(5) === "\u0001")
    +    assert(d(6) === "a\t1_")
    +    assert(d(7) === "a\t3_")
       }
     
       test("pipe with empty partition") {
         val data = sc.parallelize(Seq("foo", "bing"), 8)
         val piped = data.pipe("wc -c")
         assert(piped.count == 8)
         val charCounts = piped.map(_.trim.toInt).collect().toSet
    -    assert(Set(0, 4, 5) == charCounts)
    +    val expected = if (Utils.isWindows) {
    +      // Note that the newline on Windows is \r\n, which is two characters.
    +      Set(0, 5, 6)
    +    } else {
    +      Set(0, 4, 5)
    +    }
    +    assert(expected == charCounts)
       }
     
       test("pipe with env variable") {
    -    if (testCommandAvailable("printenv")) {
    -      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    -      val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), 
Map("MY_TEST_ENV" -> "LALALA"))
    -      val c = piped.collect()
    -      assert(c.size === 2)
    -      assert(c(0) === "LALALA")
    -      assert(c(1) === "LALALA")
    -    } else {
    -      assert(true)
    -    }
    +    assume(testCommandAvailable(envCommand))
    +    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    +    val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> 
"LALALA"))
    +    val c = piped.collect()
    +    assert(c.length === 2)
    +    // On Windows, `cmd.exe /C set` is used, which prints the variable in `varname=value` format,
    +    // whereas `printenv` usually prints only `value`. So, `varname=` is stripped here for both.
    +    assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA")
    +    assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA")
       }
     
       test("pipe with process which cannot be launched due to bad command") {
    -    if (!testCommandAvailable("some_nonexistent_command")) {
    -      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    -      val command = Seq("some_nonexistent_command")
    -      val piped = nums.pipe(command)
    -      val exception = intercept[SparkException] {
    -        piped.collect()
    -      }
    -      assert(exception.getMessage.contains(command.mkString(" ")))
    +    assume(!testCommandAvailable("some_nonexistent_command"))
    --- End diff --
    
    Oh, I see. That is true!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to