I have an RDD<Object> which I get from an HBase scan using newAPIHadoopRDD. I am running zipWithIndex on it and it is preserving the order: the first object got index 0, the second got 1, the third got 2, and so on.
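For reference, Spark's RDD.zipWithIndex assigns indices starting at 0, following partition order and then element order within each partition. Below is a plain-Scala sketch of that numbering scheme - no Spark involved, and `partitions` is a hypothetical stand-in for an RDD's partition contents:

```scala
object ZipWithIndexSketch {
  // Assign each element a running index: cumulative partition sizes give
  // each partition's starting offset (Spark computes these with an extra
  // pass over the RDD), then elements are numbered consecutively within
  // each partition.
  def zipWithIndex[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] = {
    val starts = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(starts).flatMap { case (part, start) =>
      part.zipWithIndex.map { case (a, i) => (a, start + i) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Three "partitions" holding five elements in read order.
    val parts = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e"))
    // Indices follow the read order: (a,0), (b,1), (c,2), (d,3), (e,4)
    println(zipWithIndex(parts))
  }
}
```

Because the indices are just a running count in read order, zipWithIndex preserves order exactly when the partitions themselves reflect the input order - which is the question discussed in the thread below.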
On 24 April 2015 at 20:56, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:

> To maintain the order you can use zipWithIndex, as Sean Owen pointed out. This is the same as zipWithUniqueId except that the assigned number is the index of the data in the RDD, which I believe matches the order of the data as it's stored on HDFS.
>
> Sent with Good (www.good.com)
>
> -----Original Message-----
> From: Michal Michalski [michal.michal...@boxever.com]
> Sent: Friday, April 24, 2015 11:18 AM Eastern Standard Time
> To: Ganelin, Ilya
> Cc: Spico Florin; user
> Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
>
> I read the lines one by one as I need to maintain the order, but that doesn't mean I process them one by one later. Input lines refer to different entities I update, so once I read them in order, I group them by the id of the entity I want to update, sort the updates on a per-entity basis, and process them further in parallel (including writing data to C* and Kafka at the very end). That's what I use Spark for; the first step I'm asking about is just a requirement related to the input format I get and need to support. Everything that happens after that is a normal data-processing job that you want to distribute.
>
> Kind regards,
> Michał Michalski,
> michal.michal...@boxever.com
>
> On 24 April 2015 at 16:10, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:
>
>> If you're reading a file line by line then you can simply use Hadoop's FileSystem class to read the file with a BufferedInputStream. I don't think you need an RDD here.
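The sequential-read approach suggested above can be sketched as follows. This uses a local java.io stream so the example is self-contained; for an actual HDFS file you would obtain the stream from Hadoop's FileSystem (e.g. FileSystem.get(conf).open(path)) and wrap it the same way. The point is that a single sequential reader preserves line order by construction:

```scala
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object SequentialRead {
  // Read every line of a file through one buffered, sequential reader.
  // Order is preserved trivially because there is a single reader.
  def readLinesInOrder(path: String): Vector[String] = {
    val in = new BufferedReader(
      new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))
    try Iterator.continually(in.readLine()).takeWhile(_ != null).toVector
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Demonstrate on a temporary local file standing in for the HDFS input.
    val f = Files.createTempFile("numbers", ".txt")
    Files.write(f, "0\n1\n2\n".getBytes(StandardCharsets.UTF_8))
    println(readLinesInOrder(f.toString)) // lines come back in file order
  }
}
```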
>>
>> -----Original Message-----
>> From: Michal Michalski [michal.michal...@boxever.com]
>> Sent: Friday, April 24, 2015 11:04 AM Eastern Standard Time
>> To: Ganelin, Ilya
>> Cc: Spico Florin; user
>> Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
>>
>> The problem I'm facing is that I need to process the lines from the input file in the order they're stored in the file, as they define the order of updates I need to apply to some data, and these updates are not commutative, so the order matters. Unfortunately the input is purely order-based; there's no timestamp per line etc. in the file, and I'd prefer to avoid preparing the file in advance by adding ordinals before or after each line. Of the approaches you suggested, the first two won't work as there's nothing I could sort by. I'm not sure about the third one - I'm just not sure what you meant there, to be honest :-)
>>
>> Kind regards,
>> Michał Michalski,
>> michal.michal...@boxever.com
>>
>> On 24 April 2015 at 15:48, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:
>>
>>> Michal - you need to sort your RDD. Check out the shuffle documentation in the Spark Programming Guide; it talks about this specifically. You can resolve this in a couple of ways: by collecting your RDD and sorting it, by using sortBy, or by not worrying about the internal ordering. You can still extract elements in order from the zipped RDD by using a filter, e.g. rdd.filter(s => s._2 < 50).sortBy(_._2)
>>>
>>> -----Original Message-----
>>> From: Michal Michalski [michal.michal...@boxever.com]
>>> Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
>>> To: Spico Florin
>>> Cc: user
>>> Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
>>>
>>> Of course after you do that, you probably want to call repartition(someValue) on your RDD to "get your parallelism back".
>>>
>>> Kind regards,
>>> Michał Michalski,
>>> michal.michal...@boxever.com
>>>
>>> On 24 April 2015 at 15:28, Michal Michalski <michal.michal...@boxever.com> wrote:
>>>
>>>> I did a quick test as I was curious about it too. I created a file with the numbers from 0 to 999, in order, line by line. Then I did:
>>>>
>>>> scala> val numbers = sc.textFile("./numbers.txt")
>>>> scala> val zipped = numbers.zipWithUniqueId
>>>> scala> zipped.foreach(i => println(i))
>>>>
>>>> The expected result, if the order were preserved, would be something like (0,0), (1,1), etc. Unfortunately, the output looks like this:
>>>>
>>>> (126,1)
>>>> (223,2)
>>>> (320,3)
>>>> (1,0)
>>>> (127,11)
>>>> (2,10)
>>>> (...)
>>>>
>>>> The workaround I found that works for my specific use case (relatively small input files) is explicitly setting the number of partitions to 1 when reading a single *text* file:
>>>>
>>>> scala> val numbers_sp = sc.textFile("./numbers.txt", 1)
>>>>
>>>> Then the output is exactly what I would expect.
>>>>
>>>> I didn't dive into the code too much, but I took a very quick look at it and figured out - correct me if I missed something, it's Friday afternoon! ;-) - that this workaround will work fine for all input formats inheriting from org.apache.hadoop.mapred.FileInputFormat, including TextInputFormat of course - see the implementation of the getSplits() method there (http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29). The numSplits variable passed there is exactly the same value as you provide as the second argument to textFile, which is minPartitions.
>>>> However, while *min* suggests that we can only define a minimum number of partitions with no control over the maximum, from what I can see in the code that value specifies the *exact* number of partitions in FileInputFormat.getSplits's implementation. Of course it can differ for other input formats, but in this case it should work just fine.
>>>>
>>>> Kind regards,
>>>> Michał Michalski,
>>>> michal.michal...@boxever.com
>>>>
>>>> On 24 April 2015 at 14:05, Spico Florin <spicoflo...@gmail.com> wrote:
>>>>
>>>>> Hello!
>>>>> I know that HadoopRDD partitions are built based on the number of splits in HDFS. I'm wondering if these partitions preserve the initial order of the data in the file.
>>>>> As an example, if I have an HDFS file (myTextFile) that has these splits:
>>>>>
>>>>> split 0 -> line 1, ..., line k
>>>>> split 1 -> line k+1, ..., line k+n
>>>>> split 2 -> line k+n+1, ..., line k+n+m
>>>>>
>>>>> and the code
>>>>>
>>>>> val lines = sc.textFile("hdfs://myTextFile")
>>>>> lines.zipWithIndex()
>>>>>
>>>>> will the order of the lines be preserved, i.e. (line 1, zipIndex 0), ..., (line k, zipIndex k-1), and so on?
>>>>>
>>>>> I found this question on Stack Overflow (http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd), whose answer intrigued me: "Essentially, RDD's zipWithIndex() method seems to do this, but it won't preserve the original ordering of the data the RDD was created from."
>>>>>
>>>>> Can you please confirm whether this is the correct answer?
>>>>>
>>>>> Thanks.
>>>>> Florin
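To summarize the difference discussed in this thread: with n partitions, zipWithUniqueId gives the k-th item of partition p the id k*n + p in a single pass, so ids interleave across partitions and can have gaps; zipWithIndex instead makes an extra pass to compute per-partition offsets and assigns consecutive indices in partition order. The ids Michal observed, e.g. (126,1) and (127,11), are consistent with this scheme for a file split into 10 partitions. A plain-Scala sketch of the zipWithUniqueId scheme (no Spark; `partitions` is a hypothetical stand-in for an RDD's partition contents):

```scala
object UniqueIdSketch {
  // zipWithUniqueId needs no extra pass over the data: with n partitions,
  // the k-th item of partition p gets id k*n + p, so ids interleave
  // across partitions and can have gaps when partition sizes differ.
  def zipWithUniqueId[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] = {
    val n = partitions.size
    partitions.zipWithIndex.flatMap { case (part, p) =>
      part.zipWithIndex.map { case (a, k) => (a, k.toLong * n + p) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Two unevenly sized "partitions": ids come out 0,2,4 and 1,3.
    val parts = Seq(Seq("a", "b", "c"), Seq("d", "e"))
    println(zipWithUniqueId(parts)) // List((a,0), (b,2), (c,4), (d,1), (e,3))
  }
}
```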