Christian Kurz created SPARK-12097:
--------------------------------------

             Summary: How to do a cached, batched JDBC-lookup in Spark 
Streaming?
                 Key: SPARK-12097
                 URL: https://issues.apache.org/jira/browse/SPARK-12097
             Project: Spark
          Issue Type: Brainstorming
          Components: Streaming
            Reporter: Christian Kurz


h3. Use-case
I need to enrich incoming Kafka data with data from a lookup table (or query) 
on a relational database. Lookup data changes slowly over time (so caching it 
for a certain retention time is okay). Lookup data is potentially huge (so 
loading all of it upfront is not an option).

h3. Problem
The overall design idea is to implement a cached and batched JDBC lookup. That 
is, for any lookup keys that are missing from the lookup cache, a JDBC lookup 
is done to retrieve the missing lookup data. JDBC lookups are rather expensive 
(connection overhead, number of round-trips) and therefore must be done in 
batches, e.g. one JDBC lookup per 100 missing keys.
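
A minimal sketch of what such a batched lookup could look like in plain JDBC 
(the table/column names {{lookup}}, {{k1}}, {{c1..c3}}, the string key type and 
the batch size of 100 are assumptions for illustration only):

{code:borderStyle=solid}
import java.sql.DriverManager

// Hypothetical helper: resolves a set of keys, 100 keys per round-trip.
def batchedJdbcLookup(keys: Seq[String],
                      jdbcUrl: String): Map[String, (String, String, String)] = {
  if (keys.isEmpty) return Map.empty
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    keys.distinct.grouped(100).flatMap { batch =>
      // One query per batch: SELECT ... WHERE k1 IN (?,?,?,...)
      val placeholders = Seq.fill(batch.size)("?").mkString(",")
      val stmt = conn.prepareStatement(
        s"SELECT k1, c1, c2, c3 FROM lookup WHERE k1 IN ($placeholders)")
      batch.zipWithIndex.foreach { case (k, i) => stmt.setString(i + 1, k) }
      val rs = stmt.executeQuery()
      val rows = Iterator.continually(rs).takeWhile(_.next()).map { r =>
        r.getString("k1") -> (r.getString("c1"), r.getString("c2"), r.getString("c3"))
      }.toList
      stmt.close()
      rows
    }.toMap
  } finally {
    conn.close()
  }
}
{code}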

So the high-level logic might look something like this:

# For every Kafka RDD we extract all lookup keys
# For all lookup keys we check whether the lookup data is already available 
in the cache and whether this cached information has not yet expired (a small 
sketch of this check follows the list).
# For any lookup keys not found in cache (or expired), we send batched prepared 
JDBC Statements to the database to fetch the missing lookup data:
    {{SELECT c1, c2, c3 FROM ... WHERE k1 in (?,?,?,...)}}
to minimize the number of JDBC round-trips.
# At this point we have up-to-date lookup data for all lookup keys and can 
perform the actual lookup operation.
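
A small sketch of step 2 above - the cache check with expiry - assuming a 
hypothetical {{CacheEntry}} type that carries the load timestamp (where exactly 
this cache lives, e.g. in a State DStream, is the question below):

{code:borderStyle=solid}
// Hypothetical cache entry: the looked-up columns plus the load timestamp.
case class CacheEntry(values: (String, String, String), loadedAt: Long)

// Split incoming keys into "served from cache" and "needs a JDBC lookup".
def splitKeys(keys: Seq[String],
              cache: Map[String, CacheEntry],
              retentionMs: Long,
              now: Long = System.currentTimeMillis())
    : (Map[String, CacheEntry], Seq[String]) = {
  val (fresh, missingOrExpired) = keys.partition { k =>
    cache.get(k).exists(e => now - e.loadedAt < retentionMs)
  }
  (fresh.map(k => k -> cache(k)).toMap, missingOrExpired)
}
{code}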

Does this approach make sense on Spark? Would Spark State DStreams be the right 
way to go? Or would a different design approach be better?

Assuming Spark State DStreams are the right direction, the low-level question 
is: how to do the batching?

Would this particular signature of {{DStream.updateStateByKey()}} 
(iterator -> iterator):

{code:borderStyle=solid}
def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean,
      initialRDD: RDD[(K, S)]
    )
{code}

be the right way to batch multiple incoming keys into a single JDBC-lookup 
query?
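
For concreteness, a hedged sketch of such an iterator-based {{updateFunc}}: only 
the signature above comes from Spark; the {{LookupState}} type, the retention 
period, {{jdbcUrl}} and the {{batchedJdbcLookup}} helper from the earlier sketch 
are assumptions.

{code:borderStyle=solid}
// Per-partition updateFunc: K = lookup key, V = raw Kafka value, S = cached row.
case class LookupState(values: (String, String, String), loadedAt: Long)

val jdbcUrl     = "jdbc:..."               // placeholder connection string
val retentionMs = 60 * 60 * 1000L          // refresh cached rows after one hour

def updateFunc(it: Iterator[(String, Seq[String], Option[LookupState])])
    : Iterator[(String, LookupState)] = {
  val now     = System.currentTimeMillis()
  val entries = it.toSeq
  // All keys of this partition whose cached state is missing or expired ...
  val missing = entries.collect {
    case (k, _, state) if !state.exists(s => now - s.loadedAt < retentionMs) => k
  }
  // ... are resolved together (internally chunked, e.g. 100 keys per query).
  val fetched = batchedJdbcLookup(missing, jdbcUrl)
  entries.iterator.flatMap { case (k, _, state) =>
    fetched.get(k).map(v => k -> LookupState(v, now)).orElse(state.map(k -> _))
  }
}
{code}

One open point with this sketch is connection handling: the function runs on the 
executors, so the JDBC connection has to be created there (per partition or from 
a pool), not on the driver.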

Would the new {{DStream.trackStateByKey()}} be a better approach?


The second, more high-level question: is there a way to chain multiple state 
operations on the same state object?

Going with the above design approach, the entire lookup logic would be 
hand-crafted into a single Java/Scala/Python {{updateFunc}}. This function would 
go over all incoming keys, check which ones are missing from the cache, batch 
the missing ones, run the JDBC queries and union the returned lookup data with 
the existing cache from the State object.

The need to hand-craft all of this into a single function seems to stem from 
the fact that Spark's state processing logic, at a high level, works like this:

{code:borderStyle=solid}
input: prevStateRdd, inputRDD
output: updateStateFunc( prevStateRdd, inputRdd )
{code}

So there is only a single {{updateStateFunc}} operating on {{prevStateRdd}} and 
{{inputRdd}} in one go. Once it has run, there is no way to further refine the 
state as part of the current micro batch.

The multi-step processing required here sounds like a typical use-case for a 
DStream: apply multiple operations one after the other on some incoming data. 
So I wonder whether there is a way to extend the concept of state processing 
(maybe it has already been extended?) to do something like:

{code:borderStyle=solid}
input: prevStateRdd, inputRdd
missingKeys     = inputRdd.filter( <not exists in prevStateRdd> )  
foundKeys       = inputRdd.filter( <exists in prevStateRdd> )  
newLookupData   = lookupKeysUsingJdbcDataFrameRead( missingKeys.collect() )
newStateRdd     = newLookupData.union( foundKeys).union( prevStateRdd )
output: newStateRdd
{code}

This would nicely leverage all the power and richness of Spark. The only 
missing bit - and the reason why this approach does not work today, based on my 
naive understanding of Spark - is that {{newStateRdd}} cannot be declared to be 
the {{prevStateRdd}} of the next micro batch.

If Spark had a way of declaring an RDD (or DStream) to be the parent for the 
next batch run, even complex (chained) state operations would be easy to 
describe and would not require hand-written Java/Python/Scala update functions.
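
As a possible stop-gap with the current API: the function passed to 
{{DStream.transform()}} is evaluated on the driver for every micro batch, so a 
driver-side variable can carry a state RDD forward by hand. A rough sketch only 
- {{ssc}}, {{inputStream}}, {{jdbcUrl}} and {{batchedJdbcLookup}} are 
assumptions, and expiry, checkpointing and state size control are left out:

{code:borderStyle=solid}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

type Key    = String
type Lookup = (String, String, String)

// Driver-side handle to the previous batch's state ("prevStateRdd").
var lookupStateRdd: RDD[(Key, Lookup)] = ssc.sparkContext.emptyRDD

// inputStream is assumed to be a DStream[(Key, String)] built from Kafka.
val enriched: DStream[(Key, (String, Lookup))] = inputStream.transform { inputRdd =>
  val missingKeys = inputRdd.keys.distinct().subtract(lookupStateRdd.keys).collect()
  val newLookups  = inputRdd.sparkContext.parallelize(
    batchedJdbcLookup(missingKeys, jdbcUrl).toSeq)
  // This union becomes the "prevStateRdd" of the next micro batch.
  lookupStateRdd = lookupStateRdd.union(newLookups).cache()
  inputRdd.join(lookupStateRdd)
}
{code}

Keeping the state in a hand-managed RDD like this loses the fault-tolerance 
story of the built-in state operations, which is why a first-class way of 
declaring the parent RDD for the next batch would be much nicer.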


Thanks a lot for taking the time to read all of this!!!

Any thoughts/pointers are much appreciated.


