I've got my solution working:

https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d

I couldn't actually perform the steps I outlined in my previous message in
this thread because they would ultimately require serializing a
SparkContext to the workers for use while generating 1..n JdbcRDDs, which
isn't possible. So I took a look at the source for JdbcRDD and found it
trivial to adjust to my needs.

This got me thinking about your problem. The JdbcRDD that ships with Spark
shards the query across the cluster by a Long ID value (requiring you to
put two ? placeholders in your query to serve as the sub-range boundaries),
so if you've got such a key - or any series column that happens to be a
Long - then you'd just need the PostgreSQL JDBC driver and your Redshift
JDBC URL:
http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
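
To make that concrete, here is roughly what a call to the stock JdbcRDD
looks like against Redshift over its PostgreSQL-compatible endpoint. This
is only a sketch - the URL, credentials, table, columns, and bounds are
placeholders, not values from my job:

    import java.sql.{DriverManager, ResultSet}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.JdbcRDD

    object RedshiftJdbcRddSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("redshift-jdbc-rdd"))

        // JDBC URL as copied from the Redshift console (see the AWS docs
        // link above); this one is made up.
        val url = "jdbc:postgresql://examplecluster.abc123.us-east-1." +
          "redshift.amazonaws.com:5439/dev"

        val rows = new JdbcRDD(
          sc,
          () => {
            Class.forName("org.postgresql.Driver")
            DriverManager.getConnection(url, "username", "password")
          },
          // the two ? placeholders are bound to each partition's sub-range
          // of the Long key
          "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
          1L,         // lowerBound
          100000000L, // upperBound
          100,        // numPartitions
          (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))

        println(rows.count())
        sc.stop()
      }
    }

Each of the 100 partitions runs the query with its own id sub-range, so no
single node ever pulls the whole table.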

If you have something other than a Long for your primary key/series data
type then you can do the same thing I did and modify a copy of JdbcRDD,
though your changes would be even smaller than mine. (That said, I can't
see anything much different from a Long or a date/time working here, since
the RDD has to split the full range into evenly sized sub-ranges.)
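
For example, if your series column were a timestamp, roughly the only piece
of a copied JdbcRDD you'd have to touch is the sub-range calculation in
getPartitions - something like this sketch (my own adaptation of that
arithmetic, with hypothetical names; you'd also bind the bounds with
setTimestamp instead of setLong):

    import java.sql.Timestamp

    // Split the inclusive range [lower, upper] into numPartitions sub-ranges
    // of Timestamps, the same way JdbcRDD splits a Long range. Each triple is
    // (partition index, sub-range start, sub-range end).
    def timestampRanges(lower: Timestamp, upper: Timestamp, numPartitions: Int)
        : IndexedSeq[(Int, Timestamp, Timestamp)] = {
      val lo = lower.getTime
      val length = BigInt(1) + upper.getTime - lo
      (0 until numPartitions).map { i =>
        val start = lo + ((i * length) / numPartitions)
        val end = lo + (((i + 1) * length) / numPartitions) - 1
        (i, new Timestamp(start.toLong), new Timestamp(end.toLong))
      }
    }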

Because of the sub-range bucketing and cluster distribution you shouldn't
run into OOM errors, assuming you provision sufficient worker nodes in the
cluster.

On Sun Jan 25 2015 at 9:39:56 AM Charles Feduke <charles.fed...@gmail.com>
wrote:

> I'm facing a similar problem except my data is already pre-sharded in
> PostgreSQL.
>
> I'm going to attempt to solve it like this:
>
> - Submit the shard names (database names) across the Spark cluster as a
> text file and partition it so each worker gets zero or more shard names -
> hopefully exactly one. In your case you could partition ranges instead: if
> your primary key is a datetime, then a start/end datetime pair; or if it's
> a Long, then a start/end Long pair. (You may need to run a separate job to
> get your overall start/end pair and then calculate how many partitions you
> need from there.)
>
> - Write the job so that each worker loads data from its shard(s) and the
> resulting RDDs are unioned together. For start/end pairs the concept is
> the same. Basically, look at how the JdbcRDD constructor requires a start,
> an end, and a query (disregard numPartitions in this case, since we're
> manually partitioning in the step above). Your query will be its initial
> filter conditions plus a BETWEEN condition on the primary key for its pair
> (see the sketch after this list).
>
> - Operate on the unioned RDD with other transformations or filters.
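>
> A rough sketch of the range variant to make the shape clear - the table,
> columns, and credentials are placeholders, and the (start, end) pairs
> would come from the partitioning step above:
>
>     import java.sql.{DriverManager, ResultSet}
>     import org.apache.spark.SparkContext
>     import org.apache.spark.rdd.{JdbcRDD, RDD}
>
>     // One JdbcRDD per pre-computed (start, end) pair, a single partition
>     // each, unioned into one RDD that is spread across the cluster.
>     def loadRanges(sc: SparkContext, url: String, user: String,
>                    password: String, ranges: Seq[(Long, Long)]): RDD[(Long, String)] = {
>       val perRange: Seq[RDD[(Long, String)]] = ranges.map { case (start, end) =>
>         new JdbcRDD(
>           sc,
>           () => DriverManager.getConnection(url, user, password),
>           // initial filter conditions plus the BETWEEN condition for the pair
>           "SELECT id, payload FROM events WHERE active = true AND id BETWEEN ? AND ?",
>           start, end, 1,
>           (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))
>       }
>       sc.union(perRange)
>     }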
>
> If everything works as planned then the data should be spread out across
> the cluster and no one node will be responsible for loading TiBs of data
> and then distributing it to its peers. That should help with your OOM
> problem.
>
> Of course this does not guarantee that the data is balanced across nodes.
> With a large amount of data it should balance well enough to get the job
> done though.
>
> (You may need to run several refinements against the complete dataset to
> figure out the appropriate start/end pair values to get an RDD that is
> partitioned and balanced across the workers. This is a task best performed
> using aggregate query logic or stored procedures. With my shard problem I
> don't have this option available.)
>
> Unless someone has a better idea, in which case I'd love to hear it.
>
>
> On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin <deni...@yahoo.com.invalid>
> wrote:
>
>> Hi Nicholas,
>>
>> Thanks for your reply. I checked spark-redshift - it only handles the
>> UNLOAD data files stored on Hadoop, not live result sets from the database.
>>
>> Do you know of any example of a custom RDD which fetches the data on the
>> fly (not reading from HDFS)?
>>
>> Thanks.
>>
>> Denis
>>
>>   ------------------------------
>> From: Nicholas Chammas <nicholas.cham...@gmail.com>
>> To: Denis Mikhalkin <deni...@yahoo.com>; "user@spark.apache.org" <user@spark.apache.org>
>> Sent: Sunday, 25 January 2015, 3:06
>> Subject: Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
>>
>> I believe Databricks provides an RDD interface to Redshift. Did you check
>> spark-packages.org?
>> On Sat, 24 Jan 2015 at 6:45 AM Denis Mikhalkin <deni...@yahoo.com.invalid>
>> wrote:
>>
>> Hello,
>>
>> We've got some analytics data in AWS Redshift. The data is constantly
>> being updated.
>>
>> I'd like to be able to write a query against Redshift which would return
>> a subset of the data, and then run a Spark job (PySpark) to do some analysis.
>>
>> I could not find an RDD which would let me do this out of the box in
>> Python, so I tried writing my own. For example, I tried combining a
>> generator (via yield) with parallelize. It appears, though, that
>> parallelize reads all the data into memory first, as I get either OOM
>> errors or Python swapping as soon as I increase the number of rows beyond
>> trivial limits.
>>
>> I've also looked at the Java RDDs (there is an example of a MySQL RDD),
>> but it seems that it also reads all the data into memory.
>>
>> So my question is: how do I correctly feed Spark huge datasets which
>> don't initially reside in HDFS/S3 (ideally from PySpark, but I would
>> appreciate any tips)?
>>
>> Thanks.
>>
>> Denis
>>
