[ https://issues.apache.org/jira/browse/SPARK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036116#comment-15036116 ]

Sean Owen commented on SPARK-12097:
-----------------------------------

Normally I'd say we don't use JIRA for discussion (i.e. we don't use the 
Brainstorming type and can't delete it), but since the writeup here is good, 
and I have some back-story on the use case here which makes me believe it 
_could_ lead to a change, let's leave it for the moment.

Why not simply query once per batch for all the rows you need and cache them? 
This need not involve anything specific to Spark. {{updateStateByKey}} is more 
for when you want Spark to maintain some persistent data, but you are already 
maintaining it (in your RDBMS). It's unnecessarily complex here.
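
Roughly, something like this (a sketch only, assuming a 
{{DStream[(String, String)]}} keyed by the lookup key and a hypothetical 
{{lookupBatch}} helper that issues one {{SELECT ... WHERE k1 IN (...)}} for 
the keys it is given; the type, names and TTL are only illustrative):

{code:borderStyle=solid}
import org.apache.spark.streaming.dstream.DStream

// Driver-side cache: looked-up value per key plus the time it was loaded.
val cache = scala.collection.mutable.Map[String, (String, Long)]()
val ttlMillis = 10 * 60 * 1000L

def enrich(records: DStream[(String, String)],
           lookupBatch: Seq[String] => Map[String, String]): DStream[(String, (String, String))] =
  records.transform { rdd =>
    val now = System.currentTimeMillis()
    // Keys of this micro-batch that are missing from the cache or expired.
    val keys = rdd.map(_._1).distinct().collect()
    val missing = keys.filter { k =>
      cache.get(k).forall { case (_, loadedAt) => now - loadedAt > ttlMillis }
    }
    // One query for just those keys, then refresh the driver-side cache.
    lookupBatch(missing).foreach { case (k, v) => cache(k) = (v, now) }
    // Ship only the slice of the cache this batch needs and join on the executors.
    val lookup = rdd.sparkContext.broadcast(
      keys.flatMap(k => cache.get(k).map(e => k -> e._1)).toMap)
    rdd.map { case (k, v) => (k, (v, lookup.value.getOrElse(k, ""))) }
  }
{code}

All of the caching and batching stays in plain driver-side code; Spark only 
sees a per-batch map-side join against a broadcast map.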

> How to do a cached, batched JDBC-lookup in Spark Streaming?
> -----------------------------------------------------------
>
>                 Key: SPARK-12097
>                 URL: https://issues.apache.org/jira/browse/SPARK-12097
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: Streaming
>            Reporter: Christian Kurz
>
> h3. Use-case
> I need to enrich incoming Kafka data with data from a lookup table (or query) 
> on a relational database. Lookup data changes slowly over time (so caching it 
> for a certain retention time is okay). Lookup data is potentially huge (so 
> loading all of it upfront is not an option).
> h3. Problem
> The overall design idea is to implement a cached and batched JDBC lookup. 
> That is, for any lookup keys missing from the lookup cache, a JDBC lookup is 
> done to retrieve the missing lookup data. JDBC lookups are rather expensive 
> (connection overhead, number of round-trips) and therefore must be done in 
> batches, e.g. one JDBC lookup per 100 missing keys.
> So the high-level logic might look something like this:
> # For every Kafka RDD we extract all lookup keys.
> # For all lookup keys we check whether the lookup data is already available 
> in the cache and whether the cached entry has not yet expired.
> # For any lookup keys not found in the cache (or expired), we send batched 
> prepared JDBC statements to the database to fetch the missing lookup data:
>     {{SELECT c1, c2, c3 FROM ... WHERE k1 in (?,?,?,...)}}
> to minimize the number of JDBC round-trips (a sketch follows this list).
> # At this point we have up-to-date lookup data for all lookup keys and can 
> perform the actual lookup operation.
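> To make step 3 concrete, here is a rough sketch using plain JDBC (table and 
> column names are placeholders, the key type is assumed to be String, and 100 
> is just the example batch size from above):
> {code:borderStyle=solid}
> import java.sql.DriverManager
>
> // Fetch the lookup rows for the given missing keys, issuing one prepared
> // IN-list query per chunk of at most 100 keys.
> def fetchRows(jdbcUrl: String,
>               missingKeys: Seq[String]): Map[String, (String, String, String)] = {
>   val result = scala.collection.mutable.Map[String, (String, String, String)]()
>   val conn = DriverManager.getConnection(jdbcUrl)
>   try {
>     for (chunk <- missingKeys.distinct.grouped(100)) {
>       val placeholders = chunk.map(_ => "?").mkString(",")
>       val stmt = conn.prepareStatement(
>         s"SELECT k1, c1, c2, c3 FROM lookup_table WHERE k1 IN ($placeholders)")
>       try {
>         chunk.zipWithIndex.foreach { case (key, i) => stmt.setString(i + 1, key) }
>         val rs = stmt.executeQuery()
>         while (rs.next()) {
>           result(rs.getString("k1")) =
>             (rs.getString("c1"), rs.getString("c2"), rs.getString("c3"))
>         }
>       } finally stmt.close()
>     }
>     result.toMap
>   } finally conn.close()
> }
> {code}
> (In a real implementation one would probably pad the IN list to a fixed size 
> so the database can reuse the prepared statement, but that is a detail.)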
> Does this approach make sense on Spark? Would Spark State DStreams be the 
> right way to go? Or other design approaches?
> Assuming Spark State DStreams are the right direction, the low-level question 
> is how to do the batching?
> Would this particular signature of {{DStream.updateStateByKey}} (iterator -> 
> iterator):
> {code:borderStyle=solid}
> def updateStateByKey[S: ClassTag](
>       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
>       partitioner: Partitioner,
>       rememberPartitioner: Boolean,
>       initialRDD: RDD[(K, S)]
>     )
> {code}
> be the right way to batch multiple incoming keys into a single JDBC-lookup 
> query?
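> For illustration, a rough sketch of how the batching could look inside such 
> an iterator-based {{updateFunc}} (the key/value types, the state type, the 
> TTL and the {{fetchRows}} helper from step 3 are assumptions, not an existing 
> API):
> {code:borderStyle=solid}
> // State kept per key: the looked-up columns plus the time they were loaded.
> case class CachedRow(columns: (String, String, String), loadedAt: Long)
>
> val ttlMillis = 10 * 60 * 1000L
>
> // Iterator => Iterator update function: one batched JDBC lookup per partition
> // (further chunked to 100 keys inside fetchRows) instead of one per key.
> def updateFunc(
>     it: Iterator[(String, Seq[String], Option[CachedRow])]
>   ): Iterator[(String, CachedRow)] = {
>   val now     = System.currentTimeMillis()
>   val entries = it.toSeq
>   // Keys whose cached state is still fresh vs. keys that need a lookup.
>   val (fresh, stale) = entries.partition {
>     case (_, _, Some(s)) => now - s.loadedAt <= ttlMillis
>     case _               => false
>   }
>   // One batched query for all stale or missing keys of this partition
>   // ("jdbc:..." stands in for the real connection URL).
>   val fetched = fetchRows("jdbc:...", stale.map(_._1))
>   val refreshed = stale.flatMap { case (key, _, oldState) =>
>     fetched.get(key).map(cols => key -> CachedRow(cols, now))
>       .orElse(oldState.map(s => key -> s))  // keep the old row if nothing came back
>   }
>   (fresh.collect { case (key, _, Some(s)) => key -> s } ++ refreshed).iterator
> }
> {code}
> (New keys arrive with no previous state, so they fall into the stale bucket 
> and get looked up automatically.)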
> Would the new {{DStream.trackStateByKey()}} be a better approach?
> The second, more high-level question: is there a way to chain multiple state 
> operations on the same state object?
> Going with the above design approach, the entire lookup logic would be 
> hand-crafted into a single Java/Scala/Python {{updateFunc}}. This function 
> would go over all incoming keys, check which ones are missing from the cache, 
> batch the missing ones, run the JDBC queries, and union the returned lookup 
> data with the existing cache from the State object.
> The need to hand-craft all of this into a single function seems to stem from 
> the fact that, at a high level, Spark's state processing logic works like 
> this:
> {code:borderStyle=solid}
> input: prevStateRdd, inputRDD
> output: updateStateFunc( prevStateRdd, inputRdd )
> {code}
> So only a single {{updateStateFunc}} operates on {{prevStateRdd}} and 
> {{inputRdd}} in one go. Once it is done, there is no way to further refine 
> the state as part of the current micro-batch.
> The multi-step processing required here sounds like a typical use-case for a 
> DStream: apply multiple operations one after the other on some incoming data. 
> So I wonder whether there is a way to extend the concept of state processing 
> (maybe it has already been extended?) to do something like:
> {code:borderStyle=solid}
> input: prevStateRdd, inputRdd
> missingKeys     = inputRdd.filter( <not exists in prevStateRdd> )  
> foundKeys       = inputRdd.filter( <exists in prevStateRdd> )  
> newLookupData   = lookupKeysUsingJdbcDataFrameRead( missingKeys.collect() )
> newStateRdd     = newLookupData.union( foundKeys).union( prevStateRdd )
> output: newStateRdd
> {code}
> This would nicely leverage all the power and richness of Spark. The only 
> missing bit - and the reason why this approach does not work today (based on 
> my naive understanding of Spark) - is that {{newStateRdd}} cannot be declared 
> to be the {{prevStateRdd}} of the next micro-batch.
> If Spark had a way of declaring an RDD (or DStream) to be the parent for the 
> next batch run, even complex (chained) state operations would be easy to 
> describe and would not require hand-written Java/Python/Scala update functions.
> Thanks a lot for taking the time to read all of this!!!
> Any thoughts/pointers are much appreciated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
