FYI, one thing we’ve added now is support for reading multiple text files from 
a directory as separate records: https://github.com/apache/spark/pull/327. This 
should remove the need for mapPartitions discussed here.
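
As a rough illustration of why per-file records help (a plain-Python sketch, not the API added by the pull request): assume each record is a hypothetical (path, content) pair holding one whole file, so a multi-line JSON document can be parsed in a single call. The helper name parse_json_record and the sample data are made up for illustration.

```python
import json

def parse_json_record(record):
    """Parse one whole-file record, a (path, content) pair, as a single JSON document."""
    path, content = record
    return path, json.loads(content)

# Local stand-in for a dataset with one record per file (sample data is hypothetical).
records = [
    ("dir/a.json", '{\n  "key": "CA",\n  "paragraphs": ["Hello world"]\n}'),
    ("dir/b.json", '{\n  "key": "NY",\n  "paragraphs": ["What is", "up"]\n}'),
]

parsed = [parse_json_record(r) for r in records]
for path, doc in parsed:
    print(path, doc["key"], len(doc["paragraphs"]))
```

Because each record already carries the full file, no line regrouping is needed before parsing.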

Avro and SequenceFiles look like they may not make it for 1.0, but there’s a 
chance that Parquet support with Spark SQL will, which should let you store 
binary data a bit better.

Matei

On Mar 19, 2014, at 3:12 PM, Jeremy Freeman <freeman.jer...@gmail.com> wrote:

> Another vote on this, support for simple SequenceFiles and/or Avro would be 
> terrific, as using plain text can be very space-inefficient, especially for 
> numerical data.
> 
> -- Jeremy
> 
> On Mar 19, 2014, at 5:24 PM, Nicholas Chammas <nicholas.cham...@gmail.com> 
> wrote:
> 
>> I'd second the request for Avro support in Python first, followed by Parquet.
>> 
>> 
>> On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin <itparan...@gmail.com> 
>> wrote:
>> 
>> On 19 Mar 2014, at 19:54, Diana Carroll <dcarr...@cloudera.com> wrote:
>> 
>>> Actually, thinking more on this question, Matei: I'd definitely say support 
>>> for Avro.  There's a lot of interest in this!!
>>> 
>> 
>> Agreed, and Parquet too, as the default Cloudera Impala format.
>> 
>> 
>> 
>> 
>>> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>>> wrote:
>>> BTW one other thing — in your experience, Diana, which non-text 
>>> InputFormats would be most useful to support in Python first? Would it be 
>>> Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or 
>>> something else? I think a per-file text input format that does the stuff we 
>>> did here would also be good.
>>> 
>>> Matei
>>> 
>>> 
>>> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>> 
>>>> Hi Diana,
>>>> 
>>>> This seems to work without the iter() in front if you just return 
>>>> treeiterator. What happened when you didn’t include that? Treeiterator 
>>>> should return an iterator.
>>>> 
>>>> Anyway, this is a good example of mapPartitions. It’s one where you want 
>>>> to view the whole file as one object (one XML here), so you couldn’t 
>>>> implement this using a flatMap, but you still want to return multiple 
>>>> values. The MLlib example you saw needs Python 2.7 because unfortunately 
>>>> that is a requirement for our Python MLlib support (see 
>>>> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
>>>>  We’d like to relax this later but we’re using some newer features of 
>>>> NumPy and Python. The rest of PySpark works on 2.6.
>>>> 
>>>> In terms of the size in memory, here both the string s and the XML tree 
>>>> constructed from it need to fit in, so you can’t work on very large 
>>>> individual XML files. You may be able to use a streaming XML parser 
>>>> instead to extract elements from the data in a streaming fashion, without 
>>>> ever materializing the whole tree. 
>>>> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader
>>>>  is one example.
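
A minimal sketch of the streaming idea, assuming the lines of one file arrive in order and that only the name attribute of each <country> element is wanted. It uses the standard library's xml.sax incremental parser; the names CountryHandler and parse_lines are hypothetical.

```python
import xml.sax

class CountryHandler(xml.sax.ContentHandler):
    """Collect the name attribute of every <country> start tag."""

    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self.names = []

    def startElement(self, name, attrs):
        if name == "country":
            self.names.append(attrs.get("name"))

def parse_lines(lines):
    """Feed the document to the parser line by line; no element tree is built."""
    handler = CountryHandler()
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    for line in lines:
        parser.feed(line)
    parser.close()
    return handler.names

sample = ['<data>', '<country name="Liechtenstein">', '</country>',
          '<country name="Panama"/>', '</data>']
print(parse_lines(sample))
```

Only the collected names are held in memory, so the size of the document matters less than with ET.fromstring. Note that line breaks are not re-inserted here, which is fine for attributes but would merge text content that spans lines.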
>>>> 
>>>> Matei
>>>> 
>>>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>> 
>>>>> Well, if anyone is still following this, I've gotten the following code 
>>>>> working which in theory should allow me to parse whole XML files: (the 
>>>>> problem was that I can't return the tree iterator directly.  I have to 
>>>>> call iter().  Why?)
>>>>> 
>>>>> import xml.etree.ElementTree as ET
>>>>> 
>>>>> # two source files, format <data> <country name="...">...</country>...</data>
>>>>> mydata=sc.textFile("file:/home/training/countries*.xml") 
>>>>> 
>>>>> def parsefile(iterator):
>>>>>     s = ''
>>>>>     for i in iterator: s = s + str(i)
>>>>>     tree = ET.fromstring(s)
>>>>>     treeiterator = tree.getiterator("country")
>>>>>     # why do I have to convert an iterator to an iterator?  not sure but required
>>>>>     return iter(treeiterator)
>>>>> 
>>>>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: 
>>>>> element.attrib).collect()
>>>>> 
>>>>> The output is what I expect:
>>>>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>>>> 
>>>>> BUT I'm a bit concerned about the construction of the string "s".  How 
>>>>> big can my file be before converting it to a string becomes problematic?
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarr...@cloudera.com> 
>>>>> wrote:
>>>>> Thanks, Matei.
>>>>> 
>>>>> In the context of this discussion, it would seem mapPartitions is 
>>>>> essential, because it's the only way I'm going to be able to process each 
>>>>> file as a whole, in our example of a large number of small XML files 
>>>>> which need to be parsed as a whole file because records are not required 
>>>>> to be on a single line.
>>>>> 
>>>>> The theory makes sense but I'm still utterly lost as to how to implement 
>>>>> it.  Unfortunately there's only a single example of the use of 
>>>>> mapPartitions in any of the Python example programs, which is the logistic 
>>>>> regression example, which I can't run because it requires Python 2.7 and 
>>>>> I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6 
>>>>> is unsupported...is it?)
>>>>> 
>>>>> I'd really really love to see a real life example of a Python use of 
>>>>> mapPartitions.  I do appreciate the very simple examples you provided, 
>>>>> but (perhaps because of my novice status on Python) I can't figure out 
>>>>> how to translate those to a real world situation in which I'm building 
>>>>> RDDs from files, not inline collections like [(1,2),(2,3)].
>>>>> 
>>>>> Also, you say that the function called in mapPartitions can return a 
>>>>> collection OR an iterator.  I tried returning an iterator by calling 
>>>>> ElementTree getiterator function, but still got the error telling me my 
>>>>> object was not an iterator. 
>>>>> 
>>>>> If anyone has a real life example of mapPartitions returning a Python 
>>>>> iterator, that would be fabulous.
>>>>> 
>>>>> Diana
>>>>> 
>>>>> 
>>>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>>>>> wrote:
>>>>> Oh, I see, the problem is that the function you pass to mapPartitions 
>>>>> must itself return an iterator or a collection. This is used so that you 
>>>>> can return multiple output records for each input record. You can 
>>>>> implement most of the existing map-like operations in Spark, such as map, 
>>>>> filter, flatMap, etc, with mapPartitions, as well as new ones that might 
>>>>> do a sliding window over each partition for example, or accumulate data 
>>>>> across elements (e.g. to compute a sum).
>>>>> 
>>>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this 
>>>>> will work:
>>>>> 
>>>>> >>> data.mapPartitions(lambda x: x).collect()
>>>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>>>> 
>>>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a 
>>>>> single list (like glom)
>>>>> 
>>>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>>>> [3, 7]   # Sum each partition separately
>>>>> 
>>>>> However something like data.mapPartitions(lambda x: sum(x)).collect() 
>>>>> will *not* work because sum returns a number, not an iterator. That’s why 
>>>>> I put sum(x) inside a list above.
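
The rule above can be checked without a cluster. This is a sketch in plain Python, assuming a hypothetical helper map_partitions that mimics the behaviour described (call the function once per partition with an iterator, then flatten whatever it returns); it is not Spark's implementation.

```python
from itertools import chain

def map_partitions(partitions, func):
    """Call func on an iterator over each partition and chain the returned iterables."""
    return list(chain.from_iterable(func(iter(p)) for p in partitions))

partitions = [[1, 2], [3, 4]]  # stand-in for sc.parallelize([1, 2, 3, 4], 2)

same = map_partitions(partitions, lambda it: it)
grouped = map_partitions(partitions, lambda it: [list(it)])
sums = map_partitions(partitions, lambda it: [sum(it)])

try:
    map_partitions(partitions, lambda it: sum(it))  # returns a number, not an iterable
    bare_sum_failed = False
except TypeError:
    bare_sum_failed = True

print(same, grouped, sums, bare_sum_failed)
```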
>>>>> 
>>>>> In practice mapPartitions is most useful if you want to share some data 
>>>>> or work across the elements. For example maybe you want to load a lookup 
>>>>> table once from an external file and then check each element against it, or 
>>>>> sum up a bunch of elements without allocating a lot of vector objects.
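
A sketch of the lookup-table case, assuming a hypothetical loader function load_table that stands in for reading the external file. The partition function is a generator, so it returns an iterator as mapPartitions requires, and the table is loaded once per partition rather than once per element.

```python
def make_tagger(load_table):
    """Build a partition function that loads the lookup table once, then tags each element."""
    def tag_partition(iterator):
        table = load_table()  # runs once per partition
        for item in iterator:
            yield item, table.get(item, "unknown")
    return tag_partition

load_calls = []

def load_table():
    # Hypothetical loader; a real one would read the external file here.
    load_calls.append(1)
    return {"LI": "Liechtenstein", "SG": "Singapore"}

tagger = make_tagger(load_table)
tagged = list(tagger(iter(["LI", "SG", "PA"])))
print(tagged, len(load_calls))
```

With an RDD, the same function would be passed as rdd.mapPartitions(tagger).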
>>>>> 
>>>>> Matei
>>>>> 
>>>>> 
>>>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>> 
>>>>> > "There’s also mapPartitions, which gives you an iterator for each 
>>>>> > partition instead of an array. You can then return an iterator or list 
>>>>> > of objects to produce from that."
>>>>> >
>>>>> > I confess, I was hoping for an example of just that, because I've not 
>>>>> > yet been able to figure out how to use mapPartitions.  No doubt this is 
>>>>> > because I'm a rank newcomer to Python, and haven't fully wrapped my 
>>>>> > head around iterators.  All I get so far in my attempts to use 
>>>>> > mapPartitions is the darned "suchnsuch is not an iterator" error.
>>>>> >
>>>>> > def myfunction(iterator): return [1,2,3]
>>>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia 
>>>>> > <matei.zaha...@gmail.com> wrote:
>>>>> > Here’s an example of getting together all lines in a file as one string:
>>>>> >
>>>>> > $ cat dir/a.txt
>>>>> > Hello
>>>>> > world!
>>>>> >
>>>>> > $ cat dir/b.txt
>>>>> > What's
>>>>> > up??
>>>>> >
>>>>> > $ bin/pyspark
>>>>> > >>> files = sc.textFile("dir")
>>>>> >
>>>>> > >>> files.collect()
>>>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not 
>>>>> > what we want
>>>>> >
>>>>> > >>> files.glom().collect()
>>>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, 
>>>>> > which is an array of lines
>>>>> >
>>>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a 
>>>>> > single string
>>>>> >
>>>>> > The glom() method groups all the elements of each partition of an RDD 
>>>>> > into an array, giving you an RDD of arrays of objects. If your input is 
>>>>> > small files, you always have one partition per file.
>>>>> >
>>>>> > There’s also mapPartitions, which gives you an iterator for each 
>>>>> > partition instead of an array. You can then return an iterator or list 
>>>>> > of objects to produce from that.
>>>>> >
>>>>> > Matei
>>>>> >
>>>>> >
>>>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> 
>>>>> > wrote:
>>>>> >
>>>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many 
>>>>> > > smallish XML files, so using mapPartitions that way would make sense. 
>>>>> > >  I'd love to see a code example though ...It's not as obvious to me 
>>>>> > > how to do that as it probably should be.
>>>>> > >
>>>>> > > Thanks,
>>>>> > > Diana
>>>>> > >
>>>>> > >
>>>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia 
>>>>> > > <matei.zaha...@gmail.com> wrote:
>>>>> > > Hi Diana,
>>>>> > >
>>>>> > > Non-text input formats are only supported in Java and Scala right 
>>>>> > > now, where you can use sparkContext.hadoopFile or .hadoopDataset to 
>>>>> > > load data with any InputFormat that Hadoop MapReduce supports. In 
>>>>> > > Python, you unfortunately only have textFile, which gives you one 
>>>>> > > record per line. For JSON, you’d have to fit the whole JSON object on 
>>>>> > > one line as you said. Hopefully we’ll also have some other forms of 
>>>>> > > input soon.
>>>>> > >
>>>>> > > If your input is a collection of separate files (say many .xml 
>>>>> > > files), you can also use mapPartitions on it to group together the 
>>>>> > > lines because each input file will end up being a single dataset 
>>>>> > > partition (or map task). This will let you concatenate the lines in 
>>>>> > > each file and parse them as one XML object.
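
A minimal sketch of that approach, assuming each partition holds exactly the lines of one XML file and that the <country> elements are what you want back. The function name parse_partition and the sample lines are hypothetical; it is exercised here on a plain iterator rather than an RDD.

```python
import xml.etree.ElementTree as ET

def parse_partition(lines):
    """Join the lines of one partition (assumed to be one file) and return its <country> elements."""
    root = ET.fromstring("\n".join(lines))
    return root.iter("country")  # already an iterator, so it can be returned directly

sample_lines = [
    '<data>',
    '  <country name="Liechtenstein">',
    '  </country>',
    '  <country name="Panama"/>',
    '</data>',
]
names = [el.attrib["name"] for el in parse_partition(iter(sample_lines))]
print(names)
```

On an RDD built with sc.textFile, the function would be passed as rdd.mapPartitions(parse_partition). Both the joined string and the parsed tree must fit in memory, so this suits small files only.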
>>>>> > >
>>>>> > > Matei
>>>>> > >
>>>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> 
>>>>> > > wrote:
>>>>> > >
>>>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks 
>>>>> > >> like you are assuming that each line in foo.log contains a complete 
>>>>> > >> json object?  (That is, that the data doesn't contain any records 
>>>>> > >> that are split into multiple lines.)  If so, is that because you 
>>>>> > >> know that to be true of your data?  Or did you do as Nicholas 
>>>>> > >> suggests and have some preprocessing on the text input to flatten 
>>>>> > >> the data in that way?
>>>>> > >>
>>>>> > >> Thanks,
>>>>> > >> Diana
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> 
>>>>> > >> wrote:
>>>>> > >> Katrina,
>>>>> > >>
>>>>> > >> Not sure if this is what you had in mind, but here's some simple 
>>>>> > >> pyspark code that I recently wrote to deal with JSON files.
>>>>> > >>
>>>>> > >> from pyspark import SparkContext, SparkConf
>>>>> > >> from operator import add
>>>>> > >> import json
>>>>> > >> import random
>>>>> > >> import numpy as np
>>>>> > >>
>>>>> > >> def concatenate_paragraphs(sentence_array):
>>>>> > >>     # Join the sentences, then split the result into individual words.
>>>>> > >>     return ' '.join(sentence_array).split(' ')
>>>>> > >>
>>>>> > >> logFile = 'foo.json'
>>>>> > >> conf = SparkConf()
>>>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>>>> > >> sc = SparkContext(conf=conf)
>>>>> > >>
>>>>> > >> logData = sc.textFile(logFile).cache()
>>>>> > >> num_lines = logData.count()
>>>>> > >> print 'Number of lines: %d' % num_lines
>>>>> > >>
>>>>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'],
>>>>> > >>     len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>>>> > >>
>>>>> > >> op = tm.collect()
>>>>> > >> for key, num_words in op:
>>>>> > >>     # the original printed an undefined name 'state'; the loop variable is 'key'
>>>>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark 
>>>>> > >> User List] <[hidden email]> wrote:
>>>>> > >> I don't actually have any data.  I'm writing a course that teaches 
>>>>> > >> students how to do this sort of thing and am interested in looking 
>>>>> > >> at a variety of real life examples of people doing things like that. 
>>>>> > >>  I'd love to see some working code implementing the "obvious 
>>>>> > >> work-around" you mention...do you have any to share?  It's an 
>>>>> > >> approach that makes a lot of sense, and as I said, I'd love to not 
>>>>> > >> have to re-invent the wheel if someone else has already written that 
>>>>> > >> code.  Thanks!
>>>>> > >>
>>>>> > >> Diana
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> 
>>>>> > >> wrote:
>>>>> > >> There was a previous discussion about this here:
>>>>> > >>
>>>>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>>> > >>
>>>>> > >> How big are the XML or JSON files you're looking to deal with?
>>>>> > >>
>>>>> > >> It may not be practical to deserialize the entire document at once. 
>>>>> > >> In that case an obvious work-around would be to have some kind of 
>>>>> > >> pre-processing step that separates XML nodes/JSON objects with 
>>>>> > >> newlines so that you can analyze the data with Spark in a 
>>>>> > >> "line-oriented format". Your preprocessor wouldn't have to 
>>>>> > >> parse/deserialize the massive document; it would just have to track 
>>>>> > >> open/closed tags/braces to know when to insert a newline.
>>>>> > >>
>>>>> > >> Then you'd just open the line-delimited result and deserialize the 
>>>>> > >> individual objects/nodes with map().
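
A sketch of such a preprocessor for JSON, assuming the input is a run of top-level {...} objects separated only by whitespace or commas. It tracks brace depth and string state without deserializing anything and emits one object per line. The function name split_json_objects is hypothetical.

```python
import json

def split_json_objects(text):
    """Yield each top-level {...} object as a single-line string, tracking only braces and strings."""
    depth = 0
    in_string = False
    escaped = False
    buf = []
    for ch in text:
        if in_string:
            buf.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
            buf.append(ch)
        elif ch == "{":
            depth += 1
            buf.append(ch)
        elif ch == "}":
            depth -= 1
            buf.append(ch)
            if depth == 0:
                yield "".join(buf)
                buf = []
        elif ch in "\r\n":
            continue  # drop line breaks that fall outside strings
        elif depth > 0:
            buf.append(ch)

raw = '{"a": 1,\n "b": {"c": "x}y"}}\n{"d":\n [1, 2]}'
lines = list(split_json_objects(raw))
objects = [json.loads(line) for line in lines]
print(lines)
```

The json.loads calls are only there to check the output; in the workflow described above they would run later, inside map(), on the line-delimited file.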
>>>>> > >>
>>>>> > >> Nick
>>>>> > >>
>>>>> > >>
>>>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> 
>>>>> > >> wrote:
>>>>> > >> Has anyone got a working example of a Spark application that 
>>>>> > >> analyzes data in a non-line-oriented format, such as XML or JSON?  
>>>>> > >> I'd like to do this without re-inventing the wheel...anyone care to 
>>>>> > >> share?  Thanks!
>>>>> > >>
>>>>> > >> Diana
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >
>>>>> > >
>>>>> >
>>>>> >
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
