Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
Hi Nicholas, thanks for your reply.

I checked spark-redshift: it only handles the UNLOAD data files already stored on Hadoop, not live result sets from the database. Do you know of any example of a custom RDD which fetches the data on the fly (rather than reading from HDFS)?

Thanks.

Denis
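For reference, a minimal sketch of what such a "fetch on the fly" RDD might look like in Scala, modelled loosely on Spark's own JdbcRDD (discussed elsewhere in this thread). The class name QueryRDD, the Long-range partitioning, and the generic row representation are illustrative assumptions, not an existing API:

```scala
import java.sql.{Connection, ResultSet}

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Lightweight partition descriptor: only these small objects live on the driver.
case class KeyRangePartition(index: Int, lower: Long, upper: Long) extends Partition

// Hypothetical "fetch on the fly" RDD. Each task opens its own connection and
// streams only its sub-range, so the full result set is never materialized on
// the driver.
class QueryRDD(
    sc: SparkContext,
    createConnection: () => Connection,
    sql: String,                 // must contain two ? placeholders for the bounds
    lower: Long,
    upper: Long,
    numPartitions: Int)
  extends RDD[Seq[AnyRef]](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val step = (upper - lower) / numPartitions + 1
    (0 until numPartitions).map { i =>
      KeyRangePartition(i, lower + i * step, math.min(lower + (i + 1) * step - 1, upper))
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Seq[AnyRef]] = {
    val part = split.asInstanceOf[KeyRangePartition]
    val conn = createConnection()
    val stmt = conn.prepareStatement(sql)
    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
    val rs = stmt.executeQuery()
    val columns = rs.getMetaData.getColumnCount

    // Stream rows lazily; a production version should also close the connection
    // via a task-completion callback, as JdbcRDD does.
    new Iterator[Seq[AnyRef]] {
      private var hasMore = rs.next()
      override def hasNext: Boolean = {
        if (!hasMore) conn.close()
        hasMore
      }
      override def next(): Seq[AnyRef] = {
        val row = (1 to columns).map(i => rs.getObject(i))
        hasMore = rs.next()
        row
      }
    }
  }
}
```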
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I've got my solution working: https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d

I couldn't actually perform the steps I outlined in my previous message in this thread, because I would ultimately be trying to serialize a SparkContext to the workers to use during the generation of 1..n JdbcRDDs. So I took a look at the source for JdbcRDD, and it was trivial to adjust to my needs.

This got me thinking about your problem: the JdbcRDD that ships with Spark will shard the query across the cluster by a Long ID value (requiring you to put ? placeholders in your query for use as part of a range boundary). So if you've got such a key - or any series field that happens to be a Long - then you'd just need to use the PostgreSQL JDBC driver and get your JDBC URL: http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html

If you have something other than a Long for your primary key/series data type, then you can do the same thing I did and modify a copy of JdbcRDD, though your changes would be even fewer than mine. (Though I can't see anything much different from a Long or a date/time working for this, since it has to partition the full range into appropriate sub-ranges.)

Because of the sub-range bucketing and cluster distribution you shouldn't run into OOM errors, assuming you provision sufficient worker nodes in the cluster.
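To make that concrete, here is a minimal sketch of the stock JdbcRDD (Spark 1.x Scala API) pointed at Redshift through the PostgreSQL JDBC driver. The endpoint, credentials, table and column names are placeholders, and the bounds would normally come from a separate min/max query:

```scala
import java.sql.{DriverManager, ResultSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

object RedshiftViaJdbcRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("redshift-jdbc-rdd"))

    // Placeholder endpoint and credentials; copy the real JDBC URL from the
    // Redshift console (see the AWS doc linked above).
    val url = "jdbc:postgresql://examplecluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev"

    val rows = new JdbcRDD(
      sc,
      () => {
        Class.forName("org.postgresql.Driver") // ensure the driver is registered on the executor
        DriverManager.getConnection(url, "masteruser", "password")
      },
      // The two ? placeholders are filled by JdbcRDD with each partition's sub-range bounds.
      "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
      1L,          // lowerBound: overall minimum of the Long key, e.g. from SELECT min(id)
      100000000L,  // upperBound: overall maximum, e.g. from SELECT max(id)
      32,          // numPartitions: how many sub-ranges / concurrent queries
      (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))

    println(rows.count()) // rows is an RDD[(Long, String)], ready for normal transformations
    sc.stop()
  }
}
```

Each of the 32 tasks runs its own bounded query against the cluster, so no single node ever has to hold the full result set.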
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I'm facing a similar problem, except my data is already pre-sharded in PostgreSQL. I'm going to attempt to solve it like this:

- Submit the shard names (database names) across the Spark cluster as a text file and partition it so workers get 0 or more - hopefully 1 - shard names. In your case you could partition ranges instead: if your primary key is a datetime, a start/end datetime pair; if it's a long, a start/end long pair. (You may need to run a separate job to get your overall start/end pair and then calculate how many partitions you need from there.)
- Write the job so that each worker loads data from its shard(s) and unions the RDDs together. In the case of range pairs the concept is the same. Basically, look at how the JdbcRDD constructor requires a start, end, and query (disregard numPartitions in this case, since we're manually partitioning in the step above). Your query will be its initial filter conditions plus a between condition on the primary key for its pair.
- Operate on the unioned RDDs with other transformations or filters.

If everything works as planned, the data should be spread out across the cluster and no one node will be responsible for loading TiBs of data and then distributing it to its peers. That should help with your OOM problem. Of course this does not guarantee that the data is balanced across nodes, but with a large amount of data it should balance well enough to get the job done. (You may need to run several refinements against the complete dataset to figure out the appropriate start/end pair values to get an RDD that is partitioned and balanced across the workers. This is a task best performed using aggregate query logic or stored procedures. With my shard problem I don't have this option available.)

Unless someone has a better idea, in which case I'd love to hear it.
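A rough sketch of the range-pair variant of this plan, assuming a Long primary key; the endpoint, table, filter, and column names are made up. One cheap query gets the overall start/end pair, the range is cut into sub-ranges, each sub-range becomes a single-partition JdbcRDD whose query carries the extra bound conditions, and the results are unioned:

```scala
import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

object ManualRangeUnion {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("manual-range-union"))

    val url = "jdbc:postgresql://examplecluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev" // placeholder
    val connect: () => Connection =
      () => DriverManager.getConnection(url, "masteruser", "password")

    // Separate, cheap query to get the overall start/end pair.
    val (minId, maxId) = {
      val conn = connect()
      try {
        val rs = conn.createStatement().executeQuery("SELECT min(id), max(id) FROM events")
        rs.next()
        (rs.getLong(1), rs.getLong(2))
      } finally conn.close()
    }

    // Cut the overall range into sub-ranges; tune the count to the cluster size.
    val numRanges = 16
    val step = (maxId - minId) / numRanges + 1
    val ranges = (0 until numRanges).map { i =>
      (minId + i * step, math.min(minId + (i + 1) * step - 1, maxId))
    }

    // One single-partition JdbcRDD per sub-range: the initial filter conditions
    // plus the bounds on the primary key (the two ? placeholders).
    val perRange = ranges.map { case (lo, hi) =>
      new JdbcRDD(
        sc,
        connect,
        "SELECT id, payload FROM events WHERE status = 'active' AND id >= ? AND id <= ?",
        lo, hi, 1,
        (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))
    }

    // Union the per-range RDDs and continue with normal transformations/filters.
    val all = sc.union(perRange)
    println(all.count())
    sc.stop()
  }
}
```

Since each JdbcRDD here has numPartitions = 1, balance depends entirely on how evenly the key space is split, which is the refinement step described above.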
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I believe Databricks provides an RDD interface to Redshift. Did you check spark-packages.org?
Analyzing data from non-standard data sources (e.g. AWS Redshift)
Hello, we've got some analytics data in AWS Redshift, and the data is constantly being updated. I'd like to be able to write a query against Redshift that returns a subset of the data, and then run a Spark job (PySpark) to do some analysis.

I could not find an RDD which would let me do this out of the box (in Python), so I tried writing my own. For example, I tried a combination of a generator (via yield) with parallelize. It appears, though, that parallelize first reads all the data into memory: I get either an OOM or Python starts swapping as soon as I increase the number of rows beyond trivial limits. I've also looked at Java RDDs (there is an example of a MySQL RDD), but it seems that they also read all the data into memory.

So my question is: how do I correctly feed Spark with huge datasets which don't initially reside in HDFS/S3 (ideally from PySpark, but I would appreciate any tips)?

Thanks.

Denis