Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-25 Thread Denis Mikhalkin
Hi Nicholas,
thanks for your reply. I checked spark-redshift - it's only for the unload data
files stored on Hadoop, not for live result sets from the database.
Do you know of any example of a custom RDD which fetches the data on the fly 
(not reading from HDFS)?
Thanks.
Denis
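
(For illustration of what I mean, a bare-bones sketch of such an RDD - one that
opens a JDBC connection per partition inside compute() and streams the rows,
rather than collecting everything on the driver - might look roughly like the
code below. The class name, key ranges and row handling are hypothetical, just
to show the getPartitions/compute shape; the JDBC driver would have to be on
the executor classpath.)

import java.sql.DriverManager

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per key range; purely illustrative.
case class KeyRangePartition(index: Int, lower: Long, upper: Long) extends Partition

// Streams the rows for its own key range over JDBC inside compute(), so no
// partition ever holds more than its own slice of the result set.
class JdbcQueryRDD(sc: SparkContext, url: String, sql: String, ranges: Seq[(Long, Long)])
  extends RDD[Seq[AnyRef]](sc, Nil) {

  override def getPartitions: Array[Partition] =
    ranges.zipWithIndex
      .map { case ((lo, hi), i) => KeyRangePartition(i, lo, hi): Partition }
      .toArray

  override def compute(split: Partition, context: TaskContext): Iterator[Seq[AnyRef]] = {
    val part = split.asInstanceOf[KeyRangePartition]
    val conn = DriverManager.getConnection(url)
    val stmt = conn.prepareStatement(sql) // sql contains two ? range placeholders
    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
    val rs = stmt.executeQuery()

    // Walk the ResultSet lazily; close the connection once it is exhausted.
    new Iterator[Seq[AnyRef]] {
      private var advanced = false
      private var more = false
      def hasNext: Boolean = {
        if (!advanced) {
          more = rs.next()
          advanced = true
          if (!more) { stmt.close(); conn.close() }
        }
        more
      }
      def next(): Seq[AnyRef] = {
        if (!hasNext) throw new NoSuchElementException("end of result set")
        advanced = false
        val cols = rs.getMetaData.getColumnCount
        (1 to cols).map(i => rs.getObject(i))
      }
    }
  }
}

The key ranges would come from a quick MIN/MAX query up front, so each Spark
task only ever pulls its own slice.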

Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-25 Thread Charles Feduke
I've got my solution working:

https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d

I couldn't actually perform the steps I outlined in my previous message in
this thread, because I would ultimately have been serializing a SparkContext
to the workers for use while generating 1..n JdbcRDDs there. So I took a look
at the source for JdbcRDD, and it was trivial to adjust to my needs.

This got me thinking about your problem: the JdbcRDD that ships with Spark
shards the query across the cluster by a Long ID value (requiring you to put
two ? placeholders in your query to serve as the range boundaries), so if
you've got such a key - or any series field that happens to be a Long - then
you'd just need the PostgreSQL JDBC driver and your Redshift JDBC URL:
http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
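
A minimal sketch of that wiring (the table, columns, key bounds and credentials
below are made up; it assumes an existing SparkContext named sc and the
PostgreSQL JDBC driver on both driver and executor classpaths):

import java.sql.{DriverManager, ResultSet}

import org.apache.spark.rdd.JdbcRDD

// Placeholder Redshift endpoint; take the real JDBC URL from the console as
// described in the AWS docs linked above.
val url = "jdbc:postgresql://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/analytics" +
  "?user=<user>&password=<password>"

val events = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url),
  // JdbcRDD fills the two ? placeholders with each partition's sub-range.
  "SELECT id, event_type FROM events WHERE id >= ? AND id <= ?",
  1L,          // lower bound of the Long key
  50000000L,   // upper bound of the Long key
  20,          // numPartitions: each one issues its own bounded query
  (rs: ResultSet) => (rs.getLong("id"), rs.getString("event_type"))
)

From there events behaves like any other RDD, with the load spread across the
20 sub-range queries.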

If you have something other than a Long for your primary key/series data
type, then you can do the same thing I did and modify a copy of JdbcRDD,
though your changes would be even fewer than mine. (That said, I can't see
anything much different from a Long or a date/time working for this, since it
has to partition the full range into appropriate sub-ranges.)

Because of the sub-range bucketing and cluster distribution you shouldn't
run into OOM errors, assuming you provision sufficient worker nodes in the
cluster.


Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-25 Thread Charles Feduke
I'm facing a similar problem except my data is already pre-sharded in
PostgreSQL.

I'm going to attempt to solve it like this:

- Submit the shard names (database names) across the Spark cluster as a
text file and partition it so workers get 0 or more - hopefully 1 - shard
name. In this case you could partition ranges - if your primary key is a
datetime, then a start/end datetime pair; or if it's a long then a start/end
long pair. (You may need to run a separate job to get your overall
start/end pair and then calculate how many partitions you need from there.)

- Write the job so that the worker loads data from its shard(s) and unions
the RDDs together. In the case of pairs the concept is the same. Basically
look at how the JdbcRDD constructor requires a start, end, and query
(disregard numPartitions in this case since we're manually partitioning in
the step above). Your query will be its initial filter conditions plus a
between condition for the primary key and its pair.

- Operate on the union RDDs with other transformations or filters.

If everything works as planned then the data should be spread out across
the cluster and no one node will be responsible for loading TiBs of data
and then distributing it to its peers. That should help with your OOM
problem.

Of course this does not guarantee that the data is balanced across nodes.
With a large amount of data it should balance well enough to get the job
done though.

(You may need to run several refinements against the complete dataset to
figure out the appropriate start/end pair values to get an RDD that is
partitioned and balanced across the workers. This is a task best performed
using aggregate query logic or stored procedures. With my shard problem I
don't have this option available.)
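
A rough sketch of the range-based variant - parallelize the computed ranges and
have each partition pull only its own slice over JDBC inside a flatMap, which
sidesteps building RDDs on the workers - with a made-up table, columns and URL,
and sc as an existing SparkContext:

import java.sql.DriverManager

import scala.collection.mutable.ArrayBuffer

// Placeholder connection string.
val url = "jdbc:postgresql://shard-host:5432/analytics?user=<user>&password=<password>"

// Step 1: a cheap query for the overall start/end pair.
val (minId, maxId) = {
  val conn = DriverManager.getConnection(url)
  try {
    val rs = conn.createStatement().executeQuery("SELECT MIN(id), MAX(id) FROM events")
    rs.next()
    (rs.getLong(1), rs.getLong(2))
  } finally conn.close()
}

// Step 2: split [minId, maxId] into one sub-range per partition.
val numPartitions = 50
val step = (maxId - minId) / numPartitions + 1
val ranges = (0 until numPartitions).map { i =>
  (minId + i * step, math.min(minId + (i + 1) * step - 1, maxId))
}

// Step 3: distribute the ranges; each partition loads only its own slice,
// so no single node has to hold the whole table.
val rows = sc.parallelize(ranges, numPartitions).flatMap { case (lo, hi) =>
  val conn = DriverManager.getConnection(url)
  try {
    val stmt = conn.prepareStatement(
      "SELECT id, payload FROM events WHERE id BETWEEN ? AND ?")
    stmt.setLong(1, lo)
    stmt.setLong(2, hi)
    val rs = stmt.executeQuery()
    val buf = ArrayBuffer.empty[(Long, String)]
    while (rs.next()) buf += ((rs.getLong("id"), rs.getString("payload")))
    buf
  } finally conn.close()
}

// rows can now be filtered, joined and aggregated like any other RDD.

How well the partitions balance then depends on how the key values are
distributed, which is where the refinement pass described above comes in.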

Unless someone has a better idea, in which case I'd love to hear it.



Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-24 Thread Nicholas Chammas
I believe Databricks provides an RDD interface to Redshift. Did you check
spark-packages.org?



Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-24 Thread Denis Mikhalkin
Hello,

We've got some analytics data in AWS Redshift. The data is constantly being
updated.
I'd like to be able to write a query against Redshift which would return a
subset of the data, and then run a Spark job (PySpark) to do some analysis.
I could not find an RDD which would let me do that out of the box in Python, so
I tried writing my own. For example, I tried combining a generator (via yield)
with parallelize. It appears, though, that parallelize reads all the data into
memory first, as I get either an OOM error or Python swapping as soon as I
increase the number of rows beyond trivial limits.
I've also looked at the Java RDDs (there is an example of a MySQL RDD), but it
seems that it also reads all the data into memory.
So my question is: how do I correctly feed Spark with huge datasets which don't
initially reside in HDFS/S3 (ideally from PySpark, but I would appreciate any
tips)?
Thanks.
Denis