Hey,

I would like to propose extending the standard Scala Streaming API methods
(map, flatMap, filter, etc.) with variants that take stateful functions as
lambdas. I think this would remove the awkwardness of implementing
RichFunctions in Scala and make statefulness more explicit:

*For example:*
def map(statefulMap: (I, Option[S]) => (O, Option[S]))
def flatMap(statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S]))

These would be translated into RichMapFunction and RichFlatMapFunction
implementations that store the Option[S] as OperatorState for fault tolerance.
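To make the intended semantics concrete, here is a minimal local sketch in plain Scala (no Flink dependency) of what the generated wrappers would do: keep one Option[S] per key and thread it through the user's lambda on every call. The class names `StatefulMapWrapper` / `StatefulFlatMapWrapper`, the `keyOf` parameter, and the use of an in-memory mutable.Map in place of Flink's checkpointed OperatorState are all hypothetical stand-ins for illustration, not an existing Flink API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the RichMapFunction the proposal would generate.
// The mutable.Map plays the role of the per-key OperatorState; unlike the
// real thing it is NOT checkpointed, so it gives no fault tolerance.
class StatefulMapWrapper[K, I, O, S](
    keyOf: I => K,
    statefulMap: (I, Option[S]) => (O, Option[S])
) {
  private val state = mutable.Map.empty[K, S]

  def map(in: I): O = {
    val key = keyOf(in)
    val (out, newState) = statefulMap(in, state.get(key))
    // Some(s) replaces the state for this key, None clears it
    newState match {
      case Some(s) => state.update(key, s)
      case None    => state.remove(key)
    }
    out
  }
}

// Same idea for flatMap: the lambda may emit zero or more outputs per input.
class StatefulFlatMapWrapper[K, I, O, S](
    keyOf: I => K,
    statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S])
) {
  private val state = mutable.Map.empty[K, S]

  def flatMap(in: I): Traversable[O] = {
    val key = keyOf(in)
    val (outs, newState) = statefulFlatMap(in, state.get(key))
    newState match {
      case Some(s) => state.update(key, s)
      case None    => state.remove(key)
    }
    outs
  }
}
```

For example, a per-key running total keyed on the first tuple field would turn ("a",1), ("b",10), ("a",2) into 1, 10, 3, and a flatMap whose state is just "seen before" gives per-key deduplication. Returning None from the lambda is one way the proposal could let users clear state explicitly.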

*Example rolling sum by key:*
val input: DataStream[Long] = ...
val sumByKey: DataStream[Long] =
  input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
    sum match {
      case Some(s) => (next + s, Some(next + s))
      case None    => (next, Some(next))
    })
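As a quick sanity check of the rolling-sum lambda itself, independent of Flink, the sketch below threads the Option[Long] state through the inputs of a single key with scanLeft, which is roughly what the generated operator would do per key. The object name `RollingSumCheck`, the helper `runForOneKey`, and the sample inputs are illustrative only.

```scala
object RollingSumCheck {
  // The same stateful lambda as in the rolling-sum example, given a name
  val rollingSum: (Long, Option[Long]) => (Long, Option[Long]) =
    (next, sum) => sum match {
      case Some(s) => (next + s, Some(next + s))
      case None    => (next, Some(next))
    }

  // Feed one key's inputs through the lambda, carrying the state between calls.
  // Each step holds (lastOutput, state); the seed has no output and no state.
  def runForOneKey(inputs: Seq[Long]): Seq[Long] =
    inputs
      .scanLeft((0L, Option.empty[Long])) { case ((_, state), next) =>
        rollingSum(next, state)
      }
      .drop(1)   // drop the seed element
      .map(_._1) // keep only the emitted outputs
}
```

For a key receiving 3, 4, 5 this yields the running sums 3, 7, 12.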

What do you think?

Gyula
