Hi Nick,

This is a nice start. I'd prefer to keep the Java sequenceFileAsText() and newHadoopFileAsText() methods inside PythonRDD instead of adding them to JavaSparkContext, since I think these methods are unlikely to be used directly by Java users. You can add them to the PythonRDD companion object, which is how readRDDFromPickleFile is implemented: https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
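Very roughly, I'm picturing something like the following in that companion object. This is only a sketch: the method name and return type are placeholders, and a real version would probably pickle the pairs before handing them to Python rather than returning a JavaPairRDD.

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat
    import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

    // Sketch only: in practice this would be added to the existing PythonRDD
    // companion object, next to readRDDFromPickleFile, so it never becomes part
    // of the public Java API.
    object PythonRDD {
      def sequenceFileAsText(jsc: JavaSparkContext, path: String): JavaPairRDD[String, String] = {
        // SequenceFileAsTextInputFormat hands back keys and values as Text,
        // so all that's left is converting them to plain Strings.
        val rdd = jsc.sc.hadoopFile(
          path, classOf[SequenceFileAsTextInputFormat], classOf[Text], classOf[Text])
        JavaPairRDD.fromRDD(rdd.map { case (k, v) => (k.toString, v.toString) })
      }
    }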
For MsgPack, the UnpicklingError is because the Python worker expects to receive its input in a pickled format. (There's a short sketch of the stream framing in the P.S. at the end of this message.) In my prototype of custom serializers, I modified the PySpark worker to receive its serialization/deserialization function as input (https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41) and added logic to pass the appropriate serializers based on each stage's input and output formats (https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42).

At some point, I'd like to port my custom serializers code to PySpark; if anyone's interested in helping, I'd be glad to write up some additional notes on how this should work.

- Josh

On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <[email protected]> wrote:

> Thanks Josh, Patrick for the feedback.
>
> Based on Josh's pointers I have something working for JavaPairRDD -> PySpark RDD[(String, String)]. This just calls the toString method on each key and value as before, but without the need for a delimiter. For SequenceFile, it uses SequenceFileAsTextInputFormat, which itself calls toString to convert keys and values to Text. We then call toString (again) ourselves to get Strings to feed to writeAsPickle.
>
> Details here: https://gist.github.com/MLnick/7230588
>
> This also illustrates where the "wrapper function" API would fit in. All that is required is to define a T => String for the key and value.
>
> I started playing around with MsgPack and can sort of get things to work in Scala, but I am struggling to get the raw bytes written properly in PythonRDD (I think it is treating them as pickled byte arrays when they are not; when I removed the 'stripPickle' calls and amended the length (-6), I got "UnpicklingError: invalid load key, ' '.").
>
> Another issue is that MsgPack does well at writing "structures" (fairly simple Java classes with public fields), but the Writables, for example, have private fields, so you end up with nothing being written. This looks like it would require custom "Templates" (effectively serialization functions) for many classes, which means a lot of custom code for a user to write. Fortunately, for most of the common Writables a toString does the job. Will keep looking into it though.
>
> Anyway, Josh, if you have ideas or examples of the "wrapper API from Python" that you mentioned, I'd be interested to hear them.
>
> If you think this is worth working up as a pull request covering SequenceFiles and custom InputFormats, with default toString conversions and the ability to specify wrapper functions, I can clean things up, add some functionality and tests, and check whether common cases like the "normal" Writables and reading from HBase and Cassandra can be made to work nicely (any other common use cases that you think make sense?).
>
> Thoughts, comments etc. welcome.
>
> Nick
>
>
> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <[email protected]> wrote:
>
> > As a starting point, a version where people just write their own "wrapper" functions to convert various HadoopFiles into String <K, V> files could go a long way. We could even have a few built-in versions, such as dealing with Sequence files that are <String, String>. Basically, the user needs to write a translator in Java/Scala that produces textual records from whatever format they want.
> > Then, they make sure this is included in the classpath when running PySpark.
> >
> > As Josh is saying, I'm pretty sure this is already possible, but we may want to document it for users. In many organizations they might have 1-2 people who can write the Java/Scala to do this, but then many more people who are comfortable using Python once it's set up.
> >
> > - Patrick
> >
> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <[email protected]> wrote:
> >
> > > Hi Nick,
> > >
> > > I've seen several requests for SequenceFile support in PySpark, so there's definitely demand for this feature.
> > >
> > > I like the idea of passing MsgPack'ed data (or some other structured format) from Java to the Python workers. My early prototype of custom serializers (described at https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers) might be useful for implementing this. Proper custom serializer support would handle the bookkeeping for tracking each stage's input and output formats and supplying the appropriate deserialization functions to the Python worker, so the Python worker would be able to directly read the MsgPack'd data that's sent to it.
> > >
> > > Regarding a wrapper API, it's actually possible to initially transform data using Scala/Java and perform the remainder of the processing in PySpark. This involves adding the appropriate compiled classes to the Java classpath and a bit of work in Py4J to create the Java/Scala RDD and wrap it for use by PySpark. I can hack together a rough example of this if anyone's interested, but it would need some work to be developed into a user-friendly API.
> > >
> > > If you wanted to extend your proof-of-concept to handle the cases where keys and values have parseable toString() values, I think you could remove the need for a delimiter by creating a PythonRDD from the newHadoopFile JavaPairRDD and adding a new method to writeAsPickle (https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224) to dump its contents as a pickled pair of strings. (Aside: most of writeAsPickle() would probably need to be eliminated or refactored when adding general custom serializer support.)
> > >
> > > - Josh
> > >
> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath <[email protected]> wrote:
> > >
> > > > Hi Spark Devs
> > > >
> > > > I was wondering what appetite there may be to add the ability for PySpark users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
> > > >
> > > > In my data pipeline, for example, I'm currently just using Scala (partly because I love it, but also because I rely heavily on quite custom Hadoop InputFormats for reading data). However, many users may prefer to use PySpark as much as possible (if not for everything). Reasons might include the need to use some Python library. While I don't do it yet, I can certainly see an attractive use case for using, say, scikit-learn / numpy to do data analysis & machine learning in Python. Added to this, my cofounder knows Python well but not Scala, so it can be very beneficial to do a lot of stuff in Python.
> > > > For text-based data this is fine, but reading data in from more complex Hadoop formats is an issue.
> > > >
> > > > The current approach would of course be to write an ETL-style Java/Scala job and then process in Python. Nothing wrong with this, but I was thinking about ways to allow Python to access arbitrary Hadoop InputFormats.
> > > >
> > > > Here is a quick proof of concept: https://gist.github.com/MLnick/7150058
> > > >
> > > > This works for simple stuff like SequenceFile with simple Writable key/values.
> > > >
> > > > To work with more complex files, perhaps an approach is to manipulate the Hadoop JobConf via Python and pass that in. The one downside is of course that the InputFormat (well, actually the key/value classes) must have a toString that makes sense, so very custom stuff might not work.
> > > >
> > > > I wonder if it would be possible to take the objects that are yielded via the InputFormat and convert them into some representation like ProtoBuf, MsgPack, Avro, or JSON that can be read more easily from Python?
> > > >
> > > > Another approach could be to allow a simple "wrapper API" such that one can write a wrapper function T => String and pass that into an InputFormatWrapper that takes an arbitrary InputFormat and yields Strings for the keys and values. Then all that is required is to compile that function, add it to the SPARK_CLASSPATH, and away you go!
> > > >
> > > > Thoughts?
> > > >
> > > > Nick
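P.S. Nick, on the UnpicklingError: here's the stream-framing sketch I mentioned above. The names are made up, but it shows the two pieces that have to agree: the JVM side length-prefixes each record, and the Python worker then decodes the payload. Today the worker always assumes the payload is a pickle, which is why handing it raw MsgPack bytes blows up.

    import java.io.DataOutputStream

    // Sketch: length-prefixed framing for records sent from the JVM to the
    // Python worker. Sending MsgPack (or any non-pickled) bytes framed like
    // this only works if the worker also knows to hand the payload to a
    // MsgPack decoder instead of the pickle module; otherwise you get exactly
    // the "UnpicklingError: invalid load key" failure.
    object PythonWorkerFraming {
      def writeRawRecord(bytes: Array[Byte], dataOut: DataOutputStream): Unit = {
        dataOut.writeInt(bytes.length)
        dataOut.write(bytes)
      }
    }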

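P.P.S. To make the "wrapper function" idea discussed in this thread a bit more concrete, here's roughly the shape I'd expect it to take on the Scala side. Again just a sketch: InputFormatWrapper, stringifyHadoopFile, keyConverter and valueConverter are placeholder names, not an existing API.

    import org.apache.hadoop.mapred.InputFormat
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Sketch: the user supplies K => String and V => String converters for
    // their InputFormat's key/value types, compiles this against Spark, and
    // drops the jar on SPARK_CLASSPATH. PySpark then only ever sees an
    // RDD of (String, String) pairs, which it already knows how to handle.
    object InputFormatWrapper {
      def stringifyHadoopFile[K, V](
          sc: SparkContext,
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V])(
          keyConverter: K => String,
          valueConverter: V => String): RDD[(String, String)] = {
        sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
          .map { case (k, v) => (keyConverter(k), valueConverter(v)) }
      }
    }

For the common Writables the converters can simply be _.toString, which matches what the SequenceFileAsTextInputFormat approach already does today.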