[ https://issues.apache.org/jira/browse/SPARK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036116#comment-15036116 ]
Sean Owen commented on SPARK-12097:
-----------------------------------

Normally I'd say we don't use JIRA for discussion (i.e. we don't use the Brainstorming type, and I can't delete it), but since the write-up here is good, and I have some back-story on the use case which makes me believe it _could_ lead to a change, let's leave it for the moment.

Why not simply query once per batch for all the rows you need and cache them (roughly as in the sketch at the bottom of this mail)? This need not involve anything specific to Spark. {{updateStateByKey}} is more for when you want Spark to maintain some persistent data, but you are already maintaining it (in your RDBMS). It's unnecessarily complex here.

> How to do a cached, batched JDBC-lookup in Spark Streaming?
> -----------------------------------------------------------
>
>                  Key: SPARK-12097
>                  URL: https://issues.apache.org/jira/browse/SPARK-12097
>              Project: Spark
>           Issue Type: Brainstorming
>           Components: Streaming
>             Reporter: Christian Kurz
>
> h3. Use-case
>
> I need to enrich incoming Kafka data with data from a lookup table (or query)
> on a relational database. Lookup data changes slowly over time (so caching it
> for a certain retention time is okay). Lookup data is potentially huge (so
> loading all of it upfront is not an option).
>
> h3. Problem
>
> The overall design idea is to implement a cached and batched JDBC lookup.
> That is, for any lookup keys that are missing from the lookup cache, a JDBC
> lookup is done to retrieve the missing lookup data. JDBC lookups are rather
> expensive (connection overhead, number of round-trips) and therefore must be
> done in batches, e.g. one JDBC lookup per 100 missing keys.
> So the high-level logic might look something like this:
> # For every Kafka RDD we extract all lookup keys.
> # For all lookup keys we check whether the lookup data is already available
> in the cache and whether this cached information has not yet expired.
> # For any lookup keys not found in the cache (or expired), we send batched
> prepared JDBC statements to the database to fetch the missing lookup data:
> {{SELECT c1, c2, c3 FROM ... WHERE k1 IN (?,?,?,...)}}
> to minimize the number of JDBC round-trips.
> # At this point we have up-to-date lookup data for all lookup keys and can
> perform the actual lookup operation.
> Does this approach make sense in Spark? Would Spark state DStreams be the
> right way to go? Or are there other design approaches?
> Assuming Spark state DStreams are the right direction, the low-level question
> is how to do the batching.
> Would this particular signature of {{DStream.updateStateByKey}} (iterator ->
> iterator):
> {code:borderStyle=solid}
> def updateStateByKey[S: ClassTag](
>     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
>     partitioner: Partitioner,
>     rememberPartitioner: Boolean,
>     initialRDD: RDD[(K, S)]
> )
> {code}
> be the right way to batch multiple incoming keys into a single JDBC-lookup
> query?
> Would the new {{DStream.trackStateByKey()}} be a better approach?
> The second, more high-level question: is there a way to chain multiple state
> operations on the same state object?
> Going with the above design approach, the entire lookup logic would be
> hand-crafted into some Java/Scala/Python {{updateFunc}}. This function would
> go over all incoming keys, check which ones are missing from the cache, batch
> the missing ones, run the JDBC queries and union the returned lookup data with
> the existing cache from the state object.
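> A rough sketch (in Scala) of what such a hand-crafted {{updateFunc}} might
> look like - the {{fetchLookupRows}} helper, the {{CachedRow}} state type, the
> 10-minute retention time and the batch size of 100 are only illustrative
> placeholders, not a finished implementation:
> {code:borderStyle=solid}
> // State kept per lookup key: the cached row plus the time it was fetched.
> case class CachedRow(row: Seq[Any], fetchedAt: Long)
>
> // Placeholder for one batched JDBC round-trip:
> // SELECT k1, c1, c2, c3 FROM ... WHERE k1 IN (?,?,?,...)
> def fetchLookupRows(keys: Seq[String]): Map[String, Seq[Any]] = ???
>
> val ttlMs = 10 * 60 * 1000L   // cache retention time
>
> // K = String (lookup key), V = String (Kafka payload), S = CachedRow
> def updateFunc(
>     it: Iterator[(String, Seq[String], Option[CachedRow])]
>   ): Iterator[(String, CachedRow)] = {
>   val now     = System.currentTimeMillis()
>   val entries = it.toSeq
>   // Keys with no cached row, or with an expired one.
>   val missing = entries.collect {
>     case (k, _, state) if state.forall(s => now - s.fetchedAt > ttlMs) => k
>   }
>   // One JDBC query per 100 missing keys instead of one query per key.
>   val fetched = missing.grouped(100).flatMap(fetchLookupRows).toMap
>   entries.iterator.flatMap { case (k, _, state) =>
>     val refreshed  = fetched.get(k).map(r => CachedRow(r, now))
>     val stillValid = state.filter(s => now - s.fetchedAt <= ttlMs)
>     refreshed.orElse(stillValid).map(v => (k, v))  // drop rows no longer found
>   }
> }
>
> // Wired up against the signature above, e.g.:
> // keyedKafkaStream.updateStateByKey(updateFunc _, partitioner, true, initialRDD)
> {code}
> The enrichment itself (joining the incoming records with the cached rows)
> would then be a separate step on top of the state DStream returned by
> {{updateStateByKey}}.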
> The fact that all of this must be hand-crafted into a single function seems to
> be caused by the fact that Spark's state-processing logic at a high level works
> like this:
> {code:borderStyle=solid}
> input:  prevStateRdd, inputRdd
> output: updateStateFunc( prevStateRdd, inputRdd )
> {code}
> So there is only a single updateStateFunc operating on prevStateRdd and
> inputRdd in one go. Once it has run, there is no way to further refine the
> state as part of the current micro-batch.
> The multi-step processing required here sounds like a typical use-case for a
> DStream: apply multiple operations one after the other on some incoming data.
> So I wonder whether there is a way to extend the concept of state processing
> (maybe it already has been extended?) to do something like:
> {code:borderStyle=solid}
> *input: prevStateRdd, inputRdd*
> missingKeys   = inputRdd.filter( <not exists in prevStateRdd> )
> foundKeys     = inputRdd.filter( <exists in prevStateRdd> )
> newLookupData = lookupKeysUsingJdbcDataFrameRead( missingKeys.collect() )
> newStateRdd   = newLookupData.union( foundKeys ).union( prevStateRdd )
> *output: newStateRdd*
> {code}
> This would nicely leverage all the power and richness of Spark. The only
> missing bit - and the reason why this approach does not work today (based on
> my naive understanding of Spark) - is that {{newStateRdd}} cannot be declared
> to be the {{prevStateRdd}} of the next micro-batch.
> If Spark had a way of declaring an RDD (or DStream) to be the parent for the
> next batch run, even complex (chained) state operations would be easy to
> describe and would not require hand-written Java/Python/Scala update
> functions.
> Thanks a lot for taking the time to read all of this!!!
> Any thoughts/pointers are much appreciated.
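A minimal sketch (Scala, against the DStream API) of the per-batch query-and-cache
approach suggested at the top of this mail. The object and method names, the
{{fetchRows}} helper, the 100-key batch size and the 10-minute TTL are illustrative
assumptions, not an existing Spark API:

{code:borderStyle=solid}
import scala.collection.mutable

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

object PerBatchLookup {
  private val ttlMs = 10 * 60 * 1000L                               // cache retention time
  private val cache = mutable.Map.empty[String, (Seq[Any], Long)]   // driver-side cache

  // Placeholder for one batched JDBC round-trip: WHERE k1 IN (?,?,?,...)
  def fetchRows(keys: Seq[String]): Map[String, Seq[Any]] = ???

  // Refresh missing or expired keys in batches of 100, then return rows for all
  // requested keys that exist in the database.
  def lookup(keys: Set[String]): Map[String, Seq[Any]] = {
    val now = System.currentTimeMillis()
    val missing = keys.filter(k => cache.get(k).forall { case (_, t) => now - t > ttlMs })
    missing.toSeq.grouped(100).foreach { batch =>
      fetchRows(batch).foreach { case (k, row) => cache(k) = (row, now) }
    }
    keys.iterator.flatMap(k => cache.get(k).map { case (row, _) => k -> row }).toMap
  }

  // Enrich each (key, value) record: run the batched lookup once per micro-batch on
  // the driver and ship the (small) result map to the executors as a broadcast.
  def enrich(stream: DStream[(String, String)]): DStream[(String, (String, Option[Seq[Any]]))] =
    stream.transform { rdd =>
      val rows = lookup(rdd.keys.distinct().collect().toSet)
      val bc: Broadcast[Map[String, Seq[Any]]] = rdd.sparkContext.broadcast(rows)
      rdd.map { case (k, v) => (k, (v, bc.value.get(k))) }
    }
}
{code}

No Spark state is involved: the cache lives on the driver and the database remains
the source of truth, which is exactly the point made above.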