Oh, I see, the problem is that the function you pass to mapPartitions must 
itself return an iterator or a collection. This is used so that you can return 
multiple output records for each input record. You can implement most of the 
existing map-like operations in Spark, such as map, filter, flatMap, etc, with 
mapPartitions, as well as new ones that might do a sliding window over each 
partition for example, or accumulate data across elements (e.g. to compute a 
sum).
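
To make that concrete, here is a rough sketch (an illustration, not code from Spark) of map, filter, and a sliding-window sum, each written as a function from an iterator to an iterator, which is the shape mapPartitions expects. The names map_part, filter_part, and window_sums are made up for this example, and the functions are run on plain Python iterators so nothing here needs a cluster.

```python
from collections import deque


def map_part(f):
    """Build a partition function that behaves like map(f)."""
    def run(iterator):
        for x in iterator:
            yield f(x)
    return run


def filter_part(pred):
    """Build a partition function that behaves like filter(pred)."""
    def run(iterator):
        for x in iterator:
            if pred(x):
                yield x
    return run


def window_sums(size):
    """Build a partition function that yields the sum of every run of
    `size` consecutive elements within one partition."""
    def run(iterator):
        window = deque(maxlen=size)  # keeps only the last `size` elements
        for x in iterator:
            window.append(x)
            if len(window) == size:
                yield sum(window)
    return run


# Each partition function takes an iterator and returns a generator.
doubled = list(map_part(lambda x: x * 2)(iter([1, 2])))
evens = list(filter_part(lambda x: x % 2 == 0)(iter([1, 2, 3, 4])))
pairs = list(window_sums(2)(iter([1, 2, 3, 4])))
```

With an RDD you would pass one of these directly, e.g. data.mapPartitions(window_sums(2)). Note that the window is computed inside each partition, so it never spans a partition boundary.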

For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:

>>> data.mapPartitions(lambda x: x).collect()
[1, 2, 3, 4]   # Just return the same iterator, doing nothing

>>> data.mapPartitions(lambda x: [list(x)]).collect()
[[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)

>>> data.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7]   # Sum each partition separately

However something like data.mapPartitions(lambda x: sum(x)).collect() will 
*not* work because sum returns a number, not an iterator. That’s why I put 
sum(x) inside a list above.
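
Another way to satisfy the iterator requirement, sketched here as an illustration rather than something from the thread, is to write the function as a generator so that yield does the wrapping. The name sum_partition is hypothetical, and it is called on a plain iterator so the snippet runs without Spark.

```python
def sum_partition(iterator):
    """Yield one value, the sum of the partition, instead of returning it."""
    yield sum(iterator)


# Calling a generator function returns an iterator, which is what
# mapPartitions requires of the function's result.
result = sum_partition(iter([1, 2]))
is_iterator = hasattr(result, "__next__") or hasattr(result, "next")
total = list(result)
```

On the RDD above this would be passed as data.mapPartitions(sum_partition), and it should give the same per-partition sums as the list version.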

In practice mapPartitions is most useful if you want to share some data or work 
across the elements. For example maybe you want to load a lookup table once 
from an external file and then check each element in it, or sum up a bunch of 
elements without allocating a lot of vector objects.
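
A minimal sketch of the lookup-table case, under stated assumptions: load_lookup_table is a hypothetical stand-in for reading the external file (here it just returns a small dict and records that it was called), and tag_known is an invented name. It runs on a plain iterator so it can be tried without Spark.

```python
load_calls = []


def load_lookup_table():
    """Hypothetical stand-in for reading a lookup table from a file."""
    load_calls.append(1)  # record each load so the cost is visible
    return {"a": 1, "b": 2}


def tag_known(iterator):
    """Load the table once, then check every element of the partition."""
    table = load_lookup_table()
    for x in iterator:
        yield (x, x in table)


tagged = list(tag_known(iter(["a", "z", "b"])))
```

Passed to mapPartitions, tag_known loads the table once per partition. The same logic written inside a map function would call load_lookup_table once per element.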

Matei


On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:

> "There’s also mapPartitions, which gives you an iterator for each partition 
> instead of an array. You can then return an iterator or list of objects to 
> produce from that."
> 
> I confess, I was hoping for an example of just that, because I've not yet 
> been able to figure out how to use mapPartitions.  No doubt this is because 
> I'm a rank newcomer to Python, and haven't fully wrapped my head around 
> iterators.  All I get so far in my attempts to use mapPartitions is the 
> darned "suchnsuch is not an iterator" error.
> 
> def myfunction(iterator): return [1,2,3]
> mydata.mapPartitions(lambda x: myfunction(x)).take(2)
> 
> 
> 
> 
> 
> On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com> 
> wrote:
> Here’s an example of getting together all lines in a file as one string:
> 
> $ cat dir/a.txt
> Hello
> world!
> 
> $ cat dir/b.txt
> What's
> up??
> 
> $ bin/pyspark
> >>> files = sc.textFile("dir")
> 
> >>> files.collect()
> [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
> 
> >>> files.glom().collect()
> [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
> 
> >>> files.glom().map(lambda a: "\n".join(a)).collect()
> [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
> 
> The glom() method groups all the elements of each partition of an RDD into an 
> array, giving you an RDD of arrays of objects. If your input is small files, 
> you always have one partition per file.
> 
> There’s also mapPartitions, which gives you an iterator for each partition 
> instead of an array. You can then return an iterator or list of objects to 
> produce from that.
> 
> Matei
> 
> 
> On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
> 
> > Thanks Matei.  That makes sense.  I have here a dataset of many many 
> > smallish XML files, so using mapPartitions that way would make sense.  I'd 
> > love to see a code example though ... It's not as obvious to me how to do 
> > that as it probably should be.
> >
> > Thanks,
> > Diana
> >
> >
> > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaha...@gmail.com> 
> > wrote:
> > Hi Diana,
> >
> > Non-text input formats are only supported in Java and Scala right now, 
> > where you can use sparkContext.hadoopFile or .hadoopDataset to load data 
> > with any InputFormat that Hadoop MapReduce supports. In Python, you 
> > unfortunately only have textFile, which gives you one record per line. For 
> > JSON, you’d have to fit the whole JSON object on one line as you said. 
> > Hopefully we’ll also have some other forms of input soon.
> >
> > If your input is a collection of separate files (say many .xml files), you 
> > can also use mapPartitions on it to group together the lines because each 
> > input file will end up being a single dataset partition (or map task). This 
> > will let you concatenate the lines in each file and parse them as one XML 
> > object.
> >
> > Matei
> >
> > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
> >
> >> Thanks, Krakna, very helpful.  The way I read the code, it looks like you 
> >> are assuming that each line in foo.log contains a complete json object?  
> >> (That is, that the data doesn't contain any records that are split into 
> >> multiple lines.)  If so, is that because you know that to be true of your 
> >> data?  Or did you do as Nicholas suggests and have some preprocessing on 
> >> the text input to flatten the data in that way?
> >>
> >> Thanks,
> >> Diana
> >>
> >>
> >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> wrote:
> >> Katrina,
> >>
> >> Not sure if this is what you had in mind, but here's some simple pyspark 
> >> code that I recently wrote to deal with JSON files.
> >>
> >> from pyspark import SparkContext, SparkConf
> >> from operator import add
> >> import json
> >> import random
> >> import numpy as np
> >>
> >> def concatenate_paragraphs(sentence_array):
> >>     return ' '.join(sentence_array).split(' ')
> >>
> >> logFile = 'foo.json'
> >> conf = SparkConf()
> >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
> >> sc = SparkContext(conf=conf)
> >> logData = sc.textFile(logFile).cache()
> >> num_lines = logData.count()
> >> print 'Number of lines: %d' % num_lines
> >>
> >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
> >> tm = logData.map(lambda s: (json.loads(s)['key'],
> >>     len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
> >> tm = tm.reduceByKey(lambda _, x: _ + x)
> >>
> >> op = tm.collect()
> >> for key, num_words in op:
> >>     print 'state: %s, num_words: %d' % (key, num_words)
> >>
> >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User 
> >> List] <[hidden email]> wrote:
> >> I don't actually have any data.  I'm writing a course that teaches 
> >> students how to do this sort of thing and am interested in looking at a 
> >> variety of real life examples of people doing things like that.  I'd love 
> >> to see some working code implementing the "obvious work-around" you 
> >> mention...do you have any to share?  It's an approach that makes a lot of 
> >> sense, and as I said, I'd love to not have to re-invent the wheel if 
> >> someone else has already written that code.  Thanks!
> >>
> >> Diana
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
> >> There was a previous discussion about this here:
> >>
> >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
> >>
> >> How big are the XML or JSON files you're looking to deal with?
> >>
> >> It may not be practical to deserialize the entire document at once. In 
> >> that case an obvious work-around would be to have some kind of 
> >> pre-processing step that separates XML nodes/JSON objects with newlines so 
> >> that you can analyze the data with Spark in a "line-oriented format". Your 
> >> preprocessor wouldn't have to parse/deserialize the massive document; it 
> >> would just have to track open/closed tags/braces to know when to insert a 
> >> newline.
> >>
> >> Then you'd just open the line-delimited result and deserialize the 
> >> individual objects/nodes with map().
> >>
> >> Nick
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
> >> Has anyone got a working example of a Spark application that analyzes data 
> >> in a non-line-oriented format, such as XML or JSON?  I'd like to do this 
> >> without re-inventing the wheel...anyone care to share?  Thanks!
> >>
> >> Diana
> >>
> >>
> >>
> >>
> >>
> >>
> >> View this message in context: Re: example of non-line oriented input data?
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >
> >
> 
> 
