GitHub user bijaybisht opened a pull request:

    https://github.com/apache/spark/pull/2633

    Event proration based on event timestamps.

    
    Topic: Spark Streaming using Event TimeStamps
    
    Spark Streaming creates a batch (RDD) of events every T duration. The batch 
is based on a schedule, and the timestamp associated with the batch is the time 
at which it was scheduled by Spark. The Spark-applied timestamp may be less 
relevant than the timestamp the event originally carried.
    
    The fundamental reason for the event timestamp to differ from the Spark 
timestamp is the delay in generating the event in the upstream system and the 
delay in transporting the event to Spark after it is generated. The problem is 
compounded for events that have a start and an end, with both timestamps 
packed into a single event record generated after the event ends, as 
illustrated in the following diagram.
    
    (upstream) -------s--------e----g---------------->
    (spark   ) ------------------------r------------->
    
    The horizontal axis is time. The event starts at s and ends at e, and the 
event record is generated at g and then received by Spark at r.
    
    So there is a need to create batches which contain only the relevant events, 
or the relevant proportion of the events, according to the original timestamps 
passed to Spark as part of the received tuples. Let's refer to a batch which 
has all the events occurring in the time window it represents as a bin. So a 
bin from T1 - T2 will have 'only events' which occurred in the period T1 - T2. 
The definition of the bin can be extended to include 'all the events' which 
occurred in a given period; however, this second constraint is harder to 
satisfy in practice, because events can be arbitrarily delayed.
    
    For the rest of the discussion the definition of the batch and the bin 
shall be as per the previous paragraph.
    
    The bin size determines the time granularity of the time series and is an 
independent consideration in itself, i.e. independent of the batch, the event 
and the delay.
    
    Let's say that the batch size is T, the bin size is n*T, and an event is 
delayed (for reception) by at most d*T. So in order to generate a bin, n + d 
batches of size T are required.
    
    
    Conversely, every batch contributes to the current bin and to earlier bins, 
going back up to ceiling((n + d) / n) bins.
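
    As a rough worked example of the arithmetic above, the sketch below 
computes the number of batches needed per bin and the ceiling((n + d) / n) 
bound. The object and method names are illustrative only and are not part of 
the patch.

```scala
// Hypothetical helper, not part of the patch: plain integer arithmetic
// for the batch/bin relationship described above.
object BinArithmetic {
  // Batches of size T needed before a bin of size n*T is complete,
  // when events may arrive up to d*T late.
  def batchesPerBin(n: Int, d: Int): Int = n + d

  // ceiling((n + d) / n), computed with integer arithmetic.
  def binsPerBatch(n: Int, d: Int): Int = (n + d + n - 1) / n
}
```

    For instance, with n = 4 and d = 2, a bin needs 6 batches and the bound is 
ceiling(6 / 4) = 2.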
    
    For a batch @ t, the contents can be seen as T1 @ t (where the notation 
T1 @ t implies events corresponding to bin T1 from batch t), T1 - 1 @ t, T1 - 2 
@ t ... T1 - m @ t (where T1 - 1 represents the bin previous to T1 and m = 
ceiling((n + d) / n)).
    
    We can then de-multiplex the contributions from batch @ t into bins T1, T1 
- 1, T1 - 2, T1 - 3 and so on, resulting in streams which represent partial 
bins relative to the batch stream. So stream i represents the partial bin T1 - i 
received at t. This way the Spark application can deliver incremental bins 
downstream as close to real time as possible. Depending on how the downstream 
application can handle the partial bins, the definition and the generation of 
the streams need to be controlled.
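
    The de-multiplexing step can be sketched in plain Scala, without Spark. 
This is only an illustration under simplifying assumptions: each event is 
reduced to a single timestamp in milliseconds, bin k is taken to cover 
(k*binMs, (k+1)*binMs], and the names Demux, binIndex and demux are 
hypothetical.

```scala
// Hypothetical sketch, not the patch's implementation: events are reduced to
// a single timestamp in milliseconds, and bin k covers (k*binMs, (k+1)*binMs].
object Demux {
  // Index of the bin that contains timestamp ts (upper bound inclusive).
  def binIndex(ts: Long, binMs: Long): Long =
    Math.floorDiv(ts - 1, binMs)

  // Group one batch's event timestamps by how many bins they lag behind the
  // bin containing batchTime: offset 0 is T1, offset 1 is T1 - 1, and so on.
  def demux(batch: Seq[Long], batchTime: Long, binMs: Long): Map[Long, Seq[Long]] = {
    val current = binIndex(batchTime, binMs)
    batch.groupBy(ts => current - binIndex(ts, binMs))
  }
}
```

    Each key of the resulting map corresponds to one partial bin stream i, and 
its value holds the events that the batch contributes to bin T1 - i.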
    
    Cases:
    
    1. The downstream application can handle incremental updates to the bin 
(i.e. partial bin = current partial bin + latest partial bin). This requires m 
streams which send updates every T interval, where
    
    Stream 1: T1 @ t
    Stream 2: T1 - 1 @ t
    …
    Stream m: T1 - m @ t.
    
    2. The downstream application can only handle full updates to the bin 
(i.e. partial bin = latest partial bin). This requires m streams which send 
updates every T interval, where
    
    
    Stream 1: T1     @ t
    Stream 2: T1 - 1 @ t + @ t - 1
    ...
    Stream m: T1 - m @ t + ... + @ t - m
    
    i.e. a bin gets updated every T until the bin is final. The first stream 
represents the most current bin with the latest cumulative update, the next 
stream represents the previous bin with the latest cumulative update, and so 
on, until the last stream, which represents a final bin.
    
    3. The downstream application cannot handle updates to a bin. This is 
basically the last stream from case 2 (Stream m), with the exception that it 
slides by n*T and not T. Note that the next bin after T1 @ t is T1 + 1 @ t + 
n*T, because the size of the bin is n*T.
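
    The difference between the three cases can be illustrated with a small 
sketch for a single bin. It assumes, purely for illustration, that each batch's 
contribution to the bin is summarised as an event count; the object and method 
names are hypothetical.

```scala
// Hypothetical sketch: each element is the count of events that one batch
// contributed to a single bin, in batch arrival order.
object BinUpdates {
  // Case 1: send each batch's contribution as-is; downstream adds them up.
  def incremental(contributions: Seq[Int]): Seq[Int] = contributions

  // Case 2: send the running total, so each update replaces the previous one.
  def cumulative(contributions: Seq[Int]): Seq[Int] =
    contributions.scanLeft(0)(_ + _).tail

  // Case 3: send only the final total once the bin can no longer change.
  def finalOnly(contributions: Seq[Int]): Int = contributions.sum
}
```

    With contributions 3, 2 and 1 from three successive batches, case 1 sends 
3, 2, 1, case 2 sends 3, 5, 6, and case 3 sends only 6.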
    
    Typically each stream needs to be treated similarly because it represents 
the same kind of content; however, there can be use cases where a stream may 
need to be treated differently. This is a consideration for the API.
    
    Implementation:
    
    In order to transform a batch stream into partial bin streams, we can filter 
the events and put the prorated events into bin streams representing T @ t, 
T - 1 @ t and so on.
    
    For this we can define a new DStream which generates a DStream by prorating 
the data from batch to a bin corresponding to the stream. 
          
    Use case 2 requires progressively accumulating all the events for a bin. 
For this, a new DStream is required which generates a pulsating window which 
goes from (s*n + 1) to (s*n + n), where s is the partial stream index. A stream 
index of 0 implies that it is the most current partial bin stream.
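
    One way to picture the pulsating window is as a window length, in batches, 
that grows by one with every batch and resets when a new bin starts. The sketch 
below is an interpretation, not the patch's code: it assumes batches are 
counted from k = 0 and that a new bin starts whenever k is a multiple of n.

```scala
// Hypothetical sketch: window length, in batches, for partial stream s when
// the bin spans n batches. Assumes k counts batches from 0 and that a new bin
// starts whenever k is a multiple of n.
object PulsatingWindow {
  def lengthInBatches(s: Int, n: Int, k: Int): Int = s * n + (k % n) + 1
}
```

    With n = 3, stream 0 cycles through window lengths 1, 2, 3 and stream 1 
through 4, 5, 6, matching the range (s*n + 1) to (s*n + n).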
    
    
    APIs
    ------
    
    BinStreamer[T](DStream[T], start: T=>Time, end: T=>Time)
    This will return a BinStreamer object.
    
    The BinStreamer object can be used to generate incremental bin streams 
(case 1), updated bin streams (case 2) or the final bin stream (case 3) using 
the following APIs.
    
    BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int, 
numStreams: Int) : Seq[BinStream[(T,percentage)]] 
    
    BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int) : 
BinStream[(T,percentage)]
    
    BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int, 
numStreams: Int) : Seq[BinStream[(T,percentage)]]
    
        DStream[T] : This is the batch stream.
        start : Closure to get the start time from the record.
        end : Closure to get the end time from the record.
        sizeInNumBatches : The size of the bin as a multiple of the batch size.
        delayIndex : The maximum delay between the event relevance and event 
reception.
        numStreams : The number of bin streams. Even though it is fixed by the 
batch size, the bin size and the delayIndex, this optional parameter can be 
used to control the number of output streams; it does so by delaying the most 
current bin.
    
    Each BinStream will wrap a DStream.
    
    
      def prorate(binStart: Time, binEnd: Time)(x: T) = {
    
          val sx = startFunc(x)
          val ex = endFunc(x)
    
          // Even though binStart is not inclusive, binStart here implies the
          // limit of x as x approaches binStart+
          val s = if (sx > binStart) sx else binStart
    
          val e = if (ex < binEnd) ex else binEnd
    
          if (ex == sx) {
            (x, 1.0)
          }
          else {
            (x, (e - s) / (ex - sx))
          }
      }
    
      def filter(binStart: Time, binEnd: Time)(x: T) = {
    
        // The flow is starting in the subsequent bin
        if (startFunc(x) > binEnd) false
    
        // The flow ended in the prior bin
        else if (endFunc(x) <= binStart) false
    
        // The flow starts exactly at binEnd and extends past it, so it
        // belongs to the subsequent bin (start(x) approaches from binEnd+)
        else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false
    
        else true
    
      }
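
    To see how the prorate and filter functions above behave at bin 
boundaries, here is a self-contained stand-in that can run without Spark. It 
assumes times are plain Long milliseconds rather than Spark's Time and that an 
event is a (start, end) pair; ProrateExample and Event are hypothetical names.

```scala
// Hypothetical stand-in for the patch's code: times are plain Long
// milliseconds instead of Spark's Time, and an event is a (start, end) pair.
object ProrateExample {
  type Event = (Long, Long)

  // Same three checks as the filter above, applied to Long timestamps.
  def filter(binStart: Long, binEnd: Long)(x: Event): Boolean = {
    val (sx, ex) = x
    if (sx > binEnd) false
    else if (ex <= binStart) false
    else if (sx == binEnd && ex > binEnd) false
    else true
  }

  // Fraction of the event's duration that falls inside the bin;
  // an instantaneous event is attributed entirely to the bin.
  def prorate(binStart: Long, binEnd: Long)(x: Event): (Event, Double) = {
    val (sx, ex) = x
    val s = math.max(sx, binStart)
    val e = math.min(ex, binEnd)
    if (ex == sx) (x, 1.0)
    else (x, (e - s).toDouble / (ex - sx))
  }
}
```

    An event running from 5 to 15 passes the filter for both the bin (0, 10] 
and the bin (10, 20] and is prorated at 0.5 in each, while an instantaneous 
event at 10 is kept only by the bin (0, 10].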


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bijaybisht/spark binning

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2633
    
----
commit 2002151f6075ef0e496ee49712f7aff0685ad32a
Author: Bijay Bisht <[email protected]>
Date:   2014-01-27T23:55:21Z

    Event proration based on event timestamps.

----

