I'm a little confused by your comments regarding LIMIT.  There's nothing
about JdbcRDD that depends on LIMIT.  You just need to be able to partition
your data in some way such that it has numeric upper and lower bounds.
Primary key range scans, not LIMIT, would ordinarily be the best way to do
that.  If you can't partition your data that way for some reason, how do
you propose to partition it at all?
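
For illustration, here's roughly what that looks like with JdbcRDD -- just a
sketch, assuming an existing SparkContext (sc), a numeric primary key column
id spanning 1 to 1000000, and a made-up JDBC URL and row mapping:

  import java.sql.{Connection, DriverManager, ResultSet}
  import org.apache.spark.rdd.JdbcRDD

  // Hypothetical connection factory -- substitute your driver, URL, credentials.
  def getConnection(): Connection =
    DriverManager.getConnection("jdbc:yourdb://host/db", "user", "pass")

  // JdbcRDD binds each partition's lower/upper bound into the two '?'
  // placeholders, so every task runs an index-friendly range scan rather
  // than a LIMIT/OFFSET query.
  val rdd = new JdbcRDD(
    sc,
    getConnection _,
    "select * from table where id >= ? and id <= ?",
    1L, 1000000L,                        // overall bounds on id
    20,                                  // numPartitions
    (rs: ResultSet) => rs.getString(2))  // map each row however you need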


On Sat, Feb 28, 2015 at 11:13 AM, Michal Klos <michal.klo...@gmail.com>
wrote:

> Hi Spark community,
>
> We have a use case where we need to pull huge amounts of data from a SQL
> query against a database into Spark. We need to execute the query against
> our huge database and not a substitute (Spark SQL, Hive, etc.) because of a
> couple of factors, including custom functions used in the queries that only
> our database has.
>
> We started by looking at JdbcRDD, which utilizes a prepared statement
> with two parameters that are meant to be used to partition the result set
> across the workers... e.g.:
>
> select * from table limit ?,?
>
> turns into
>
> select * from table limit 1,100 on worker 1
> select * from table limit 101,200 on worker 2
>
> This will not work for us because our database cannot support multiple
> executions of these queries without being crippled. Additionally, our
> database doesn't support the above LIMIT syntax and we don't have a generic
> way of partitioning the various queries.
>
> As a result -- we started by forking JdbcRDD and made a version that
> executes the SQL query once in getPartitions, collects the results into a
> Vector, and then hands each worker node an index and an iterator. Here's a
> snippet of getPartitions and compute:
>
>   override def getPartitions: Array[Partition] = {
>     // Compute the DB query once here, on the driver
>     val results = computeQuery
>
>     (0 until numPartitions).map(i => {
>       // TODO: would be better to do this partitioning while scrolling through the result set, if still loading into memory
>       // Round-robin assignment: partition i gets elements i, i + numPartitions, i + 2 * numPartitions, ...
>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>       new DBPartition(i, partitionItems)
>     }).toArray
>   }
>
>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>     val part = thePart.asInstanceOf[DBPartition[T]]
>
>     // The items for this partition were already selected in getPartitions, so just iterate over them
>     val iterator = part.items.iterator
>
>     override def getNext: T = {
>       if (iterator.hasNext) {
>         iterator.next()
>       } else {
>         finished = true
>         null.asInstanceOf[T]
>       }
>     }
>
>     override def close: Unit = ()
>   }
>
> This is a little better since we can just execute the query once. However, 
> the whole result set needs to fit in memory on the driver.
>
> We've been trying to brainstorm a way to:
>
> A) have that result set distributed out to the worker RDD partitions as it 
> streams in from the cursor (roughly sketched below)?
> B) have the result set spill to disk if it exceeds memory and do something 
> clever around the iterators?
> C) something else?
>
> We're not familiar enough yet with all of the workings of Spark to know how 
> to proceed on this.
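>
> Roughly what we have in mind for (A), as an illustrative sketch only -- 
> rowIterator and batchSize are placeholders for the driver-side JDBC cursor 
> and a chunk size:
>
>   import scala.reflect.ClassTag
>   import org.apache.spark.SparkContext
>   import org.apache.spark.rdd.RDD
>
>   // Read the cursor once on the driver, hand out fixed-size batches,
>   // then union the per-batch RDDs back together.
>   def streamToRDD[T: ClassTag](sc: SparkContext,
>                                rowIterator: Iterator[T],
>                                batchSize: Int): RDD[T] = {
>     val batches = rowIterator.grouped(batchSize).map(batch => sc.parallelize(batch))
>     sc.union(batches.toSeq)
>   }
>
> (Though as far as we can tell, parallelize keeps each batch referenced on 
> the driver until a job actually runs, so this only really helps if each 
> batch is persisted or written out as it arrives.)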
>
> We also thought of the work-around of having the DB query dump to HDFS/S3 
> and then picking it up from there, but it adds more moving parts and latency 
> to our processing.
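>
> A minimal sketch of that workaround, assuming the dump lands as 
> tab-delimited text under a made-up HDFS path:
>
>   val dumped = sc.textFile("hdfs:///dumps/query_output/*")
>   val rows = dumped.map(_.split('\t'))  // parse however the dump is delimited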
>
> Does anyone have a clever suggestion? Are we missing something?
>
> thanks,
> Michal
>
>
