[ https://issues.apache.org/jira/browse/SPARK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-12097.
----------------------------------
Resolution: Incomplete
> How to do a cached, batched JDBC-lookup in Spark Streaming?
> -----------------------------------------------------------
>
> Key: SPARK-12097
> URL: https://issues.apache.org/jira/browse/SPARK-12097
> Project: Spark
> Issue Type: Brainstorming
> Components: DStreams
> Reporter: Christian Kurz
> Priority: Major
> Labels: bulk-closed
>
> h3. Use-case
> I need to enrich incoming Kafka data with data from a lookup table (or query)
> on a relational database. The lookup data changes slowly over time (so
> caching is okay for a certain retention time) and is potentially huge (so
> loading all of it upfront is not an option).
> h3. Problem
> The overall design idea is to implement a cached and batched JDBC lookup.
> That is, for any lookup keys, which are missing from the lookup cache, a JDBC
> lookup is done to retrieve the missing lookup data. JDBC lookups are rather
> expensive (connection overhead, number of round-trips) and therefore must be
> done in batches. E.g. one JDBC lookup per 100 missing keys.
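> For illustration, a minimal sketch of one such batched round-trip (the table
> and column names {{lookup}}, {{k1}}, {{c1}}..{{c3}} are placeholders, and
> string-typed keys and columns are assumed):
> {code:borderStyle=solid}
> import java.sql.Connection
>
> // One JDBC round-trip for up to `batch.size` keys, using an IN-list of
> // bind variables (placeholder table/column names).
> def fetchBatch(conn: Connection, batch: Seq[String]): Map[String, (String, String, String)] =
>   if (batch.isEmpty) Map.empty
>   else {
>     val placeholders = Seq.fill(batch.size)("?").mkString(",")
>     val stmt = conn.prepareStatement(
>       s"SELECT k1, c1, c2, c3 FROM lookup WHERE k1 IN ($placeholders)")
>     try {
>       batch.zipWithIndex.foreach { case (key, i) => stmt.setString(i + 1, key) }
>       val rs = stmt.executeQuery()
>       val rows = scala.collection.mutable.Map.empty[String, (String, String, String)]
>       while (rs.next())
>         rows += rs.getString("k1") -> ((rs.getString("c1"), rs.getString("c2"), rs.getString("c3")))
>       rows.toMap
>     } finally stmt.close()
>   }
> {code}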
> So the high-level logic might look something like this:
> # For every Kafka RDD we extract all lookup keys.
> # For all lookup keys we check whether the lookup data is already available
> in the cache and whether the cached information has not expired yet.
> # For any lookup keys not found in the cache (or expired), we send batched
> prepared JDBC statements such as
> {{SELECT c1, c2, c3 FROM ... WHERE k1 IN (?,?,?,...)}}
> to the database to fetch the missing lookup data, minimizing the number of
> JDBC round-trips (see the sketch after this list).
> # At this point we have up-to-date lookup data for all lookup keys and can
> perform the actual lookup operation.
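> To make steps 2 and 3 concrete, a rough sketch of the cache logic, building on
> the hypothetical {{fetchBatch}} helper above ({{retentionMs}} is an assumed
> configuration value, not anything Spark provides):
> {code:borderStyle=solid}
> import scala.collection.mutable
>
> // Hypothetical cache: key -> (lookup columns, load time in ms).
> val cache = mutable.Map.empty[String, ((String, String, String), Long)]
> val retentionMs = 10 * 60 * 1000L  // example retention time
>
> // Skip fresh cache entries, fetch the rest in batches of 100 (steps 2 and 3).
> def refreshCache(conn: java.sql.Connection, keys: Seq[String]): Unit = {
>   val now = System.currentTimeMillis()
>   val missing = keys.distinct.filterNot { k =>
>     cache.get(k).exists { case (_, loadedAt) => now - loadedAt < retentionMs }
>   }
>   missing.grouped(100).foreach { batch =>
>     fetchBatch(conn, batch).foreach { case (k, v) => cache(k) = (v, now) }
>   }
> }
> {code}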
> Does this approach make sense on Spark? Would Spark State DStreams be the
> right way to go? Or other design approaches?
> Assuming Spark State DStreams are the right direction, the low-level question
> is: how to do the batching?
> Would this particular signature of {{DStream.updateStateByKey}} (iterator =>
> iterator):
> {code:borderStyle=solid}
> def updateStateByKey[S: ClassTag](
>     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
>     partitioner: Partitioner,
>     rememberPartitioner: Boolean,
>     initialRDD: RDD[(K, S)]
> )
> {code}
> be the right way to batch multiple incoming keys into a single JDBC-lookup
> query?
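> For what it's worth, a sketch of how the batching could be expressed with that
> signature, assuming K = String, S = the looked-up columns, the {{fetchBatch}}
> sketch from above, and an assumed {{jdbcUrl}} (expiry handling omitted for
> brevity):
> {code:borderStyle=solid}
> import java.sql.DriverManager
>
> // The updateFunc is invoked with an iterator over all keys of a partition,
> // so missing keys can be grouped into IN-list batches before hitting JDBC.
> def updateFunc(it: Iterator[(String, Seq[String], Option[(String, String, String)])])
>     : Iterator[(String, (String, String, String))] = {
>   val records = it.toSeq
>   val missing = records.collect { case (k, _, None) => k }
>   val conn = DriverManager.getConnection(jdbcUrl)  // jdbcUrl: assumed configuration
>   val fetched =
>     try missing.grouped(100).flatMap(b => fetchBatch(conn, b)).toMap
>     finally conn.close()
>   records.iterator.flatMap { case (k, _, prevState) =>
>     prevState.orElse(fetched.get(k)).map(s => (k, s))
>   }
> }
> {code}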
> Would the new {{DStream.trackStateByKey()}} be a better approach?
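> A side note on that option: {{trackStateByKey}} was released in Spark 1.6 as
> {{mapWithState}}, whose state function is invoked once per record rather than
> once per partition, so there seems to be no obvious hook for cross-key
> batching there, e.g.:
> {code:borderStyle=solid}
> import org.apache.spark.streaming.{State, StateSpec}
>
> // Sketch only: the function sees one key at a time, so a JDBC lookup here
> // would mean one round-trip per missing key instead of one per 100 keys.
> val spec = StateSpec.function(
>   (key: String, value: Option[String], state: State[(String, String, String)]) => {
>     state.getOption()   // cached lookup row for this key, if any
>   })
> // keyedStream: DStream[(String, String)] (assumed)
> val enriched = keyedStream.mapWithState(spec)
> {code}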
> The second, more high-level question: is there a way to chain multiple state
> operations on the same state object?
> Going with the above design approach, the entire lookup logic would be
> hand-crafted into a single Java/Scala/Python {{updateFunc}}. This function would
> go over all incoming keys, check which ones are missing from the cache, batch the
> missing ones, run the JDBC queries and union the returned lookup data with
> the existing cache from the State object.
> All of this must be hand-crafted into a single function because Spark's state
> processing logic works, at a high level, like this:
> {code:borderStyle=solid}
> input: prevStateRdd, inputRdd
> output: updateStateFunc( prevStateRdd, inputRdd )
> {code}
> So there is only a single {{updateStateFunc}}, operating on {{prevStateRdd}} and
> {{inputRdd}} in one go. Once it is done, there is no way to further refine the
> state as part of the current micro batch.
> The multi-step processing required here sounds like a typical use-case for a
> DStream: apply multiple operations one after the other on some incoming data.
> So I wonder whether there is a way to extend the concept of state processing
> (may be it already has been extended?) to do something like:
> {code:borderStyle=solid}
> input: prevStateRdd, inputRdd
> missingKeys = inputRdd.filter( <not exists in prevStateRdd> )
> foundKeys = inputRdd.filter( <exists in prevStateRdd> )
> newLookupData = lookupKeysUsingJdbcDataFrameRead( missingKeys.collect() )
> newStateRdd = newLookupData.union( foundKeys ).union( prevStateRdd )
> output: newStateRdd
> {code}
> This would nicely leverage all the power and richness of Spark. The only
> missing bit (and the reason why this approach does not work today, based on
> my naive understanding of Spark) is that {{newStateRdd}} cannot be declared
> to be the {{prevStateRdd}} of the next micro batch.
> If Spark had a way of declaring an RDD (or DStream) to be the parent for the
> next batch run, even complex (chained) state operations would be easy to
> describe and would not require hand-written Java/Python/Scala update functions.
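> For completeness, something close to this can already be hand-rolled by holding
> the state RDD in a driver-side variable and reassigning it inside
> {{foreachRDD}}, at the cost of managing fault tolerance and checkpointing
> yourself (a sketch; {{ssc}}, {{keyedInput}} and the {{lookupKeysUsingJdbc}}
> helper are assumptions):
> {code:borderStyle=solid}
> import org.apache.spark.rdd.RDD
>
> // Driver-side variable: the new state of batch N is the previous state of batch N+1.
> var stateRdd: RDD[(String, (String, String, String))] =
>   ssc.sparkContext.emptyRDD
>
> // keyedInput: DStream[(String, String)] (assumed)
> keyedInput.foreachRDD { inputRdd =>
>   val missingKeys = inputRdd.subtractByKey(stateRdd).keys.distinct().collect()
>   // Hypothetical helper: batched JDBC lookup returning RDD[(key, columns)].
>   val newLookupData = lookupKeysUsingJdbc(missingKeys)
>   stateRdd = newLookupData.union(stateRdd)
>   stateRdd.checkpoint()  // truncate the growing lineage (a real job would do this only every few batches)
>   val enriched = inputRdd.join(stateRdd)
>   // ... continue processing `enriched` ...
> }
> {code}
> The catch is exactly the one described above: Spark does not manage this
> variable across batches for you, so recovery after a driver failure becomes
> your problem.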
> Thanks a lot for taking the time to read all of this!!!
> Any thoughts/pointers are much appreciated.