Well, if anyone is still following this, I've gotten the following code working, which in theory should allow me to parse whole XML files. (The problem was that I can't return the tree iterator directly; I have to call iter() on it first. Why?)
import xml.etree.ElementTree as ET

# two source files, format: <data> <country name="...">...</country> ... </data>
mydata = sc.textFile("file:/home/training/countries*.xml")

def parsefile(iterator):
    s = ''
    for i in iterator:
        s = s + str(i)
    tree = ET.fromstring(s)
    treeiterator = tree.getiterator("country")
    # why do I have to convert an iterator to an iterator? not sure, but required
    return iter(treeiterator)

mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()

The output is what I expect:

[{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]

BUT I'm a bit concerned about the construction of the string "s". How big can my file be before building it up as a single string becomes problematic?

On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarr...@cloudera.com> wrote:

> Thanks, Matei.
>
> In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as whole files because records are not required to be on a single line.
>
> The theory makes sense, but I'm still utterly lost as to how to implement it. Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the logistic regression example, and I can't run that because it requires Python 2.7 and I'm on Python 2.6. (Aside: I couldn't find any statement that Python 2.6 is unsupported... is it?)
>
> I'd really, really love to see a real-life example of a Python use of mapPartitions. I do appreciate the very simple examples you provided, but (perhaps because of my novice status in Python) I can't figure out how to translate those to a real-world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].
>
> Also, you say that the function called in mapPartitions can return a collection OR an iterator. I tried returning an iterator by calling ElementTree's getiterator function, but still got the error telling me my object was not an iterator.
>
> If anyone has a real-life example of mapPartitions returning a Python iterator, that would be fabulous.
>
> Diana
>
> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>
>> Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc., with mapPartitions, as well as new ones that might do a sliding window over each partition for example, or accumulate data across elements (e.g. to compute a sum).
>>
>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:
>>
>> >>> data.mapPartitions(lambda x: x).collect()
>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>
>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)
>>
>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>> [3, 7]   # Sum each partition separately
>>
>> However, something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work, because sum returns a number, not an iterator. That's why I put sum(x) inside a list above.
>>
>> In practice mapPartitions is most useful if you want to share some data or work across the elements. For example, maybe you want to load a lookup table once from an external file and then check each element against it, or sum up a bunch of elements without allocating a lot of vector objects.
>>
>> Matei
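[Interjecting in my own reply here: the "load a lookup table once per partition" pattern Matei describes just above is the part I hadn't seen spelled out anywhere, so here is my rough, untested guess at what it would look like in PySpark. The lookup file path and the RDD name are made up for illustration.]

def tag_known_countries(iterator):
    # hypothetical lookup file, one country name per line; loaded once per
    # partition rather than once per element
    lookup = set(line.strip() for line in open('/home/training/known_countries.txt'))
    for name in iterator:
        yield (name, name in lookup)

# some_rdd is assumed to be an RDD of country-name strings
some_rdd.mapPartitions(tag_known_countries).collect()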
>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>
>> > "There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that."
>> >
>> > I confess, I was hoping for an example of just that, because I've not yet been able to figure out how to use mapPartitions. No doubt this is because I'm a rank newcomer to Python and haven't fully wrapped my head around iterators. All I get so far in my attempts to use mapPartitions is the darned "such-and-such is not an iterator" error.
>> >
>> > def myfunction(iterator): return [1,2,3]
>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>> >
>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>> >
>> > Here's an example of getting together all lines in a file as one string:
>> >
>> > $ cat dir/a.txt
>> > Hello
>> > world!
>> >
>> > $ cat dir/b.txt
>> > What's
>> > up??
>> >
>> > $ bin/pyspark
>> > >>> files = sc.textFile("dir")
>> >
>> > >>> files.collect()
>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
>> >
>> > >>> files.glom().collect()
>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
>> >
>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>> > [u'Hello\nworld!', u"What's\nup??"]   # join back each file into a single string
>> >
>> > The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
>> >
>> > There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
>> >
>> > Matei
>> >
>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> >
>> > > Thanks Matei. That makes sense. I have here a dataset of many, many smallish XML files, so using mapPartitions that way would make sense. I'd love to see a code example, though... it's not as obvious to me how to do that as it probably should be.
>> > >
>> > > Thanks,
>> > > Diana
>> > >
>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>> > >
>> > > Hi Diana,
>> > >
>> > > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you'd have to fit the whole JSON object on one line as you said. Hopefully we'll also have some other forms of input soon.
>> > >
>> > > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines, because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
>> > >
>> > > Matei
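[Note added while re-reading this: the suggestion just above (concatenate the lines in each file and parse them as one XML object) is what my code at the top of this message does. One variation I'm considering, since a generator is itself an iterator, is writing parsefile as a generator so there is nothing to wrap in iter(). Untested sketch, assuming the same countries*.xml files as above; the whole file still ends up in one string, so per-file size is still the limit.]

import xml.etree.ElementTree as ET

def parsefile(iterator):
    # the lines from textFile are already strings; put the line breaks back
    # and parse the whole partition (i.e. the whole file) as one document
    s = '\n'.join(iterator)
    tree = ET.fromstring(s)
    for country in tree.getiterator("country"):
        yield country   # a generator is already an iterator, so no iter() needed

mydata.mapPartitions(parsefile).map(lambda element: element.attrib).collect()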
>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> > >
>> > >> Thanks, Krakna, very helpful. The way I read the code, it looks like you are assuming that each line in foo.log contains a complete JSON object? (That is, that the data doesn't contain any records that are split into multiple lines.) If so, is that because you know that to be true of your data? Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
>> > >>
>> > >> Thanks,
>> > >> Diana
>> > >>
>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> wrote:
>> > >>
>> > >> Katrina,
>> > >>
>> > >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
>> > >>
>> > >> from pyspark import SparkContext, SparkConf
>> > >> from operator import add
>> > >> import json
>> > >> import random
>> > >> import numpy as np
>> > >>
>> > >> def concatenate_paragraphs(sentence_array):
>> > >>     return ' '.join(sentence_array).split(' ')
>> > >>
>> > >> logFile = 'foo.json'
>> > >> conf = SparkConf()
>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> > >> sc = SparkContext(conf=conf)
>> > >>
>> > >> logData = sc.textFile(logFile).cache()
>> > >> num_lines = logData.count()
>> > >> print 'Number of lines: %d' % num_lines
>> > >>
>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>> > >> op = tm.collect()
>> > >> for key, num_words in op:
>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
>> > >>
>> > >> I don't actually have any data. I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real-life examples of people doing things like that. I'd love to see some working code implementing the "obvious work-around" you mention... do you have any to share? It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code. Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>> > >>
>> > >> There was a previous discussion about this here:
>> > >>
>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>> > >>
>> > >> How big are the XML or JSON files you're looking to deal with?
>> > >>
>> > >> It may not be practical to deserialize the entire document at once.
>> > >> In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
>> > >>
>> > >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
>> > >>
>> > >> Nick
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>> > >>
>> > >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON? I'd like to do this without re-inventing the wheel... anyone care to share? Thanks!
>> > >>
>> > >> Diana
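P.S. I never did find sample code for the newline-inserting preprocessor Nick describes above, so here is the sort of thing I imagine he means (a rough, untested sketch only). It tracks brace depth and writes a newline after each top-level JSON object, streaming through the file in chunks so the whole document never has to be deserialized. It ignores braces inside strings, and the file names are made up.

def add_newlines(in_path, out_path):
    depth = 0
    with open(in_path) as src:
        with open(out_path, 'w') as dst:
            while True:
                chunk = src.read(64 * 1024)
                if not chunk:
                    break
                out = []
                for ch in chunk:
                    out.append(ch)
                    if ch == '{':
                        depth += 1
                    elif ch == '}':
                        depth -= 1
                        if depth == 0:
                            out.append('\n')   # end of a top-level object
                dst.write(''.join(out))

add_newlines('big.json', 'big_by_line.json')

The XML version would be the same idea, but counting open/close tags instead of braces.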