The problem I'm facing is that I need to process lines from an input file in
the order they're stored in the file, as they define the order of updates I
need to apply to some data, and these updates are not commutative, so the
order matters. Unfortunately the input is purely order-based: there's no
timestamp per line etc. in the file, and I'd prefer to avoid preparing the
file in advance by adding ordinals before / after each line. Of the
approaches you suggested, the first two won't work, as there's nothing I could
sort by. I'm not sure about the third one - I'm just not sure what you
meant there, to be honest :-)
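
For what it's worth, if the third suggestion means tagging each line with its
position (as zipWithIndex does) and sorting by that tag later, the idea can be
modelled in plain Scala without Spark. This is only a minimal sketch: the
update format ("set N" / "add N") and the names OrderedUpdates, applyUpdate,
tag and replay are made up for illustration. Whether zipWithIndex on a
multi-partition HadoopRDD assigns positions in file order is exactly the open
question in this thread, so the sketch assumes the tags are correct.

```scala
// Plain-Scala model, no Spark involved. All names and the update format
// ("set N" / "add N") are hypothetical, chosen only for illustration.
object OrderedUpdates {
  // Apply one update line to the current state. "set" overwrites, "add"
  // increments, so the two kinds of update do not commute.
  def applyUpdate(state: Int, line: String): Int = line.split(" ") match {
    case Array("set", n) => n.toInt
    case Array("add", n) => state + n.toInt
    case _               => state // ignore lines in an unknown format
  }

  // Tag each line with its position in the input, as zipWithIndex would.
  def tag(lines: Seq[String]): Seq[(String, Long)] =
    lines.zipWithIndex.map { case (line, i) => (line, i.toLong) }

  // Restore the original order from the tags, then apply updates in that order.
  def replay(tagged: Seq[(String, Long)], initial: Int): Int =
    tagged.sortBy(_._2).map(_._1).foldLeft(initial)(applyUpdate)
}
```

With the lines "set 10", "add 5", "set 1", "add 2", applying them in file order
gives 3, applying them in reverse order gives 10, and
OrderedUpdates.replay(OrderedUpdates.tag(lines).reverse, 0) gives 3 again,
because the tags provide something to sort by.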

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:48, Ganelin, Ilya <ilya.gane...@capitalone.com>
wrote:

> Michael - you need to sort your RDD. Check out the shuffle documentation
> in the Spark Programming Guide. It talks about this specifically. You can
> resolve this in a couple of ways - either by collecting your RDD and
> sorting it, using sortBy, or not worrying about the internal ordering. You
> can still extract elements in order by using a filter with the zip, e.g.
> RDD.filter(s => s._2 < 50).sortBy(_._1)
>
> -----Original Message-----
> *From: *Michal Michalski [michal.michal...@boxever.com]
> *Sent: *Friday, April 24, 2015 10:41 AM Eastern Standard Time
> *To: *Spico Florin
> *Cc: *user
> *Subject: *Re: Does HadoopRDD.zipWithIndex method preserve the order of
> the input data from Hadoop?
>
> Of course after you do it, you probably want to call
> repartition(somevalue) on your RDD to "get your parallelism back".
>
>  Kind regards,
> Michał Michalski,
> michal.michal...@boxever.com
>
> On 24 April 2015 at 15:28, Michal Michalski <michal.michal...@boxever.com>
> wrote:
>
>> I did a quick test as I was curious about it too. I created a file with
>> numbers from 0 to 999, in order, line by line. Then I did:
>>
>> scala> val numbers = sc.textFile("./numbers.txt")
>> scala> val zipped = numbers.zipWithUniqueId
>> scala> zipped.foreach(i => println(i))
>>
>> Expected result if the order was preserved would be something like: (0,
>> 0), (1, 1) etc.
>> Unfortunately, the output looks like this:
>>
>>  (126,1)
>> (223,2)
>> (320,3)
>> (1,0)
>> (127,11)
>> (2,10)
>>  (...)
>>
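
Two things about this test are worth keeping in mind, as far as I understand
the RDD API docs. First, zipWithUniqueId is not zipWithIndex: items in the k-th
partition get ids k, n+k, 2n+k, ... for n partitions, so the ids are not
consecutive positions. Second, foreach runs per partition on the executors, so
the order of the println output says little about the order of the data. Below
is a plain-Scala model of the two numbering schemes. It involves no Spark, the
names ZipModel, zipWithUniqueIdModel and zipWithIndexModel are hypothetical,
and it sketches the documented behaviour rather than Spark's implementation.

```scala
// Plain-Scala model of the two numbering schemes described in the RDD docs.
// `partitions` stands in for the partitions of an RDD (hypothetical data).
object ZipModel {
  // zipWithUniqueId: the item at position i of partition k gets id i * n + k,
  // where n is the number of partitions.
  def zipWithUniqueIdModel[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] = {
    val n = partitions.size
    for {
      (part, k) <- partitions.zipWithIndex
      (item, i) <- part.zipWithIndex
    } yield (item, i.toLong * n + k)
  }

  // zipWithIndex: indices follow partition order first, then the order of
  // items within each partition.
  def zipWithIndexModel[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] =
    partitions.flatten.zipWithIndex.map { case (item, idx) => (item, idx.toLong) }
}
```

For two partitions Seq("0", "1", "2") and Seq("3", "4", "5"), the unique-id
model yields ids 0, 2, 4 for the first partition and 1, 3, 5 for the second,
while the index model yields 0 through 5 in order.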
>> The workaround I found that works for me for my specific use case
>> (relatively small input files) is setting explicitly the number of
>> partitions to 1 when reading a single *text* file:
>>
>> scala> val numbers_sp = sc.textFile("./numbers.txt", 1)
>>
>> Then the output is exactly as I would expect.
>>
>> I didn't dive into the code too much, but I took a very quick look at it
>> and figured out - correct me if I missed something, it's Friday afternoon!
>> ;-)  - that this workaround will work fine for all the input formats
>> inheriting from org.apache.hadoop.mapred.FileInputFormat including
>> TextInputFormat, of course - see the implementation of getSplits() method
>> there (
>> http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29
>> ).
>> The numSplits variable passed there is exactly the same value as you
>> provide as the second argument to textFile, which is minPartitions. However,
>> while *min* suggests that we can only define a minimal number of
>> partitions and have no control over the max, from what I can see in
>> the code, that value specifies the *exact* number of partitions per the
>> FileInputFormat.getSplits implementation. Of course it can differ for other
>> input formats, but in this case it should work just fine.
>>
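
One caveat on the "exact" reading, as far as I can tell from the old mapred
FileInputFormat: numSplits only sets a goal size (total size / numSplits), and
the actual split size is max(minSize, min(goalSize, blockSize)). So asking for
1 split should give 1 partition only while the file is no larger than a block,
which matches the "relatively small input files" case above. Below is a rough
plain-Scala model for a single splittable, non-empty file. The names SplitModel
and countSplits are hypothetical, and the details (including the 1.1 slop
factor) are my reading of that code rather than a guaranteed contract.

```scala
// Rough model of how the old mapred FileInputFormat.getSplits sizes splits
// for ONE splittable, non-empty file. Names here are hypothetical.
object SplitModel {
  // A trailing chunk up to 10% larger than splitSize stays in a single split.
  private val SplitSlop = 1.1

  def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  def countSplits(fileSize: Long, numSplits: Int, blockSize: Long, minSize: Long = 1L): Int = {
    // numSplits is only a hint: it determines the goal size, not the count.
    val goalSize = fileSize / math.max(numSplits, 1)
    val splitSize = computeSplitSize(goalSize, minSize, blockSize)
    var remaining = fileSize
    var count = 0
    while (remaining.toDouble / splitSize > SplitSlop) {
      remaining -= splitSize
      count += 1
    }
    if (remaining > 0) count += 1 // whatever is left forms the last split
    count
  }
}
```

Under this model a 4000-byte file with numSplits = 1 gives 1 split, but a
300 MB file with a 128 MB block size and numSplits = 1 gives 3 splits.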
>>
>>  Kind regards,
>> Michał Michalski,
>> michal.michal...@boxever.com
>>
>> On 24 April 2015 at 14:05, Spico Florin <spicoflo...@gmail.com> wrote:
>>
>>> Hello!
>>>   I know that HadoopRDD partitions are built based on the number of
>>> splits in HDFS. I'm wondering if these partitions preserve the initial
>>> order of data in file.
>>> As an example, if I have an HDFS (myTextFile) file that has these splits:
>>>
>>> split 0 -> line 1, ..., line k
>>> split 1 -> line k+1, ..., line k+n
>>> split 2 -> line k+n+1, ..., line k+n+m
>>>
>>> and the code
>>> val lines=sc.textFile("hdfs://mytextFile")
>>> lines.zipWithIndex()
>>>
>>> will the order of lines be preserved?
>>> (line 1, zipIndex 1), ..., (line k, zipIndex k), and so on.
>>>
>>> I found this question on stackoverflow (
>>> http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd)
>>> whose answer intrigued me:
>>> "Essentially, RDD's zipWithIndex() method seems to do this, but it won't
>>> preserve the original ordering of the data the RDD was created from"
>>>
>>> Can you please confirm whether this is the correct answer?
>>>
>>> Thanks.
>>>  Florin
>>>
>>
>
