Probably something like this.

    dataset
      .filter { userData =>
        // Look up the threshold date based on the record details
        val dateThreshold = lookupThreshold(userData)
        // Keep only records dated after the threshold
        userData.date > dateThreshold
      }
      .groupBy()
      .count()

This would probably give you what you want. However, note the following.

- lookupThreshold() is executed on the executors for every row, so it can be very slow if the lookups are not cached properly.
- Since it is a dynamic value, it is not guaranteed to be exactly the same if there is a reprocessing. That affects the fault-tolerance guarantees.

TD

On Wed, Feb 8, 2017 at 10:10 PM, Timothy Chan <tc...@lumoslabs.com> wrote:

> I would like to count running totals for events coming in since a given
> date for a given user. How would I go about doing this?
>
> For example, we have user data coming in, we'd like to score that data,
> then keep running totals on that score, since a given date. Specifically, I
> always want to score the data, but I only want to keep a running total if
> the date is after a certain date (this would probably have to be looked up
> each time data is scored).
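
On the caching point in the reply: a rough sketch of one way to memoize lookupThreshold() per user, so the slow lookup runs once per key in each JVM rather than once per row. Everything except the name lookupThreshold is an assumption made for illustration: the UserData fields, the ThresholdCache object, the slowLookup stand-in and its fixed return date, and the slowLookups counter are all hypothetical. It is plain Scala with no Spark dependency.

```scala
import java.time.LocalDate
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical record type; the thread does not show the real schema.
final case class UserData(userId: String, date: LocalDate, score: Double)

object ThresholdCache {
  // One cache per JVM, so on Spark each executor would fill its own copy.
  private val cache = TrieMap.empty[String, LocalDate]

  // Counts slow lookups so the effect of the cache can be observed.
  val slowLookups = new AtomicInteger(0)

  // Stand-in for the real (slow) lookup, e.g. a database or service call.
  // The fixed date is a placeholder, not a value from the thread.
  private def slowLookup(userId: String): LocalDate = {
    slowLookups.incrementAndGet()
    LocalDate.of(2017, 1, 1)
  }

  // Returns the cached threshold for the user, doing the slow lookup
  // only when the user has not been seen before in this JVM.
  def lookupThreshold(record: UserData): LocalDate =
    cache.getOrElseUpdate(record.userId, slowLookup(record.userId))
}
```

With LocalDate the comparison inside the filter would be userData.date.isAfter(ThresholdCache.lookupThreshold(userData)). A cache like this only helps with the first caveat. It does nothing for the second one: a cached threshold can go stale, and a reprocessing run may still see a different value than the original run did.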