I've got my solution working: https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d
I couldn't actually perform the steps I outlined in my previous message in this
thread, because I would ultimately have been trying to serialize a SparkContext
to the workers to use during the generation of 1..n JdbcRDDs. So I took a look
at the source for JdbcRDD and it was trivial to adjust to my needs.

This got me thinking about your problem: the JdbcRDD that ships with Spark will
shard the query across the cluster by a Long ID value (requiring you to put ?
placeholders in your query for use as part of a range boundary). So if you've
got such a key - or any series field that happens to be a Long - then you'd
just need to use the PostgreSQL JDBC driver with your Redshift JDBC URL:
http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html

If you have something other than a Long for your primary key/series data type,
then you can do the same thing I did and modify a copy of JdbcRDD, though your
changes would be even fewer than my own. (Though I can't see anything much
different from a Long or a date/time working for this, since it has to
partition the full range into appropriate sub-ranges.)

Because of the sub-range bucketing and cluster distribution you shouldn't run
into OOM errors, assuming you provision sufficient worker nodes in the cluster.
(Rough sketches of both approaches are at the bottom of this message, below the
quoted thread.)

On Sun Jan 25 2015 at 9:39:56 AM Charles Feduke <charles.fed...@gmail.com>
wrote:

> I'm facing a similar problem except my data is already pre-sharded in
> PostgreSQL.
>
> I'm going to attempt to solve it like this:
>
> - Submit the shard names (database names) across the Spark cluster as a
> text file and partition it so workers get 0 or more - hopefully 1 - shard
> name. In this case you could partition ranges - if your primary key is a
> datetime, then a start/end datetime pair; or if it's a long, then a
> start/end long pair. (You may need to run a separate job to get your
> overall start/end pair and then calculate how many partitions you need
> from there.)
>
> - Write the job so that the worker loads data from its shard(s) and unions
> the RDDs together. In the case of pairs the concept is the same. Basically
> look at how the JdbcRDD constructor requires a start, end, and query
> (disregard numPartitions in this case since we're manually partitioning in
> the step above). Your query will be its initial filter conditions plus a
> BETWEEN condition for the primary key and its pair.
>
> - Operate on the unioned RDDs with other transformations or filters.
>
> If everything works as planned then the data should be spread out across
> the cluster and no one node will be responsible for loading TiBs of data
> and then distributing it to its peers. That should help with your OOM
> problem.
>
> Of course this does not guarantee that the data is balanced across nodes.
> With a large amount of data it should balance well enough to get the job
> done, though.
>
> (You may need to run several refinements against the complete dataset to
> figure out the appropriate start/end pair values to get an RDD that is
> partitioned and balanced across the workers. This is a task best performed
> using aggregate query logic or stored procedures. With my shard problem I
> don't have this option available.)
>
> Unless someone has a better idea, in which case I'd love to hear it.
>
>
> On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin <deni...@yahoo.com.invalid>
> wrote:
>
>> Hi Nicholas,
>>
>> thanks for your reply. I checked spark-redshift - it's just for UNLOAD
>> data files stored on Hadoop, not for online result sets from the DB.
>>
>> Do you know of any example of a custom RDD which fetches the data on the
>> fly (not reading from HDFS)?
>>
>> Thanks.
>>
>> Denis
>>
>> ------------------------------
>> *From:* Nicholas Chammas <nicholas.cham...@gmail.com>
>> *To:* Denis Mikhalkin <deni...@yahoo.com>; "user@spark.apache.org" <
>> user@spark.apache.org>
>> *Sent:* Sunday, 25 January 2015, 3:06
>> *Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS
>> Redshift)
>>
>> I believe Databricks provides an RDD interface to Redshift. Did you check
>> spark-packages.org?
>>
>> On Sat, 24 January 2015 at 6:45 AM Denis Mikhalkin
>> <deni...@yahoo.com.invalid> wrote:
>>
>> Hello,
>>
>> we've got some analytics data in AWS Redshift. The data is being
>> constantly updated.
>>
>> I'd like to be able to write a query against Redshift which would return
>> a subset of the data, and then run a Spark job (PySpark) to do some
>> analysis.
>>
>> I could not find an RDD which would let me do this out of the box
>> (Python), so I tried writing my own. For example, I tried a combination
>> of a generator (via yield) with parallelize. It appears, though, that
>> "parallelize" reads all the data into memory first, as I get either OOM
>> or Python swapping as soon as I increase the number of rows beyond
>> trivial limits.
>>
>> I've also looked at Java RDDs (there is an example of a MySQL RDD) but it
>> seems that it also reads all the data into memory.
>>
>> So my question is - how do I correctly feed Spark with huge datasets
>> which don't initially reside in HDFS/S3 (ideally for PySpark, but I
>> would appreciate any tips)?
>>
>> Thanks.
>>
>> Denis
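
P.S. Here is a rough, untested sketch of the stock JdbcRDD approach against
Redshift over the PostgreSQL JDBC driver. The cluster endpoint, credentials,
table, column names, and bounds are all placeholders; substitute your own, and
make sure the PostgreSQL driver jar is on the executor classpath.

    import java.sql.{DriverManager, ResultSet}

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.JdbcRDD

    object RedshiftJdbcRddSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("redshift-jdbcrdd"))

        // Placeholder Redshift endpoint; see the AWS JDBC configuration docs linked above.
        val url = "jdbc:postgresql://examplecluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev"

        val rows = new JdbcRDD(
          sc,
          () => {
            Class.forName("org.postgresql.Driver") // ensure the driver is registered on the worker
            DriverManager.getConnection(url, "masteruser", "secret")
          },
          // JdbcRDD fills the two ? placeholders with the bounds of each partition's sub-range.
          "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
          lowerBound = 1L,
          upperBound = 100000000L,
          numPartitions = 200,
          mapRow = (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload"))
        )

        println(rows.count())
        sc.stop()
      }
    }

If the key really is a Long, numPartitions alone gives you the sub-range
bucketing I described; no modified JdbcRDD is needed.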
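
And a similarly hedged sketch of the pre-sharded PostgreSQL case from my
quoted message: the important difference from what I first described is that
all of the JdbcRDDs are constructed on the driver and then unioned, so no
SparkContext has to travel to the workers (the problem I ran into). The shard
URLs, credentials, query, and key bounds are again placeholders.

    import java.sql.{DriverManager, ResultSet}

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.JdbcRDD

    object ShardedJdbcUnionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sharded-jdbc-union"))

        // Placeholder shard endpoints (one database per shard).
        val shardUrls = Seq(
          "jdbc:postgresql://shard1.example.com:5432/app",
          "jdbc:postgresql://shard2.example.com:5432/app",
          "jdbc:postgresql://shard3.example.com:5432/app")

        // One JdbcRDD per shard, each further split into sub-ranges by numPartitions.
        // Everything here runs on the driver; only the connection closures ship to workers.
        val perShard = shardUrls.map { shardUrl =>
          new JdbcRDD(
            sc,
            () => DriverManager.getConnection(shardUrl, "user", "secret"),
            "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
            1L, 10000000L, 16,
            (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))
        }

        // Union the per-shard RDDs and operate on the result as one dataset.
        val all = sc.union(perShard)
        println(all.count())
        sc.stop()
      }
    }

Hard-coding the bounds like that assumes every shard covers the same key
range; in practice you'd first query each shard for its min/max, as I
mentioned in the quoted message.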