Re: skip lines in spark
Good question, I am wondering too how it is possible to add a line number to
distributed data. I thought it was a job for mapPartitionsWithIndex, but it
seems difficult. Something similar here:
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html#a995

Maybe the file reader, knowing it works on the first HDFS block, could count
line numbers or something?

André

On 2014-04-23 18:18, Chengi Liu wrote:
> Hi,
>   What is the easiest way to skip the first n lines in an RDD? I am not
> able to figure this one out.
> Thanks

--
André Bois-Crettez
Software Architect
Big Data Developer
http://www.kelkoo.com/
Re: skip lines in spark
If the first partition doesn't have enough records, then it may not drop
enough lines. Try

    rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

Note that it might trigger a job.

Best,
Xiangrui

On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote:
> Hi Chengi,
>
> If you just want to skip the first n lines in an RDD, you can do
>
>     rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
>       if (partitionIdx == 0) lines.drop(n) else lines
>     })
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
> On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com wrote:
>> Hi,
>>   What is the easiest way to skip the first n lines in an RDD? I am not
>> able to figure this one out.
>> Thanks
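The partition-0 trick above, and its limitation, can be sketched without a Spark cluster by modelling an RDD as a `Seq` of partitions. This is a hypothetical local simulation (the object and method names are made up for illustration), not Spark API:

```scala
// Local stand-in for mapPartitionsWithIndex-based skipping:
// an "RDD" is simulated as a Seq of partitions (Seq[Seq[String]]).
object DropFromFirstPartition {
  // Drop the first n records, assuming they all live in partition 0.
  def skipFirstN(partitions: Seq[Seq[String]], n: Int): Seq[String] =
    partitions.zipWithIndex.flatMap { case (lines, partitionIdx) =>
      if (partitionIdx == 0) lines.drop(n) else lines
    }

  def main(args: Array[String]): Unit = {
    val rdd = Seq(Seq("h1", "h2", "a"), Seq("b", "c"))
    // Works when n fits inside partition 0: drops h1, h2.
    println(skipFirstN(rdd, 2).mkString(","))
    // Limitation: n = 4 exceeds partition 0 (3 records), so only 3 are
    // dropped; the 4th record survives. This is the case Xiangrui and
    // DB Tsai discuss below.
    println(skipFirstN(rdd, 4).mkString(","))
  }
}
```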
Re: skip lines in spark
Sorry, I didn't realize that zipWithIndex() is not in v0.9.1. It is in the
master branch and will be included in v1.0. It first counts the number of
records per partition and then assigns indices starting from 0.

-Xiangrui

On Wed, Apr 23, 2014 at 9:56 AM, Chengi Liu chengi.liu...@gmail.com wrote:
> Also, zipWithIndex() is not valid.. Did you mean zipPartitions?
>
> On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu chengi.liu...@gmail.com wrote:
>> Xiangrui,
>>   So the full code suggestion is:
>>
>>     val trigger = rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)
>>
>> and then what DB Tsai recommended:
>>
>>     trigger.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
>>       if (partitionIdx == 0) lines.drop(n) else lines
>>     })
>>
>> Is that the full operation? What happens if I have to drop so many
>> records that the number exceeds partition 0? How do I handle that case?
>>
>> On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng men...@gmail.com wrote: [...]
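Xiangrui's one-line description of zipWithIndex (count records per partition, then assign indices starting from 0) can be simulated on local collections. A sketch under that assumption; the object name is made up, and this is not the Spark implementation itself:

```scala
// Local simulation of the two-pass scheme behind RDD.zipWithIndex():
// pass 1 counts records per partition (in Spark this triggers a job),
// pass 2 tags each record with a contiguous global index.
object ZipWithIndexSketch {
  def zipWithIndex[A](partitions: Seq[Seq[A]]): Seq[Seq[(A, Long)]] = {
    // Per-partition record counts.
    val counts = partitions.map(_.size.toLong)
    // Starting global index of each partition = cumulative sum of counts.
    val offsets = counts.scanLeft(0L)(_ + _)
    partitions.zip(offsets).map { case (part, start) =>
      part.zipWithIndex.map { case (a, i) => (a, start + i) }
    }
  }

  // Skipping the first 10 records is then just a filter on the index,
  // as in rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1).
  def skip10[A](partitions: Seq[Seq[A]]): Seq[A] =
    zipWithIndex(partitions).flatten.collect { case (a, i) if i >= 10L => a }
}
```

Because the indices are global, this works no matter how the records are spread across partitions, at the cost of the extra counting pass.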
Re: skip lines in spark
What I suggested will not work if the number of records you want to drop is
more than the data in the first partition. In my use case, I only drop the
first couple of lines, so I don't have this issue.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu chengi.liu...@gmail.com wrote:
> Xiangrui,
>   So the full code suggestion is:
>
>     val trigger = rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)
>
> and then what DB Tsai recommended. Is that the full operation? What
> happens if I have to drop so many records that the number exceeds
> partition 0? How do I handle that case? [...]
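For the case DB Tsai flags, where n spans several partitions, the counting idea from zipWithIndex can be combined with per-partition dropping. A hypothetical sketch on local collections (names invented, not Spark API; in Spark the counts would come from a small job and the offsets would be captured in the mapPartitionsWithIndex closure):

```scala
// Skip the first n records even when n exceeds the size of partition 0:
// compute per-partition counts first, then drop each partition's share.
object SkipAcrossPartitions {
  def skipFirstN[A](partitions: Seq[Seq[A]], n: Long): Seq[A] = {
    // Starting global index of each partition.
    val offsets = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(offsets).flatMap { case (part, start) =>
      // How many of this partition's records fall within the first n.
      val toDrop = (n - start).max(0L).min(part.size.toLong).toInt
      part.drop(toDrop)
    }
  }
}
```

Unlike the plain filter on zipWithIndex output, this drops by count per partition rather than materialising an index for every record.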