[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin closed the pull request at: https://github.com/apache/spark/pull/16276

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on the issue: https://github.com/apache/spark/pull/16276

@rxin Thanks, I have just started to dive deep into Spark; I hope I can contribute more impactful things later. I will close this one.

@srowen I tried out your idea: I created a `TraversableRDDFunctions` file and an `implicit def rddToTraversableRDDFunctions[U](rdd: RDD[TraversableRDDFunctions[U]])` inside the `RDD` object. It is not very complex, but the problem is that class `RDD` is **invariant**, which makes it hard to make this method generic.
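For readers following the discussion, the enrichment pattern being debated can be sketched in plain Scala, with no Spark required. All names below (`Box`, `TraversableBoxFunctions`) are hypothetical stand-ins for `RDD` and the proposed `TraversableRDDFunctions`; this is a sketch of the technique, not the actual Spark code:

```scala
import scala.language.implicitConversions

object FlattenSketch extends App {
  // Minimal invariant container standing in for RDD[T].
  class Box[T](val items: Seq[T])

  // Enrichment class in the spirit of the suggested TraversableRDDFunctions:
  // flatten compiles only when an implicit view T => TraversableOnce[U] exists.
  class TraversableBoxFunctions[T](self: Box[T]) {
    def flatten[U](implicit asTraversable: T => TraversableOnce[U]): Box[U] =
      new Box(self.items.flatMap(x => asTraversable(x)))
  }

  // Because Box[T] is invariant (like RDD[T]), the conversion is written for any
  // Box[T], and the implicit evidence parameter carries the proof that the
  // elements are traversable, rather than constraining the container's type.
  implicit def boxToTraversableBoxFunctions[T](box: Box[T]): TraversableBoxFunctions[T] =
    new TraversableBoxFunctions(box)

  val nested = new Box(Seq(List(1, 2), List(3)))
  assert(nested.flatten.items == Seq(1, 2, 3))
  println(nested.flatten.items.mkString(","))  // prints 1,2,3
}
```

This mirrors how the standard library enables `List(List(1)).flatten`: the implicit view is synthesized automatically from subtype conformance when the element type is itself a collection.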
[GitHub] spark issue #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on the issue: https://github.com/apache/spark/pull/16276

@srowen So are there any problems with my current implementation, i.e. using `implicit asTraversable: T => TraversableOnce[U]`? I based it on the `flatten` implementation in the Scala source code: https://github.com/scala/scala/blob/05016d9035ab9b1c866bd9f12fdd0491f1ea0cbb/src/library/scala/collection/generic/GenericTraversableTemplate.scala#L169. If a user calls `flatten` on an invalid RDD, it simply fails to compile:

```
scala> val rdd = sc.parallelize(Seq(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.flatten
<console>:27: error: No implicit view available from Int => scala.collection.TraversableOnce[U].
       rdd.flatten
           ^
```

The same happens in plain Scala:

```
scala> val l = List(1,2,3)
l: List[Int] = List(1, 2, 3)

scala> l.flatten
<console>:13: error: No implicit view available from Int => scala.collection.GenTraversableOnce[B].
       l.flatten
         ^
```
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on a diff in the pull request: https://github.com/apache/spark/pull/16276#discussion_r92550699

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
```
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Return a new RDD by flattening all elements from RDD with traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
```
--- End diff --

@srowen I think I figured out a simpler way:

```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => {
    var newIter: Iterator[U] = Iterator.empty
    for (x <- iter) newIter ++= asTraversable(x)
    newIter
  })
}
```
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on a diff in the pull request: https://github.com/apache/spark/pull/16276#discussion_r92546374

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
```
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Return a new RDD by flattening all elements from RDD with traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
```
--- End diff --

Hi @srowen, thanks for your suggestion. One way is to use Scala's `flatMap` as follows:

```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  val f = (x: T) => asTraversable(x)
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
```

Or I can implement the logic myself:

```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => new Iterator[U] {
    private val empty = Iterator.empty
    private var cur: Iterator[U] = empty

    private def nextCur() { cur = asTraversable(iter.next).toIterator }

    def hasNext: Boolean = {
      while (!cur.hasNext) {
        if (!iter.hasNext) return false
        nextCur()
      }
      true
    }

    def next(): U = (if (hasNext) cur else empty).next()
  })
}
```

ref: https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/Iterator.scala#L432

Which one do you think is better?
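A note for readers weighing the two variants above: the standard library's `Iterator.flatMap` is already lazy, so delegating to it preserves the per-element, on-demand behavior that the hand-rolled iterator reimplements. A small plain-Scala check (no Spark needed) illustrates this:

```scala
object LazyFlatMapCheck extends App {
  var consumed = 0
  // Count how many inner collections have been pulled from the source iterator.
  val parts = Iterator(List(1, 2, 3), List(4, 5), List(6)).map { xs =>
    consumed += 1; xs
  }

  // flatMap itself consumes nothing until elements are requested.
  val flat = parts.flatMap(xs => xs)
  assert(consumed == 0)

  // Pulling the first element consumes only the first inner collection.
  assert(flat.next() == 1)
  assert(consumed == 1)

  // Draining the rest consumes the remaining inner collections on demand.
  assert(flat.toList == List(2, 3, 4, 5, 6))
  assert(consumed == 3)
  println("Iterator.flatMap is lazy")
}
```

This suggests the `iter.flatMap` version is equivalent in behavior while being much shorter, which is presumably why it was the direction ultimately discussed.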
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on a diff in the pull request: https://github.com/apache/spark/pull/16276#discussion_r92531327

--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
```
@@ -88,6 +88,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }

+  test("flatten") {
+    val nums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2)
+    assert(nums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6))
+    val strs = sc.makeRDD(Array(Array("a", "b", "c"), Array("d", "e"), Array("f")), 2)
+    assert(strs.flatten.collect().toList === List("a", "b", "c", "d", "e", "f"))
```
--- End diff --

Thanks, I will move the test code into "basic operations".
[GitHub] spark issue #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on the issue: https://github.com/apache/spark/pull/16276

cc @rxin
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
GitHub user linbojin opened a pull request: https://github.com/apache/spark/pull/16276

[SPARK-18855][CORE] Add RDD flatten function

## What changes were proposed in this pull request?

Added a new `flatten` function for RDD.

## How was this patch tested?

Unit tests inside RDDSuite and manual tests:

```
scala> val rdd = sc.makeRDD(List(List(1, 2, 3), List(4, 5), List(6)))
rdd: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.flatten.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/linbojin/spark SPARK-18855-add-rdd-flatten

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16276.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16276

commit 2c0903ac07367cf203e4b1ed6bf4ac1894976ec9
Author: linbojin <linbojin...@gmail.com>
Date: 2016-12-14T06:04:48Z

    add RDD flatten function and tests
[GitHub] spark pull request #16132: [MINOR] [README] Correct Markdown link inside rea...
GitHub user linbojin opened a pull request: https://github.com/apache/spark/pull/16132

[MINOR] [README] Correct Markdown link inside readme

## What changes were proposed in this pull request?

The "Useful Developer Tools" link inside [README.md](https://github.com/apache/spark/blob/master/README.md#building-spark) doesn't work on the master branch. This PR corrects the Markdown link.

## How was this patch tested?

[README.md](https://github.com/linbojin/spark/blob/fix-markdown-link-in-readme/README.md#building-spark) on this branch:

![image](https://cloud.githubusercontent.com/assets/5894707/20864124/4c83499e-ba1e-11e6-9948-07b4627f516f.png)

@srowen

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/linbojin/spark fix-markdown-link-in-readme

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16132.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16132

commit a6c6dc75a6bd2b05f0619f5677b770d036487c87
Author: linbojin <linbojin...@gmail.com>
Date: 2016-12-04T04:30:35Z

    correct Markdown link inside README
[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...
Github user linbojin commented on a diff in the pull request: https://github.com/apache/spark/pull/14645#discussion_r74737989

--- Diff: docs/quick-start.md ---
```
@@ -40,7 +40,7 @@ RDDs have _[actions](programming-guide.html#actions)_, which return values, and
 {% highlight scala %}
 scala> textFile.count() // Number of items in this RDD
-res0: Long = 126
+res0: Long = 99
```
--- End diff --

OK, I changed it to 15.
[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...
Github user linbojin commented on a diff in the pull request: https://github.com/apache/spark/pull/14645#discussion_r74737117

--- Diff: docs/quick-start.md ---
```
@@ -40,7 +40,7 @@ RDDs have _[actions](programming-guide.html#actions)_, which return values, and
 {% highlight scala %}
 scala> textFile.count() // Number of items in this RDD
-res0: Long = 126
+res0: Long = 99
```
--- End diff --

From http://spark.apache.org/docs/latest/quick-start.html, without cache:

![screen shot 2016-08-15 at 17 12 33](https://cloud.githubusercontent.com/assets/5894707/17660363/8693aeea-630b-11e6-93b2-09c438e8f77f.png)

and with cache:

![screen shot 2016-08-15 at 17 12 44](https://cloud.githubusercontent.com/assets/5894707/17660387/a58635ac-630b-11e6-8541-36003860ed5e.png)
[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...
Github user linbojin commented on a diff in the pull request: https://github.com/apache/spark/pull/14645#discussion_r74733368

--- Diff: docs/quick-start.md ---
```
@@ -40,7 +40,7 @@ RDDs have _[actions](programming-guide.html#actions)_, which return values, and
 {% highlight scala %}
 scala> textFile.count() // Number of items in this RDD
-res0: Long = 126
+res0: Long = 99
```
--- End diff --

OK, I will just comment on these. But for the bug mentioned above (the results with and without cache do not match), do I need to fix it?
[GitHub] spark pull request #14645: [MINOR] [DOC] Correct code snippet results in qui...
GitHub user linbojin opened a pull request: https://github.com/apache/spark/pull/14645

[MINOR] [DOC] Correct code snippet results in quick start documentation

## What changes were proposed in this pull request?

The README.md file has been updated over time, so some code snippet outputs in the quick start documentation are no longer correct based on the new README.md file. For example:

```
scala> textFile.count()
res0: Long = 126
```

should be

```
scala> textFile.count()
res0: Long = 99
```

This PR corrects these outputs so that new Spark learners have a correct reference.

It also fixes a small bug: in the current documentation, the outputs of `linesWithSpark.count()` without and with cache are different (one is 15 and the other is 19):

```
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

...

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 19
```

## How was this patch tested?

Manual test: run `$ SKIP_API=1 jekyll serve --watch`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/linbojin/spark quick-start-documentation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14645.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14645

commit f093e3a44a6447f619edd987bf30ee838899c578
Author: linbojin <linbojin...@gmail.com>
Date: 2016-08-15T06:26:39Z

    correct result numbers inside quick start docs based on new README.md file
[GitHub] spark pull request: [MINOR][SQL][DOCS] Add notes of the determinis...
Github user linbojin commented on the pull request: https://github.com/apache/spark/pull/13087#issuecomment-221148743

@marmbrus @dongjoon-hyun I will add the detailed description to the old SPARK-15282 JIRA issue.
[GitHub] spark pull request: [SPARK-15282][SQL] PushDownPredicate should no...
Github user linbojin commented on the pull request: https://github.com/apache/spark/pull/13087#issuecomment-220837854

@dongjoon-hyun @cloud-fan @marmbrus Thanks for your discussions about my reported issue: [SPARK-15282](https://issues.apache.org/jira/browse/SPARK-15282). Maybe I should describe our use case in more detail. In our project, we first generate a DataFrame with one column called "fileName" and one column called "url", and then we use a UDF (inside `withColumn()`) to download the files from the corresponding URLs and filter out '{}' data before writing to HDFS:

```scala
// df: DataFrame["fileName", "url"]
val getDataUDF = udf((url: String) => {
  try {
    // download data
  } catch {
    case e: Exception => "{}"
  }
})
val df2 = df.withColumn("data", getDataUDF(df("url")))
            .filter("data <> '{}'")
df2.write.save("hdfs path")
```

Based on our logs, each file is downloaded twice. As for running time, the writing job with the filter takes twice as long as the one without it:

![screen shot 2016-05-22 at 22 19 24](https://cloud.githubusercontent.com/assets/5894707/15454461/c13c9918-206b-11e6-8901-1f473fbae3ca.png) ![screen shot 2016-05-22 at 22 18 02](https://cloud.githubusercontent.com/assets/5894707/15454462/c8d4c1a0-206b-11e6-88cc-8ad91d121b6e.png)

Left is with `.filter("data <> '{}'")` and right is without it. As one can imagine, when there are many URLs or the files are very large, the reported issue affects performance a lot.

Another problem is data correctness. Because each file is downloaded twice, we came across cases where the first download (in `getDataUDF`) returned real data (not '{}') while the second returned '{}' because of a connection exception. I found that the filter only acted on the first returned value, so Spark did not remove the row, but the value kept in the "data" column was '{}', i.e. the second returned value. Even after the filter, the result DataFrame df2 looked like the following (rows with '{}' data that should have been removed):

```
fileName    url     data
file1       url1    sth
files       url2    {}
```

**So at a high level, we get '{}' data after filtering out '{}', which is strange. The reason, I think, is that the UDF is executed twice when filtering on a new column created by `withColumn`, and the two returned values differ: the first makes the filter condition true and the second makes it false. The DataFrame keeps the second value, which in fact should not appear after the filter operation.**

Finally, I removed the filter operation (and filter out '{}' downstream) because I think it may not be safe to filter on a new column created by `withColumn`. For my part, I agree with @cloud-fan and @thunterdb: we can just document this behavior of UDFs, and users should avoid using UDFs in such a way.
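The re-execution behavior described above can be mimicked with a lazy Scala collection view, with no Spark involved: a side-effecting, non-memoized function in a lazy pipeline runs again on every traversal, much as a UDF may be re-evaluated when the optimizer pushes a filter through the projection that computes its output. All names and counts below are illustrative only, not taken from the actual workload:

```scala
object DoubleEvalSketch extends App {
  var downloads = 0

  // Stand-in for the UDF: a side-effecting "download" that is not memoized.
  def getData(url: String): String = { downloads += 1; s"data-from-$url" }

  val urls = Seq("url1", "url2")

  // A lazy view, analogous to a lazily evaluated query plan: nothing runs yet.
  val mapped   = urls.view.map(getData)
  val filtered = mapped.filter(_ != "{}")

  filtered.foreach(_ => ())  // first traversal: getData runs once per row
  filtered.foreach(_ => ())  // second traversal: getData runs again per row
  assert(downloads == 4)     // 2 rows x 2 traversals
  println(s"downloads = $downloads")
}
```

In practice, materializing the column once (e.g. caching the DataFrame produced by `withColumn` before filtering) avoids the repeated download; later Spark versions also allow marking such a UDF non-deterministic so the optimizer will not duplicate it.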
[GitHub] spark pull request: [SPARK-15282][SQL] PushDownPredicate should no...
Github user linbojin commented on the pull request: https://github.com/apache/spark/pull/13087#issuecomment-220502400

Hi @marmbrus, could you review this PR?