Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc, with mapPartitions, as well as new ones that might do a sliding window over each partition for example, or accumulate data across elements (e.g. to compute a sum).
For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work: >>> data.mapPartitions(lambda x: x).collect() [1, 2, 3, 4] # Just return the same iterator, doing nothing >>> data.mapPartitions(lambda x: [list(x)]).collect() [[1, 2], [3, 4]] # Group together the elements of each partition in a single list (like glom) >>> data.mapPartitions(lambda x: [sum(x)]).collect() [3, 7] # Sum each partition separately However something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above. In practice mapPartitions is most useful if you want to share some data or work across the elements. For example maybe you want to load a lookup table once from an external file and then check each element in it, or sum up a bunch of elements without allocating a lot of vector objects. Matei On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote: > "There’s also mapPartitions, which gives you an iterator for each partition > instead of an array. You can then return an iterator or list of objects to > produce from that." > > I confess, I was hoping for an example of just that, because i've not yet > been able to figure out how to use mapPartitions. No doubt this is because > i'm a rank newcomer to Python, and haven't fully wrapped my head around > iterators. All I get so far in my attempts to use mapPartitions is the > darned "suchnsuch is not an iterator" error. > > def myfunction(iterator): return [1,2,3] > mydata.mapPartitions(lambda x: myfunction(x)).take(2) > > > > > > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com> > wrote: > Here’s an example of getting together all lines in a file as one string: > > $ cat dir/a.txt > Hello > world! > > $ cat dir/b.txt > What's > up?? > > $ bin/pyspark > >>> files = sc.textFile(“dir”) > > >>> files.collect() > [u'Hello', u'world!', u"What's", u'up??’] # one element per line, not what > we want > > >>> files.glom().collect() > [[u'Hello', u'world!'], [u"What's", u'up??’]] # one element per file, which > is an array of lines > > >>> files.glom().map(lambda a: "\n".join(a)).collect() > [u'Hello\nworld!', u"What's\nup??”] # join back each file into a single > string > > The glom() method groups all the elements of each partition of an RDD into an > array, giving you an RDD of arrays of objects. If your input is small files, > you always have one partition per file. > > There’s also mapPartitions, which gives you an iterator for each partition > instead of an array. You can then return an iterator or list of objects to > produce from that. > > Matei > > > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> wrote: > > > Thanks Matei. That makes sense. I have here a dataset of many many > > smallish XML files, so using mapPartitions that way would make sense. I'd > > love to see a code example though ...It's not as obvious to me how to do > > that as I probably should be. > > > > Thanks, > > Diana > > > > > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaha...@gmail.com> > > wrote: > > Hi Diana, > > > > Non-text input formats are only supported in Java and Scala right now, > > where you can use sparkContext.hadoopFile or .hadoopDataset to load data > > with any InputFormat that Hadoop MapReduce supports. In Python, you > > unfortunately only have textFile, which gives you one record per line. For > > JSON, you’d have to fit the whole JSON object on one line as you said. > > Hopefully we’ll also have some other forms of input soon. > > > > If your input is a collection of separate files (say many .xml files), you > > can also use mapPartitions on it to group together the lines because each > > input file will end up being a single dataset partition (or map task). This > > will let you concatenate the lines in each file and parse them as one XML > > object. > > > > Matei > > > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> wrote: > > > >> Thanks, Krakna, very helpful. The way I read the code, it looks like you > >> are assuming that each line in foo.log contains a complete json object? > >> (That is, that the data doesn't contain any records that are split into > >> multiple lines.) If so, is that because you know that to be true of your > >> data? Or did you do as Nicholas suggests and have some preprocessing on > >> the text input to flatten the data in that way? > >> > >> Thanks, > >> Diana > >> > >> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> wrote: > >> Katrina, > >> > >> Not sure if this is what you had in mind, but here's some simple pyspark > >> code that I recently wrote to deal with JSON files. > >> > >> from pyspark import SparkContext, SparkConf > >> > >> > >> > >> from operator import add > >> import json > >> > >> > >> > >> import random > >> import numpy as np > >> > >> > >> > >> > >> def concatenate_paragraphs(sentence_array): > >> > >> > >> > >> return ' '.join(sentence_array).split(' ') > >> > >> > >> > >> > >> logFile = 'foo.json' > >> conf = SparkConf() > >> > >> > >> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", > >> "1g") > >> > >> > >> > >> > >> > >> > >> > >> sc = SparkContext(conf=conf) > >> > >> > >> > >> logData = sc.textFile(logFile).cache() > >> > >> > >> > >> num_lines = logData.count() > >> print 'Number of lines: %d' % num_lines > >> > >> > >> > >> > >> > >> > >> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, > >> sentence2, ...]}} > >> tm = logData.map(lambda s: (json.loads(s)['key'], > >> len(concatenate_paragraphs(json.loads(s)['paragraphs'])))) > >> > >> > >> > >> > >> > >> > >> > >> tm = tm.reduceByKey(lambda _, x: _ + x) > >> > >> > >> > >> > >> > >> > >> > >> op = tm.collect() > >> for key, num_words in op: > >> > >> > >> > >> print 'state: %s, num_words: %d' % (state, num_words) > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User > >> List] <[hidden email]> wrote: > >> I don't actually have any data. I'm writing a course that teaches > >> students how to do this sort of thing and am interested in looking at a > >> variety of real life examples of people doing things like that. I'd love > >> to see some working code implementing the "obvious work-around" you > >> mention...do you have any to share? It's an approach that makes a lot of > >> sense, and as I said, I'd love to not have to re-invent the wheel if > >> someone else has already written that code. Thanks! > >> > >> Diana > >> > >> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote: > >> There was a previous discussion about this here: > >> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html > >> > >> How big are the XML or JSON files you're looking to deal with? > >> > >> It may not be practical to deserialize the entire document at once. In > >> that case an obvious work-around would be to have some kind of > >> pre-processing step that separates XML nodes/JSON objects with newlines so > >> that you can analyze the data with Spark in a "line-oriented format". Your > >> preprocessor wouldn't have to parse/deserialize the massive document; it > >> would just have to track open/closed tags/braces to know when to insert a > >> newline. > >> > >> Then you'd just open the line-delimited result and deserialize the > >> individual objects/nodes with map(). > >> > >> Nick > >> > >> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote: > >> Has anyone got a working example of a Spark application that analyzes data > >> in a non-line-oriented format, such as XML or JSON? I'd like to do this > >> without re-inventing the wheel...anyone care to share? Thanks! > >> > >> Diana > >> > >> > >> > >> > >> If you reply to this email, your message will be added to the discussion > >> below: > >> http://apache-spark-user-list.1001560.n3.nabble.com/example-of-non-line-oriented-input-data-tp2750p2752.html > >> To start a new topic under Apache Spark User List, email [hidden email] > >> To unsubscribe from Apache Spark User List, click here. > >> NAML > >> > >> > >> View this message in context: Re: example of non-line oriented input data? > >> Sent from the Apache Spark User List mailing list archive at Nabble.com. > >> > > > > > >