Re: skip lines in spark

2014-04-23 Thread Andre Bois-Crettez

Good question; I am also wondering how one can add line numbers to
distributed data.

I thought it was a job for mapPartitionsWithIndex, but it seems difficult.
Something similar here :
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html#a995

Maybe the file reader could know it is working on the first HDFS block,
and count line numbers from there?
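One two-pass sketch of the idea (untested, and assuming an RDD[String] named rddData; all names here are illustrative): first count the records in each partition, then use the cumulative offsets inside mapPartitionsWithIndex to assign global line numbers.

```scala
// Sketch, untested: global line numbers via a counting pass plus offsets.
// Assumes rddData: RDD[String]; val names are illustrative.
val counts = rddData
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size.toLong)))
  .collect()
  .sortBy(_._1)
  .map(_._2)
val offsets = counts.scanLeft(0L)(_ + _)  // global start index of each partition
val numbered = rddData.mapPartitionsWithIndex { (idx, it) =>
  it.zipWithIndex.map { case (line, i) => (offsets(idx) + i, line) }
}
```

The counting pass is a separate job, which is the price of making indices globally consistent across partitions.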

André

On 2014-04-23 18:18, Chengi Liu wrote:

Hi,
  What is the easiest way to skip the first n lines in an RDD?
I am not able to figure this one out.
Thanks



--
André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/




Re: skip lines in spark

2014-04-23 Thread Xiangrui Meng
If the first partition doesn't have enough records, then it may not
drop enough lines. Try

rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

It might trigger a job.

Best,
Xiangrui
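Put together, the suggestion to drop the first 10 lines would look roughly like this (a sketch, assuming rddData is an RDD[String]). A job may be triggered because zipWithIndex() must first count the records in each partition before it can assign consecutive indices:

```scala
// Sketch: drop the first 10 lines of rddData: RDD[String].
val cleaned = rddData
  .zipWithIndex()                          // RDD[(String, Long)]
  .filter { case (_, idx) => idx >= 10L }  // keep lines 10 and onward
  .map { case (line, _) => line }          // back to RDD[String]
```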

On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote:
 Hi Chengi,

 If you just want to skip first n lines in RDD, you can do

 rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
   if (partitionIdx == 0) lines.drop(n) else lines
 })


 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com wrote:

 Hi,
    What is the easiest way to skip the first n lines in an RDD?
  I am not able to figure this one out.
 Thanks




Re: skip lines in spark

2014-04-23 Thread Xiangrui Meng
Sorry, I didn't realize that zipWithIndex() is not in v0.9.1. It is in
the master branch and will be included in v1.0. It first counts the number
of records per partition and then assigns indices starting from 0.
-Xiangrui

On Wed, Apr 23, 2014 at 9:56 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Also, zipWithIndex() is not valid. Did you mean zipPartitions?


 On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu chengi.liu...@gmail.com wrote:

 Xiangrui,
   So the full code suggestion is:
 val trigger = rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

 and then what DB Tsai recommended
 trigger.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
   if (partitionIdx == 0) lines.drop(n) else lines
 })

 Is that the full operation?

 What happens if I have to drop so many records that the number exceeds
 partition 0? How do I handle that case?




 On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng men...@gmail.com wrote:

 If the first partition doesn't have enough records, then it may not
 drop enough lines. Try

 rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

 It might trigger a job.

 Best,
 Xiangrui

 On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote:
  Hi Chengi,
 
  If you just want to skip first n lines in RDD, you can do
 
  rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
    if (partitionIdx == 0) lines.drop(n) else lines
  })
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com
  wrote:
 
  Hi,
    What is the easiest way to skip the first n lines in an RDD?
  I am not able to figure this one out.
  Thanks
 
 





Re: skip lines in spark

2014-04-23 Thread DB Tsai
What I suggested will not work if the number of records you want to drop
exceeds the data in the first partition. In my use case, I only drop the
first couple of lines, so I don't have this issue.
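For the general case, where n may span several partitions (and without
zipWithIndex on v0.9.x), one untested sketch is to count records per
partition first and then drop from each partition only what falls below
the global cutoff. Assumes rddData: RDD[String] and a line count n; all
names are illustrative:

```scala
// Sketch, untested: drop the first n records even when n exceeds the
// size of the first partition.
val counts = rddData
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size.toLong)))
  .collect().sortBy(_._1).map(_._2)
val offsets = counts.scanLeft(0L)(_ + _)  // global start index per partition
val dropped = rddData.mapPartitionsWithIndex { (idx, it) =>
  // Records still to drop when this partition begins, capped to its size.
  val toDrop = (n - offsets(idx)).max(0L).min(counts(idx)).toInt
  it.drop(toDrop)
}
```

This costs an extra counting job, which is essentially what zipWithIndex
does internally in v1.0.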


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu chengi.liu...@gmail.com wrote:

 Xiangrui,
   So the full code suggestion is:
 val trigger = rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

 and then what DB Tsai recommended
 trigger.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
   if (partitionIdx == 0) lines.drop(n) else lines
 })

 Is that the full operation?

 What happens if I have to drop so many records that the number exceeds
 partition 0? How do I handle that case?




 On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng men...@gmail.com wrote:

 If the first partition doesn't have enough records, then it may not
 drop enough lines. Try

 rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

 It might trigger a job.

 Best,
 Xiangrui

 On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote:
  Hi Chengi,
 
  If you just want to skip first n lines in RDD, you can do
 
  rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) => {
    if (partitionIdx == 0) lines.drop(n) else lines
  })
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:
 
  Hi,
    What is the easiest way to skip the first n lines in an RDD?
  I am not able to figure this one out.
  Thanks