Re: PySpark, numpy arrays and binary data

2014-08-07 Thread Rok Roskar
thanks for the quick answer!

 Numpy arrays are not among the basic types supported by default, so we can not
 use them during collect().
 

sure, but if you knew that a numpy array went in on one end, you could safely 
use it on the other end, no? Perhaps it would require an extension of the RDD 
class and overriding the collect() method.

 Could you give a short example about how numpy array is used in your project?
 

sure -- basically, our main data structure is a container class (it acts like a
dictionary) that holds the various arrays representing particle data. Each
particle has various properties (position, velocity, mass, etc.); you get at these
individual properties by calling something like

s['pos']

where 's' is the container object and 'pos' is the name of the array. A really 
common use case then is to select particles based on their properties and do 
some plotting, or take a slice of the particles, e.g. you might do 

r = np.sqrt((s['pos']**2).sum(axis=1))
ind = np.where(r < 5)
plot(s[ind]['x'], s[ind]['y'])
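
(To make the interface concrete, here is a minimal, purely illustrative sketch of
the dict-backed version -- the class and field names below are just placeholders:)

import numpy as np

class ParticleContainer:
    # minimal, hypothetical stand-in for the real pynbody container
    def __init__(self, arrays):
        self._arrays = arrays        # e.g. {'pos': Nx3 array, 'mass': N array}

    def __getitem__(self, key):
        if isinstance(key, str):
            return self._arrays[key]             # s['pos'] -> numpy array
        # otherwise treat the key as an index array / boolean mask over particles
        return ParticleContainer({k: v[key] for k, v in self._arrays.items()})

s = ParticleContainer({'pos': np.random.randn(100, 3)})
r = np.sqrt((s['pos']**2).sum(axis=1))
sub = s[r < 5]       # sub-container holding only the close particles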

Internally, the various arrays are kept in a dictionary -- I'm hoping to write 
a class that keeps them in an RDD instead. To the user, this would have to be 
transparent, i.e. if the user wants to get at the data for specific particles, 
she would just have to do 

s['pos'][1,5,10] 

for example, and the data would be fetched for her from the RDD just like it 
would be if she were simply using the usual single-machine version. This is why 
writing to/from files when retrieving data from the RDD really is a no-go
-- can you recommend how this can be circumvented? 


 
 * is there a preferred way to read binary data off a local disk directly
 into an RDD? Our I/O routines are built to read data in chunks and each
 chunk could be read by a different process/RDD, but it's not clear to me how
 to accomplish this with the existing API. Since the idea is to process data
 sets that don't fit into a single node's memory, reading first and then
 distributing via sc.parallelize is obviously not an option.
 
 If you already know how to partition the data, then you could use
 sc.parallelize() to distribute the descriptions of your data, and then read
 the data in parallel according to those descriptions.
 
 For example, you can partition your data into (path, start, length) tuples:
 
 partitions = [(path1, start1, length), (path1, start2, length), ...]
 
 def read_chunk(path, start, length):
     # each task reads its own chunk of the file directly
     with open(path, 'rb') as f:
         f.seek(start)
         data = f.read(length)
     # process the data, then yield the resulting records
     yield data
 
 rdd = sc.parallelize(partitions, len(partitions)).flatMap(lambda p: read_chunk(*p))
 


right... this is totally obvious in retrospect!  Thanks!


Rok




 * related to the first question -- when an RDD is created by parallelizing a
 numpy array, the array gets serialized and distributed. I see in the source
 that it actually gets written into a file first (!?) -- but surely the Py4J
 bottleneck for python array types (mentioned in the source comment) doesn't
 really apply to numpy arrays? Is it really necessary to dump the data onto
 disk first? Conversely, the collect() seems really slow and I suspect that
 this is due to the combination of disk I/O and python list creation. Are
 there any ways of getting around this if numpy arrays are being used?
 
 
 I'd be curious about any other best-practices tips anyone might have for
 running pyspark with numpy data...!
 
 Thanks!
 
 
 Rok
 





Re: PySpark, numpy arrays and binary data

2014-08-07 Thread Davies Liu
On Thu, Aug 7, 2014 at 12:06 AM, Rok Roskar rokros...@gmail.com wrote:
 sure, but if you knew that a numpy array went in on one end, you could safely 
 use it on the other end, no? Perhaps it would require an extension of the RDD 
 class and overriding the collect() method.

 Could you give a short example about how numpy array is used in your project?


 sure -- basically, our main data structure is a container class (it acts like a
 dictionary) that holds the various arrays representing particle data. Each
 particle has various properties (position, velocity, mass, etc.); you get at
 these individual properties by calling something like

 s['pos']

 where 's' is the container object and 'pos' is the name of the array. A 
 really common use case then is to select particles based on their properties 
 and do some plotting, or take a slice of the particles, e.g. you might do

 r = np.sqrt((s['pos']**2).sum(axis=1))
 ind = np.where(r < 5)
 plot(s[ind]['x'], s[ind]['y'])

 Internally, the various arrays are kept in a dictionary -- I'm hoping to 
 write a class that keeps them in an RDD instead. To the user, this would have 
 to be transparent, i.e. if the user wants to get at the data for specific 
 particles, she would just have to do

 s['pos'][1,5,10]

 for example, and the data would be fetched for her from the RDD just like it 
 would be if she were simply using the usual single-machine version. This is 
 why writing to/from files when retrieving data from the RDD really is a
 no-go -- can you recommend how this can be circumvented?

An RDD is distributed by design, so accessing items in an RDD directly by key
or index will not be easy. So I don't think you can map this interface onto an
RDD -- or, if you do, the result will not be what the user expects; it will be
very, very slow.

In order to parallelize the computation, most of the work should be done
through transformations of RDDs. At the end, fetch the data from the RDD with
collect(), then do the plotting. Could this kind of workflow work for your
case?
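
For example, something along these lines (just a rough sketch; it assumes
pos_rdd is an RDD whose elements are Nx3 numpy position chunks, built as in my
earlier reply, and that only the selected particles need to come back to the
driver):

import numpy as np

def select_close(chunk):
    # 'chunk' is assumed to be an Nx3 numpy array of positions
    r = np.sqrt((chunk**2).sum(axis=1))
    return [chunk[r < 5]]            # keep only particles with r < 5

# run the selection on the cluster, bring back only the survivors
close = np.concatenate(pos_rdd.flatMap(select_close).collect())
# plot(close[:, 0], close[:, 1])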

Davies




PySpark, numpy arrays and binary data

2014-08-06 Thread Rok Roskar
Hello,

I'm interested in getting started with Spark to scale our scientific analysis 
package (http://pynbody.github.io) to larger data sets. The package is written 
in Python and makes heavy use of numpy/scipy and related frameworks. I've got a 
couple of questions that I have not been able to find easy answers to despite 
some research efforts... I hope someone here can clarify things for me a bit!

* is there a preferred way to read binary data off a local disk directly into 
an RDD? Our I/O routines are built to read data in chunks and each chunk could 
be read by a different process/RDD, but it's not clear to me how to accomplish 
this with the existing API. Since the idea is to process data sets that don't 
fit into a single node's memory, reading first and then distributing via 
sc.parallelize is obviously not an option. 

* related to the first question -- when an RDD is created by parallelizing a 
numpy array, the array gets serialized and distributed. I see in the source 
that it actually gets written into a file first (!?) -- but surely the Py4J 
bottleneck for python array types (mentioned in the source comment) doesn't 
really apply to numpy arrays? Is it really necessary to dump the data onto disk 
first? Conversely, the collect() seems really slow and I suspect that this is 
due to the combination of disk I/O and python list creation. Are there any ways 
of getting around this if numpy arrays are being used? 


I'd be curious about any other best-practices tips anyone might have for 
running pyspark with numpy data...! 

Thanks!


Rok



Re: PySpark, numpy arrays and binary data

2014-08-06 Thread Davies Liu
Numpy arrays are not among the basic types supported by default, so we can not
use them during collect().

Could you give a short example about how numpy array is used in your project?

On Wed, Aug 6, 2014 at 8:41 AM, Rok Roskar rokros...@gmail.com wrote:
 Hello,

 I'm interested in getting started with Spark to scale our scientific
 analysis package (http://pynbody.github.io) to larger data sets. The package
 is written in Python and makes heavy use of numpy/scipy and related
 frameworks. I've got a couple of questions that I have not been able to find
 easy answers to despite some research efforts... I hope someone here can
 clarify things for me a bit!

 * is there a preferred way to read binary data off a local disk directly
 into an RDD? Our I/O routines are built to read data in chunks and each
 chunk could be read by a different process/RDD, but it's not clear to me how
 to accomplish this with the existing API. Since the idea is to process data
 sets that don't fit into a single node's memory, reading first and then
 distributing via sc.parallelize is obviously not an option.

If you already know how to partition the data, then you could use
sc.parallelize() to distribute the descriptions of your data, and then read
the data in parallel according to those descriptions.

For example, you can partition your data into (path, start, length) tuples:

partitions = [(path1, start1, length), (path1, start2, length), ...]

def read_chunk(path, start, length):
    # each task reads its own chunk of the file directly
    with open(path, 'rb') as f:
        f.seek(start)
        data = f.read(length)
    # process the data, then yield the resulting records
    yield data

rdd = sc.parallelize(partitions, len(partitions)).flatMap(lambda p: read_chunk(*p))
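
If the chunks are raw binary particle data, the reader itself could hand back
numpy arrays -- a rough sketch, assuming (hypothetically) a plain contiguous
float64 layout with three coordinates per particle:

import numpy as np

def read_numpy_chunk(path, start, length):
    # same idea as read_chunk above, but interpret the raw bytes as an Nx3
    # float64 position array (the dtype and shape here are just an assumption)
    with open(path, 'rb') as f:
        f.seek(start)
        data = f.read(length)
    yield np.frombuffer(data, dtype=np.float64).reshape(-1, 3)

pos_rdd = sc.parallelize(partitions, len(partitions)).flatMap(
    lambda p: read_numpy_chunk(*p))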

 * related to the first question -- when an RDD is created by parallelizing a
 numpy array, the array gets serialized and distributed. I see in the source
 that it actually gets written into a file first (!?) -- but surely the Py4J
 bottleneck for python array types (mentioned in the source comment) doesn't
 really apply to numpy arrays? Is it really necessary to dump the data onto
 disk first? Conversely, the collect() seems really slow and I suspect that
 this is due to the combination of disk I/O and python list creation. Are
 there any ways of getting around this if numpy arrays are being used?


 I'd be curious about any other best-practices tips anyone might have for
 running pyspark with numpy data...!

 Thanks!


 Rok

