FYI, one thing we've added now is support for reading multiple text files from a directory as separate records: https://github.com/apache/spark/pull/327. This should remove the need for the mapPartitions workaround discussed here.

Avro and SequenceFiles look like they may not make it for 1.0, but there's a chance that Parquet support with Spark SQL will, which should let you store binary data a bit better.

Matei
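For anyone following along, usage would look roughly like this once it lands. This is a sketch only: it assumes the method is exposed in PySpark as sc.wholeTextFiles(), returning (filename, content) pairs as in the PR, and it reuses the countries XML example from later in this thread:

import xml.etree.ElementTree as ET

# Sketch -- assumes the PR above is merged and exposed in PySpark as
# sc.wholeTextFiles(), yielding one (filename, content) pair per file.
files = sc.wholeTextFiles("file:/home/training/countries")
countries = files.flatMap(lambda (path, xml): ET.fromstring(xml).getiterator("country"))
print countries.map(lambda e: e.attrib).collect()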
On Mar 19, 2014, at 3:12 PM, Jeremy Freeman <freeman.jer...@gmail.com> wrote:

> Another vote on this: support for simple SequenceFiles and/or Avro would be terrific, as plain text can be very space-inefficient, especially for numerical data.
>
> -- Jeremy
>
> On Mar 19, 2014, at 5:24 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>
>> I'd second the request for Avro support in Python first, followed by Parquet.
>>
>> On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin <itparan...@gmail.com> wrote:
>>
>> On 19 Mar 2014, at 19:54, Diana Carroll <dcarr...@cloudera.com> wrote:
>>
>>> Actually, thinking more on this question, Matei: I'd definitely say support for Avro. There's a lot of interest in this!
>>
>> Agreed, and Parquet as the default Cloudera Impala format.
>>
>>> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>> BTW, one other thing: in your experience, Diana, which non-text InputFormats would be most useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or something else? I think a per-file text input format that does the stuff we did here would also be good.
>>>
>>> Matei
>>>
>>> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>
>>>> Hi Diana,
>>>>
>>>> This seems to work without the iter() in front if you just return treeiterator. What happened when you didn't include that? treeiterator should already be an iterator.
>>>>
>>>> Anyway, this is a good example of mapPartitions: you want to view the whole file as one object (one XML document here), so you couldn't implement this using flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because, unfortunately, that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We'd like to relax this later, but we're using some newer features of NumPy and Python. The rest of PySpark works on 2.6.
>>>>
>>>> In terms of the size in memory, here both the string s and the XML tree constructed from it need to fit, so you can't work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
>>>>
>>>> Matei
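To make that streaming suggestion concrete, here is a rough, untested sketch using xml.etree.ElementTree.iterparse rather than xml.sax. The LineStream class is hypothetical glue (not a Spark or stdlib class): it feeds the parser one line at a time, so neither the full string s nor the full tree from Diana's code below is ever materialized. It assumes each partition holds exactly one well-formed XML document:

import xml.etree.ElementTree as ET

class LineStream(object):
    # File-like adapter: iterparse() calls read(), and we hand it one
    # line at a time from the partition's iterator. textFile() strips
    # newlines, so we add one back to keep tokens separated.
    def __init__(self, lines):
        self.lines = iter(lines)
    def read(self, size=-1):
        try:
            return next(self.lines).encode('utf-8') + '\n'
        except StopIteration:
            return ''  # empty string signals EOF to the parser

def stream_countries(lines):
    # Yield each <country>'s attributes as soon as its end tag is seen,
    # then clear the element so the in-memory tree stays small.
    for event, elem in ET.iterparse(LineStream(lines)):
        if elem.tag == 'country':
            yield dict(elem.attrib)  # copy before clear() empties it
            elem.clear()

mydata = sc.textFile("file:/home/training/countries*.xml")
print mydata.mapPartitions(stream_countries).collect()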
>>>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>
>>>>> Well, if anyone is still following this, I've gotten the following code working, which in theory should allow me to parse whole XML files. (The problem was that I can't return the tree iterator directly; I have to call iter() on it. Why?)
>>>>>
>>>>> import xml.etree.ElementTree as ET
>>>>>
>>>>> # two source files, format: <data><country name="...">...</country>...</data>
>>>>> mydata = sc.textFile("file:/home/training/countries*.xml")
>>>>>
>>>>> def parsefile(iterator):
>>>>>     s = ''
>>>>>     for i in iterator:
>>>>>         s = s + str(i)
>>>>>     tree = ET.fromstring(s)
>>>>>     treeiterator = tree.getiterator("country")
>>>>>     # why do I have to convert an iterator to an iterator? not sure, but required
>>>>>     return iter(treeiterator)
>>>>>
>>>>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>>>>>
>>>>> The output is what I expect:
>>>>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>>>>
>>>>> BUT I'm a bit concerned about the construction of the string s. How big can my file be before building that string becomes problematic?
>>>>>
>>>>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>> Thanks, Matei.
>>>>>
>>>>> In the context of this discussion, mapPartitions seems essential: it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files that need to be parsed whole because records are not required to be on a single line.
>>>>>
>>>>> The theory makes sense, but I'm still utterly lost as to how to implement it. Unfortunately there's only a single use of mapPartitions in any of the Python example programs -- the logistic regression example -- which I can't run because it requires Python 2.7 and I'm on Python 2.6. (Aside: I couldn't find any statement that Python 2.6 is unsupported... is it?)
>>>>>
>>>>> I'd really, really love to see a real-life example of a Python use of mapPartitions. I do appreciate the very simple examples you provided, but (perhaps because of my novice status with Python) I can't figure out how to translate those to a real-world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].
>>>>>
>>>>> Also, you say that the function called in mapPartitions can return a collection OR an iterator. I tried returning an iterator by calling the ElementTree getiterator function, but still got the error telling me my object was not an iterator.
>>>>>
>>>>> If anyone has a real-life example of mapPartitions returning a Python iterator, that would be fabulous.
>>>>>
>>>>> Diana
>>>>>
>>>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>> Oh, I see: the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark (map, filter, flatMap, etc.) with mapPartitions, as well as new ones that might do a sliding window over each partition, for example, or accumulate data across elements (e.g. to compute a sum).
>>>>>
>>>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:
>>>>>
>>>>> >>> data.mapPartitions(lambda x: x).collect()
>>>>> [1, 2, 3, 4]   # just return the same iterator, doing nothing
>>>>>
>>>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>>>> [[1, 2], [3, 4]]   # group the elements of each partition into a single list (like glom)
>>>>>
>>>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>>>> [3, 7]   # sum each partition separately
>>>>>
>>>>> However, something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work, because sum returns a number, not an iterator. That's why I put sum(x) inside a list above.
>>>>>
>>>>> In practice, mapPartitions is most useful when you want to share some data or work across the elements. For example, maybe you want to load a lookup table once from an external file and then check each element against it, or sum up a bunch of elements without allocating a lot of vector objects.
>>>>>
>>>>> Matei
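To flesh out that last point, here is a sketch of the lookup-table pattern. The file name ip_owners.tsv and the data layout are made up for illustration; the point is that the table is loaded once per partition rather than once per element, and that a generator is a convenient way to return an iterator:

def tag_ips(partition):
    # Load the lookup table once per partition. (Assumes the file is
    # present at this path on every worker node.)
    owners = {}
    for line in open('/home/training/ip_owners.tsv'):
        ip, owner = line.rstrip('\n').split('\t')
        owners[ip] = owner
    # Then stream through the partition's elements.
    for ip in partition:
        yield (ip, owners.get(ip, 'unknown'))

ips = sc.textFile("file:/home/training/weblogs/ips.txt")
print ips.mapPartitions(tag_ips).take(5)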
>>>>>
>>>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>>
>>>>> > "There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that."
>>>>> >
>>>>> > I confess, I was hoping for an example of just that, because I've not yet been able to figure out how to use mapPartitions. No doubt this is because I'm a rank newcomer to Python and haven't fully wrapped my head around iterators. All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error:
>>>>> >
>>>>> > def myfunction(iterator): return [1,2,3]
>>>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>>>> >
>>>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>> > Here's an example of getting together all lines in a file as one string:
>>>>> >
>>>>> > $ cat dir/a.txt
>>>>> > Hello
>>>>> > world!
>>>>> >
>>>>> > $ cat dir/b.txt
>>>>> > What's
>>>>> > up??
>>>>> >
>>>>> > $ bin/pyspark
>>>>> > >>> files = sc.textFile("dir")
>>>>> >
>>>>> > >>> files.collect()
>>>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line -- not what we want
>>>>> >
>>>>> > >>> files.glom().collect()
>>>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
>>>>> >
>>>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>>>> > [u'Hello\nworld!', u"What's\nup??"]   # join each file back into a single string
>>>>> >
>>>>> > The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
>>>>> >
>>>>> > There's also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
>>>>> >
>>>>> > Matei
>>>>> >
>>>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>> >
>>>>> > > Thanks, Matei. That makes sense. I have a dataset of many, many smallish XML files, so using mapPartitions that way would make sense. I'd love to see a code example, though... it's not as obvious to me how to do that as it probably should be.
>>>>> > >
>>>>> > > Thanks,
>>>>> > > Diana
>>>>> > >
>>>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>> > > Hi Diana,
>>>>> > >
>>>>> > > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you'd have to fit the whole JSON object on one line as you said. Hopefully we'll also have some other forms of input soon.
>>>>> > >
>>>>> > > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines, because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
>>>>> > >
>>>>> > > Matei
>>>>> > >
>>>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>> > >
>>>>> > >> Thanks, Krakna, very helpful. The way I read the code, it looks like you are assuming that each line in foo.log contains a complete JSON object (that is, that the data doesn't contain any records split across multiple lines). If so, is that because you know that to be true of your data? Or did you do as Nicholas suggests and run some preprocessing on the text input to flatten the data that way?
>>>>> > >>
>>>>> > >> Thanks,
>>>>> > >> Diana
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> wrote:
>>>>> > >> Katrina,
>>>>> > >>
>>>>> > >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
>>>>> > >>
>>>>> > >> from pyspark import SparkContext, SparkConf
>>>>> > >>
>>>>> > >> from operator import add
>>>>> > >> import json
>>>>> > >> import random
>>>>> > >> import numpy as np
>>>>> > >>
>>>>> > >> def concatenate_paragraphs(sentence_array):
>>>>> > >>     return ' '.join(sentence_array).split(' ')
>>>>> > >>
>>>>> > >> logFile = 'foo.json'
>>>>> > >> conf = SparkConf()
>>>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>>>> > >> sc = SparkContext(conf=conf)
>>>>> > >>
>>>>> > >> logData = sc.textFile(logFile).cache()
>>>>> > >> num_lines = logData.count()
>>>>> > >> print 'Number of lines: %d' % num_lines
>>>>> > >>
>>>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>>>> > >> tm = tm.reduceByKey(lambda a, b: a + b)
>>>>> > >> op = tm.collect()
>>>>> > >> for key, num_words in op:
>>>>> > >>     print 'key: %s, num_words: %d' % (key, num_words)
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
>>>>> > >> I don't actually have any data. I'm writing a course that teaches students how to do this sort of thing, and I'm interested in looking at a variety of real-life examples of people doing things like that. I'd love to see some working code implementing the "obvious work-around" you mention... do you have any to share? It's an approach that makes a lot of sense, and as I said, I'd love not to have to re-invent the wheel if someone else has already written that code. Thanks!
>>>>> > >>
>>>>> > >> Diana
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>>>> > >> There was a previous discussion about this here:
>>>>> > >>
>>>>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>>> > >>
>>>>> > >> How big are the XML or JSON files you're looking to deal with?
>>>>> > >>
>>>>> > >> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
>>>>> > >>
>>>>> > >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
>>>>> > >>
>>>>> > >> Nick
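To make that preprocessor idea concrete, here is a rough, untested sketch for the JSON case (the script name flatten_json.py is made up). It tracks brace depth while ignoring braces inside strings, and emits a newline whenever a top-level object closes. It assumes the top level is a stream of objects rather than one giant array, and it relies on the fact that valid JSON strings cannot contain literal newlines, so dropping the input's own newlines is safe:

import sys

def flatten_json(infile, outfile):
    depth = 0          # current {...} nesting depth
    in_string = False  # inside a quoted JSON string?
    escaped = False    # was the previous character a backslash?
    while True:
        ch = infile.read(1)
        if not ch:
            break
        if ch == '\n':
            continue  # drop original newlines; we insert our own below
        outfile.write(ch)
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                outfile.write('\n')  # a top-level object just closed

if __name__ == '__main__':
    flatten_json(sys.stdin, sys.stdout)

You'd run it once over the raw file (python flatten_json.py < big.json > lines.json), then point sc.textFile() at the result and deserialize each line with json.loads(), as in Krakna's example above.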
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>>>> > >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON? I'd like to do this without re-inventing the wheel... anyone care to share? Thanks!
>>>>> > >>
>>>>> > >> Diana