Probably something like this.
dataset
  .filter { userData =>
    // look up the threshold date based on the record details
    val dateThreshold = lookupThreshold(userData)
    userData.date > dateThreshold // compare
  }
  .groupBy()
  .count()
This would probably give you what you want. However, note the following.
- lookupThreshold() is executed on the executors for every row, so it can be
very slow if the lookup is not cached properly.
- Since the threshold is a dynamic value, it is not guaranteed to be exactly
the same if the data is reprocessed. That affects the fault-tolerance guarantees.
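
To illustrate the caching point, here is a rough sketch in plain Scala (no Spark dependency, so it can be run on its own). Everything in it is an assumption made for illustration: the UserData fields, the ThresholdCache object, the slowLookup stand-in, and the threshold dates. The idea is only that the expensive lookup runs once per user rather than once per row.

```scala
import java.time.LocalDate
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical record type; the field names are assumptions.
case class UserData(userId: String, date: LocalDate, score: Double)

object ThresholdCache {
  // Counts how often the slow lookup actually runs (for demonstration only).
  val slowLookups = new AtomicInteger(0)

  // Stand-in for the real lookup (database, REST call, ...); made-up values.
  private def slowLookup(userId: String): LocalDate = {
    slowLookups.incrementAndGet()
    if (userId == "alice") LocalDate.of(2017, 1, 1) else LocalDate.of(2017, 2, 1)
  }

  private val cache = TrieMap.empty[String, LocalDate]

  // Memoized lookup: the slow path is taken only on a cache miss.
  def lookupThreshold(userId: String): LocalDate =
    cache.getOrElseUpdate(userId, slowLookup(userId))
}

object Example {
  val events = Seq(
    UserData("alice", LocalDate.of(2016, 12, 31), 1.0),
    UserData("alice", LocalDate.of(2017, 1, 15), 2.0),
    UserData("alice", LocalDate.of(2017, 2, 10), 3.0),
    UserData("bob", LocalDate.of(2017, 1, 15), 5.0),
    UserData("bob", LocalDate.of(2017, 2, 2), 7.0)
  )

  // Keep only events strictly after the user's threshold date.
  val kept = events.filter(e => e.date.isAfter(ThresholdCache.lookupThreshold(e.userId)))

  // Running total of scores per user over the kept events.
  val totals: Map[String, Double] =
    kept.groupBy(_.userId).map { case (user, es) => user -> es.map(_.score).sum }
}
```

In a Spark job an object like this would be initialized once per executor JVM, so each executor would keep its own cache. Note that caching only helps with the first point; if the threshold can change between runs, the reprocessing concern in the second point still applies.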
TD
On Wed, Feb 8, 2017 at 10:10 PM, Timothy Chan <[email protected]> wrote:
> I would like to count running totals for events coming in since a given
> date for a given user. How would I go about doing this?
>
> For example, we have user data coming in, we'd like to score that data,
> then keep running totals on that score, since a given date. Specifically, I
> always want to score the data, but I only want to keep a running total if
> the date is after a certain date (this would probably have to be looked up
> each time data is scored).
>