What database are you using?
On 28 Feb 2015 at 18:15, "Michal Klos" <[email protected]> wrote:
> Hi Spark community,
>
> We have a use case where we need to pull huge amounts of data from a SQL
> query against a database into Spark. We need to execute the query against
> our huge database and not a substitute (SparkSQL, Hive, etc) because of a
> couple of factors including custom functions used in the queries that only
> our database has.
>
> We started by looking at JdbcRDD, which uses a prepared statement with
> two parameters that are meant to partition the result set across the
> workers, e.g.:
>
> select * from table limit ?,?
>
> turns into
>
> select * from table limit 1,100 on worker 1
> select * from table limit 101,200 on worker 2
>
> This will not work for us because our database cannot handle multiple
> executions of these queries without being crippled. In addition, our
> database doesn't support the above LIMIT syntax, and we don't have a
> generic way of partitioning our various queries.
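For reference, JdbcRDD's two `?` parameters are normally bound to a numeric key range rather than to LIMIT offsets; its Scaladoc example is along the lines of `select title, author from books where ? <= id and id <= ?`. Below is a minimal sketch of how a lower bound, an upper bound and a partition count can be split into contiguous per-partition ranges, in the spirit of (not copied from) JdbcRDD's getPartitions. `RangeSplit` and `split` are hypothetical names.

```scala
object RangeSplit {
  /** Split the inclusive key range [lower, upper] into numPartitions
    * contiguous inclusive sub-ranges; each (start, end) pair would be bound
    * to the two '?' placeholders of one partition's query. */
  def split(lower: Long, upper: Long, numPartitions: Int): Vector[(Long, Long)] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(upper >= lower, "upper must be >= lower")
    // BigInt keeps (i + 1) * length from overflowing on very wide ranges
    val length = BigInt(upper) - BigInt(lower) + 1
    (0 until numPartitions).map { i =>
      val start = BigInt(lower) + BigInt(i) * length / numPartitions
      val end = BigInt(lower) + BigInt(i + 1) * length / numPartitions - 1
      (start.toLong, end.toLong)
    }.toVector
  }
}
```

With `split(1, 200, 2)` the two partitions get `(1, 100)` and `(101, 200)`. Each pair still becomes its own query execution, which is exactly the repeated load your database can't absorb.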
>
> As a result, we started by forking JdbcRDD and made a version that
> executes the SQL query once in getPartitions, collects the results into a
> Vector, and then hands each partition an index and its share of the rows.
> Here's a snippet of getPartitions and compute:
>
> override def getPartitions: Array[Partition] = {
>   // Compute the DB query once here (this runs on the driver)
>   val results = computeQuery
>
>   (0 until numPartitions).map { i =>
>     // TODO: would be better to do this partitioning while scrolling through
>     // the result set, if it is still being loaded into memory
>     // Take every numPartitions-th row, starting at row i
>     val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>     new DBPartition(i, partitionItems)
>   }.toArray
> }
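The `drop(i).sliding(1, numPartitions).flatten` expression assigns row j to partition j % numPartitions, but it scans the result vector separately for each partition. Here is a minimal sketch of the TODO, assuming the rows arrive as a forward-only `Iterator[T]` (for example a thin wrapper around a JDBC ResultSet, not shown). It makes one pass and produces the same assignment. `RoundRobin.distribute` is a hypothetical helper. It still materializes every row, so on its own it does not lift the memory limit.

```scala
import scala.collection.mutable.ArrayBuffer

object RoundRobin {
  /** Assign row j to partition j % n in a single pass over a forward-only
    * iterator; same assignment as results.drop(i).sliding(1, n).flatten. */
  def distribute[T](rows: Iterator[T], n: Int): Vector[Vector[T]] = {
    require(n > 0, "n must be positive")
    val buckets = Vector.fill(n)(ArrayBuffer.empty[T]) // one buffer per partition
    var j = 0
    while (rows.hasNext) {
      buckets(j % n) += rows.next()
      j += 1
    }
    buckets.map(_.toVector)
  }
}
```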
>
> override def compute(thePart: Partition, context: TaskContext): Iterator[T] =
>   new NextIterator[T] {
>     val part = thePart.asInstanceOf[DBPartition[T]]
>
>     // The rows for this partition were already selected in getPartitions,
>     // so just iterate over them
>     val iterator = part.items.iterator
>
>     override def getNext(): T = {
>       if (iterator.hasNext) {
>         iterator.next()
>       } else {
>         finished = true
>         null.asInstanceOf[T]
>       }
>     }
>
>     override def close(): Unit = ()
>   }
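Since `compute` only has to return an `Iterator[T]`, an in-memory partition could simply return `part.items.iterator`. The `getNext()`/`finished`/`close()` protocol of `NextIterator` becomes useful when a resource such as a ResultSet must be closed once the rows run out, which is what streaming from a cursor would need. `NextIterator` is `private[spark]`, so below is a hypothetical standalone stand-in (`SimpleNextIterator`, `CursorLikeIterator`) that roughly mimics that protocol. It assumes a plain iterator in place of a real cursor.

```scala
/** Hypothetical stand-in for Spark's private NextIterator protocol. */
abstract class SimpleNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = null.asInstanceOf[U]
  private var closed = false
  protected var finished = false

  /** Return the next value, or set finished = true (return value ignored). */
  protected def getNext(): U
  /** Release resources (e.g. a cursor); called once, when exhausted. */
  protected def close(): Unit

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (finished) closeIfNeeded() // source exhausted: clean up right away
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }

  def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }
}

/** Example subclass: wraps any iterator as a stand-in for a DB cursor. */
class CursorLikeIterator[T](cursor: Iterator[T]) extends SimpleNextIterator[T] {
  var closeCalls = 0 // counts close() calls so callers can check cleanup

  override protected def getNext(): T =
    if (cursor.hasNext) cursor.next()
    else { finished = true; null.asInstanceOf[T] }

  override protected def close(): Unit = closeCalls += 1
}
```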
>
> This is a little better, since we only execute the query once. However,
> the whole result set has to fit in memory on the driver.
>
> We've been trying to brainstorm a way to:
>
> A) distribute the result set out to the RDD partitions on the workers as
> it streams in from the cursor,
> B) spill the result set to disk if it exceeds memory and do something
> clever around the iterators, or
> C) something else?
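Here is a minimal sketch of the spilling half of option B. It assumes each row can be serialized to a single line of text with no embedded newlines. Rows are streamed round-robin into one file per partition, so only one row is held in memory at a time, and each file is read back lazily. `SpillToDisk` and its methods are hypothetical. In a real cluster the files would have to sit on storage that every executor can read, not on the driver's local disk, and at that point this starts to look like the HDFS/S3 dump workaround.

```scala
import java.io.BufferedWriter
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object SpillToDisk {
  /** Stream rows into n files round-robin (row j goes to file j % n),
    * keeping only the current row in memory. Returns the file paths. */
  def spill(rows: Iterator[String], n: Int, dir: Path): Vector[Path] = {
    require(n > 0, "n must be positive")
    val paths = Vector.tabulate(n)(i => dir.resolve(s"part-$i.txt"))
    val writers: Vector[BufferedWriter] =
      paths.map(p => Files.newBufferedWriter(p, StandardCharsets.UTF_8))
    try {
      var j = 0
      while (rows.hasNext) {
        val w = writers(j % n)
        w.write(rows.next())
        w.newLine()
        j += 1
      }
    } finally {
      writers.foreach(_.close()) // flush and release every file handle
    }
    paths
  }

  /** Lazily read one spilled partition; the reader is closed at end of file. */
  def read(path: Path): Iterator[String] = {
    val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
    Iterator.continually(reader.readLine()).takeWhile { line =>
      val more = line != null
      if (!more) reader.close() // end of file reached: release the handle
      more
    }
  }
}
```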
>
> We're not yet familiar enough with Spark's internals to know how to
> proceed on this.
>
> We also thought of the workaround of having the DB query dump its results
> to HDFS/S3 and then picking them up from there, but that adds more moving
> parts and latency to our processing.
>
> Does anyone have a clever suggestion? Are we missing something?
>
> thanks,
> Michal