GitHub user bijaybisht opened a pull request:
https://github.com/apache/spark/pull/2633
Event proration based on event timestamps.
Topic: Spark Streaming using Event Timestamps
Spark Streaming creates a batch (RDD) of events every interval T. Batches
follow a schedule, and the timestamp associated with each batch is the time at
which Spark scheduled it. This Spark-applied timestamp may be less relevant
than the timestamp at which the event originally occurred.
The event timestamp differs from the Spark timestamp fundamentally because of
two delays: the delay in generating the event in the upstream system, and the
delay in transporting the event to Spark after it is generated. The problem is
compounded for events that have a start and an end, with both timestamps
packed into a single record that is generated only after the event ends, as
illustrated in the following diagram.
(upstream) -------s--------e----g---------------->
(spark ) ------------------------r------------->
The horizontal axis is time. The event starts at s and ends at e; the event
record is generated at g and is then received by Spark at r.
So there is a need to create batches that contain only the relevant events, or
the relevant proportion of each event, according to the original timestamps
passed to Spark as part of the received tuples. Let's call a batch that
contains only events occurring in the time window it represents a bin. So a
bin from T1 - T2 will 'only have events' which occurred in the period T1 - T2.
The definition of the bin could be extended to include 'all the events' which
occurred in a given period; however, this second constraint is harder to
satisfy in practice, because events can be arbitrarily delayed.
For the rest of the discussion, batch and bin are as defined in the previous
paragraph.
The bin size determines the granularity of the resulting time series and is an
independent consideration, i.e. independent of the batch size, the events, and
their delay.
Let's say the batch size is T, the bin size is n*T, and an event is delayed
(in reception) by at most d*T. Then, to generate a bin, n + d batches of size T
are required.
Conversely, every batch contributes to at most the last ceiling((n + d)/n)
bins, counting the current one.
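The two counts above can be checked with a small sketch. The object and method names are hypothetical; n and d are the bin size and maximum delay in batches as defined above, and the ceiling is computed with integer arithmetic.

```scala
// Hypothetical helpers: n = bin size in batches, d = maximum reception delay in batches.
object BinArithmetic {
  // Batches needed before a bin of n batches is complete, given delay d.
  def batchesPerBin(n: Int, d: Int): Int = n + d

  // Bins a single batch can contribute to: ceiling((n + d) / n),
  // computed with integer arithmetic to avoid floating-point rounding.
  def binsPerBatch(n: Int, d: Int): Int = {
    require(n > 0 && d >= 0, "n must be positive and d non-negative")
    (n + d + n - 1) / n
  }
}
```

For example, with n = 4 and d = 2, a bin needs 6 batches, and each batch contributes to at most ceiling(6/4) = 2 bins.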
For a batch @ t, the contents can be seen as T1 @ t (where the notation
T1 @ t denotes the events corresponding to bin T1 from batch t), T1 - 1 @ t,
T1 - 2 @ t, ..., T1 - m @ t, where T1 - 1 represents the bin before T1 and
m = ceiling((n + d)/n).
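The offset i in T1 - i @ t can be computed from an event timestamp and the batch time. This is a sketch under stated assumptions: times are epoch milliseconds held in a Long, bins are aligned at multiples of the bin size, and, following the filter/prorate functions later in this description, a bin excludes its start and includes its end. The names are hypothetical.

```scala
object BinOffsets {
  // Bin k covers the half-open interval (k * binSizeMs, (k + 1) * binSizeMs],
  // matching the convention that binStart is exclusive.
  def binIndex(timeMs: Long, binSizeMs: Long): Long =
    Math.floorDiv(timeMs - 1, binSizeMs)

  // Returns i such that an event stamped eventMs belongs to bin T1 - i,
  // where T1 is the bin containing the batch time batchMs.
  def offset(eventMs: Long, batchMs: Long, binSizeMs: Long): Long =
    binIndex(batchMs, binSizeMs) - binIndex(eventMs, binSizeMs)
}
```

With 1000 ms bins, an event stamped 2500 in the batch @ 4000 gets offset 1, i.e. it belongs to T1 - 1.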
We can then demultiplex the contributions from batch @ t into bins T1,
T1 - 1, T1 - 2, T1 - 3, ..., resulting in streams that represent partial bins
relative to the batch stream. Stream i represents the partial bin T1 - i
received at t. This way the Spark application can deliver incremental bins
downstream as close to real time as possible. How the streams are defined and
generated then depends on how the downstream application can handle partial
bins.
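The demultiplexing step can be sketched on plain Scala collections, with one batch modeled as a Seq and each event reduced to a single timestamp in milliseconds (both assumptions made so the sketch runs without Spark; spanning events are handled by the prorate function later in this description). The Event type and demux function are hypothetical.

```scala
object Demux {
  // Hypothetical record type: a single event timestamp in epoch milliseconds.
  final case class Event(id: String, timeMs: Long)

  // Bin k covers (k * binSizeMs, (k + 1) * binSizeMs].
  private def binIndex(timeMs: Long, binSizeMs: Long): Long =
    Math.floorDiv(timeMs - 1, binSizeMs)

  // Splits one batch into partial bins keyed by stream index i, where stream i
  // carries this batch's contribution to bin T1 - i (T1 = bin of batchMs).
  // Events from future bins or older than T1 - m are dropped.
  def demux(batch: Seq[Event], batchMs: Long, binSizeMs: Long, m: Int): Map[Int, Seq[Event]] = {
    val current = binIndex(batchMs, binSizeMs)
    batch
      .map(e => ((current - binIndex(e.timeMs, binSizeMs)).toInt, e))
      .filter { case (i, _) => i >= 0 && i <= m }
      .groupBy(_._1)
      .map { case (i, pairs) => i -> pairs.map(_._2) }
  }
}
```

Each key i then feeds stream i; events beyond T1 - m exceed the assumed maximum delay and are discarded here.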
Cases:
1. The downstream application can handle incremental updates to the bin
(i.e. partial bin = current partial bin + latest partial bin). This requires
m streams that send updates every interval T, where
Stream 1: T1 @ t
Stream 2: T1 - 1 @ t
...
Stream m: T1 - m @ t.
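On the receiving side, case 1 amounts to adding each partial bin into a running total. A minimal sketch, assuming each partial bin carries a single Double value per bin id (for example a prorated count); the object and method names are hypothetical.

```scala
object IncrementalBins {
  // Case 1: the receiver keeps a running total per bin id and adds each
  // partial bin (bin id -> value) it receives from a batch.
  def applyIncrement(state: Map[Long, Double], update: Map[Long, Double]): Map[Long, Double] =
    update.foldLeft(state) { case (acc, (bin, value)) =>
      acc.updated(bin, acc.getOrElse(bin, 0.0) + value)
    }
}
```

Applying Map(5L -> 2.0) and then Map(5L -> 1.5, 4L -> 0.5) to an empty state yields Map(5L -> 3.5, 4L -> 0.5).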
2. The downstream application can only handle full updates to the bin
(i.e. partial bin = latest partial bin). This requires m streams that send
updates every interval T, where
Stream 1: T1 @ t
Stream 2: T1 - 1 @ t + @ t - 1
...
Stream m: T1 - m @ t + ... + @ t - m
i.e. a bin is updated every T until it is final. The first stream represents
the most current bin with the latest cumulative update, the next stream
represents the previous bin with its latest cumulative update, and so on, up to
the last stream, which represents a final bin.
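The difference from case 1 is that each emitted value is the bin's full total so far rather than the latest increment. A minimal sketch for a single bin, assuming its per-batch contributions arrive as Doubles in batch order; the names are hypothetical.

```scala
object CumulativeBins {
  // Case 2: given one bin's per-batch contributions in arrival order, emit the
  // bin's full value after each batch, so the receiver can simply overwrite it.
  def cumulativeUpdates(contributions: Seq[Double]): Seq[Double] =
    contributions.scanLeft(0.0)(_ + _).tail
}
```

Given contributions 1.0, 2.0, 0.5, case 1 would forward them unchanged, whereas this emits 1.0, 3.0, 3.5.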
3. The downstream application cannot handle updates to a bin. This is
essentially the last stream from case 2, except that it slides by n*T rather
than T. Note that the next bin after T1 @ t is T1 + 1 @ t + n*T, because the
bin size is n*T.
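When a bin becomes final, and why the final stream slides by n*T, can be sketched in batch units. This assumes batch k is scheduled at time k*T, bin b covers (b*n*T, (b+1)*n*T], and events arrive at most d batches late, so bin b is complete at batch (b + 1)*n + d. The object and method names are hypothetical.

```scala
object FinalBins {
  // Case 3: returns the bin that becomes final at batch k, if any.
  // Bin b is final at batch k = (b + 1) * n + d, so emissions are n batches apart.
  def finalizedBin(k: Long, n: Int, d: Int): Option[Long] = {
    require(n > 0 && d >= 0, "n must be positive and d non-negative")
    val shifted = k - d
    if (shifted >= n && shifted % n == 0) Some(shifted / n - 1) else None
  }
}
```

With n = 4 and d = 2, bin 0 is emitted at batch 6 and bin 1 at batch 10; the batches in between emit nothing.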
Typically each stream needs to be treated the same way because it represents
the same kind of content; however, there can be use cases where a stream must
be treated differently. This is a consideration for the API.
Implementation:
To transform a batch stream into partial bin streams, we can filter the events
and put the prorated events into bin streams representing T1 @ t, T1 - 1 @ t,
and so on.
For this we can define a new DStream that prorates the data from each batch
into the bin corresponding to the stream.
Use case 2 requires progressively accumulating all the events for a bin, so it
needs a new DStream that generates a pulsating window, which goes from
(s*n + 1) to (s*n + n) batches, where s is the partial stream index. Stream
index 0 is the most current partial bin stream.
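One reading of the pulsating window, sketched as a window length in batches: for stream s, the window covers every batch since bin T1 - s started, so it grows by one batch per interval T from s*n + 1 to s*n + n and then resets when a new bin begins. The assumption that batch 0 is the first batch of a bin, and the names below, are hypothetical.

```scala
object PulsatingWindow {
  // Window length (in batches) for partial stream s at batch k (0-based,
  // with batch 0 assumed to start a bin), when bins are n batches wide.
  def windowLength(s: Int, n: Int, k: Long): Long = {
    require(s >= 0 && n > 0 && k >= 0, "invalid arguments")
    s.toLong * n + 1 + Math.floorMod(k, n.toLong)
  }
}
```

For n = 3, stream 0 cycles through window lengths 1, 2, 3, 1, ..., and stream 1 through 4, 5, 6, 4, ....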
APIs
------
BinStreamer[T](DStream[T], start: T=>Time, end: T=>Time)
This will return a BinStreamer object.
The BinStreamer object can be used to generate incremental bin streams
(case 1), updated bin streams (case 2), or a final bin stream (case 3) using
the following APIs.
BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int,
numStreams: Int) : Seq[BinStream[(T,percentage)]]
BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int) :
BinStream[(T,percentage)]
BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int,
numStreams: Int) : Seq[BinStream[(T,percentage)]]
DStream[T] : This is the batch stream.
start : Closure to get the start time from the record.
end : Closure to get the end time from the record.
sizeInNumBatches : The size of the bin as a multiple of the batch size.
delayIndex : The maximum delay between the time an event is relevant to and
the time it is received, as a number of batches.
numStreams : The number of bin streams. Although this number is determined by
the batch size, the bin size, and delayIndex, this optional parameter controls
the number of output streams, which it does by delaying the most current bin.
Each BinStream will wrap a DStream.
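// Assumes the enclosing class has startFunc: T => Time and endFunc: T => Time
// in scope (the start and end closures passed to BinStreamer).
import org.apache.spark.streaming.Time

// Pairs the record with the fraction of its duration inside (binStart, binEnd]
def prorate(binStart: Time, binEnd: Time)(x: T): (T, Double) = {
  val sx = startFunc(x)
  val ex = endFunc(x)
  // Even though binStart is not inclusive, binStart here implies the limit
  // of x as x approaches binStart+
  val s = if (sx > binStart) sx else binStart
  val e = if (ex < binEnd) ex else binEnd
  if (ex == sx) {
    // Instantaneous event that passed the filter: wholly in this bin
    (x, 1.0)
  } else {
    // Time - Time gives a Duration; Duration / Duration gives a Double
    (x, (e - s) / (ex - sx))
  }
}

// True if the record overlaps the bin (binStart, binEnd]
def filter(binStart: Time, binEnd: Time)(x: T): Boolean = {
  // The flow starts in a subsequent bin
  if (startFunc(x) > binEnd) false
  // The flow ended in a prior bin
  else if (endFunc(x) <= binStart) false
  // start(x) approaches from binEnd+, so the flow belongs to the next bin
  else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false
  else true
}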
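The two functions above can be exercised without Spark by replacing Time with epoch milliseconds held in a Long (an assumption made only so the sketch runs standalone). The Flow type and the binContents helper, which filters and then prorates one batch for a single bin, are hypothetical.

```scala
object Proration {
  // Hypothetical record type with start and end times in epoch milliseconds.
  final case class Flow(name: String, startMs: Long, endMs: Long)

  // Same rules as filter: does the flow overlap the bin (binStart, binEnd]?
  def overlaps(binStart: Long, binEnd: Long)(x: Flow): Boolean =
    if (x.startMs > binEnd) false                           // starts in a later bin
    else if (x.endMs <= binStart) false                     // ended in an earlier bin
    else if (x.startMs == binEnd && x.endMs > binEnd) false // starts at binEnd, runs past it
    else true

  // Same rules as prorate: fraction of the flow's duration inside the bin.
  def prorate(binStart: Long, binEnd: Long)(x: Flow): (Flow, Double) = {
    val s = math.max(x.startMs, binStart)
    val e = math.min(x.endMs, binEnd)
    if (x.endMs == x.startMs) (x, 1.0)
    else (x, (e - s).toDouble / (x.endMs - x.startMs))
  }

  // Filter, then prorate, one batch for a single bin.
  def binContents(binStart: Long, binEnd: Long)(batch: Seq[Flow]): Seq[(Flow, Double)] =
    batch.filter(x => overlaps(binStart, binEnd)(x)).map(x => prorate(binStart, binEnd)(x))
}
```

A flow from 5 to 15 contributes 0.5 to bin (0, 10] and 0.5 to bin (10, 20]; an instantaneous flow at 10 lands wholly in (0, 10]; a flow from 20 to 30 is excluded from (10, 20].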
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/bijaybisht/spark binning
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/2633.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2633
----
commit 2002151f6075ef0e496ee49712f7aff0685ad32a
Author: Bijay Bisht <[email protected]>
Date: 2014-01-27T23:55:21Z
Event proration based on event timestamps.
----