Probably something like this.

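dataset
   .filter { userData =>
       // look up the threshold date based on the record's details
       val dateThreshold = lookupThreshold(userData)
       // keep only events after the threshold (assumes `date` supports >, e.g. epoch millis)
       userData.date > dateThreshold
   }
   .groupBy("userId")   // count per user; assumes the records have a userId column
   .count()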
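For intuition about what the query above computes, here is a minimal sketch of the same filter-then-aggregate-per-user logic on an in-memory Scala collection, with no Spark involved. The `UserEvent` case class, its fields, and the hard-coded `lookupThreshold` are hypothetical stand-ins, not part of any real schema. Since the question asks for running totals of the score, the sketch also sums scores alongside the count.

```scala
import java.time.LocalDate

// Hypothetical event shape; the real schema is not given in the thread.
final case class UserEvent(userId: String, date: LocalDate, score: Double)

object RunningTotalSketch {
  // Hypothetical per-user threshold lookup; a real one might query a database.
  def lookupThreshold(e: UserEvent): LocalDate = e.userId match {
    case "alice" => LocalDate.of(2017, 2, 1)
    case _       => LocalDate.of(2017, 1, 1)
  }

  // Keep only events strictly after the user's threshold date.
  private def afterThreshold(events: Seq[UserEvent]): Seq[UserEvent] =
    events.filter(e => e.date.isAfter(lookupThreshold(e)))

  // Number of qualifying events per user (the analogue of groupBy + count).
  def countsSinceThreshold(events: Seq[UserEvent]): Map[String, Long] =
    afterThreshold(events)
      .groupBy(_.userId)
      .map { case (user, es) => user -> es.size.toLong }

  // Total score of qualifying events per user.
  def scoreTotalsSinceThreshold(events: Seq[UserEvent]): Map[String, Double] =
    afterThreshold(events)
      .groupBy(_.userId)
      .map { case (user, es) => user -> es.map(_.score).sum }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      UserEvent("alice", LocalDate.of(2017, 1, 15), 3.0), // before alice's threshold, dropped
      UserEvent("alice", LocalDate.of(2017, 2, 5), 4.0),
      UserEvent("bob", LocalDate.of(2017, 2, 5), 2.5)
    )
    println(countsSinceThreshold(events))
    println(scoreTotalsSinceThreshold(events))
  }
}
```

In the streaming version Spark maintains these per-user aggregates incrementally as new data arrives, rather than recomputing them over a fixed collection.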

This would probably give you what you want. However, note the following.
- lookupThreshold() is executed on the executors for every row, so it can be
very slow if its results are not cached properly.
- Since it is a dynamic value, it is not guaranteed to be exactly the same if
data is reprocessed. That affects the fault-tolerance guarantees.
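One common way to address the first point is to memoize the lookup in a Scala `object`, which is initialized once per JVM, so on Spark each executor would hold its own cache. This is a minimal sketch assuming thresholds are keyed by user id and change rarely; `fetchThreshold` is a hypothetical stand-in for the expensive lookup, and the `fetches` counter exists only to show how often it runs. Note that a cache with no expiry never sees updated thresholds, and it does not by itself make the value deterministic across executors or restarts, so the second point still applies.

```scala
import java.time.LocalDate
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// One instance per JVM: on Spark, each executor gets its own copy of this cache.
object ThresholdCache {
  private val cache = TrieMap.empty[String, LocalDate]

  // Counts how many times the expensive lookup actually ran (for illustration).
  val fetches = new AtomicInteger(0)

  // Hypothetical expensive lookup, e.g. a call to an external database.
  private def fetchThreshold(userId: String): LocalDate = {
    fetches.incrementAndGet()
    LocalDate.of(2017, 1, 1)
  }

  // Returns the cached threshold, calling fetchThreshold only on a cache miss.
  def lookupThreshold(userId: String): LocalDate =
    cache.getOrElseUpdate(userId, fetchThreshold(userId))
}
```

A TrieMap is used because tasks running concurrently in the same executor may call the lookup at the same time.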

TD


On Wed, Feb 8, 2017 at 10:10 PM, Timothy Chan <tc...@lumoslabs.com> wrote:

> I would like to count running totals for events coming in since a given
> date for a given user. How would I go about doing this?
>
> For example, we have user data coming in, we'd like to score that data,
> then keep running totals on that score, since a given date. Specifically, I
> always want to score the data, but I only want to keep a running total if
> the date is after a certain date (this would probably have to be looked up
> each time data is scored).
>
