Hey,
I would like to propose a way to extend the standard Streaming Scala API
methods (map, flatMap, filter, etc.) with versions that take stateful
functions as lambdas. I think this would eliminate the awkwardness of
implementing RichFunctions in Scala and make statefulness more explicit.
*For example:*
def map(statefulMap: (I, Option[S]) => (O, Option[S]))
def flatMap(statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S]))
These would be translated into RichMapFunction and RichFlatMapFunction
implementations that store the Option[S] as OperatorState for fault tolerance.
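To make the translation a bit more concrete, here is a rough plain-Scala sketch (no Flink dependency) of what the generated wrapper would do for map. StatefulMapper and its map(key, in) method are hypothetical names, a mutable Map stands in for the keyed OperatorState that Flink would checkpoint, and treating a returned None as "clear the state" is my own assumption:

```scala
import scala.collection.mutable

// Hypothetical wrapper, NOT a Flink class. It mimics what a generated
// RichMapFunction would do: keep one Option[S] per key and pass it to the
// user's stateful lambda on every record.
final class StatefulMapper[K, I, S, O](f: (I, Option[S]) => (O, Option[S])) {
  // Stand-in for keyed OperatorState (in Flink this would be checkpointed).
  private val state = mutable.Map.empty[K, S]

  def map(key: K, in: I): O = {
    val (out, newState) = f(in, state.get(key))
    newState match {
      case Some(s) => state.update(key, s) // remember the updated state
      case None    => state.remove(key)    // assumption: None clears the state
    }
    out
  }
}
```

The user function never touches the state holder directly; it only sees the previous Option[S] and returns the next one, which is what makes the statefulness explicit in the signature.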
*Example rolling sum by key:*
val input: DataStream[Long] = ...
val sumByKey: DataStream[Long] =
  input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
    sum match {
      case Some(s) => (next + s, Some(next + s))
      case None => (next, Some(next))
    })
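The rolling-sum lambda can be sanity-checked without a cluster by threading its state through a plain Scala collection for a single key. RollingSumSketch and runForOneKey are hypothetical names for illustration only; keyBy and any real Flink operator are not involved:

```scala
object RollingSumSketch {
  // The stateful function from the example, typed as the proposed
  // (I, Option[S]) => (O, Option[S]) with I = O = S = Long.
  val rollingSum: (Long, Option[Long]) => (Long, Option[Long]) =
    (next, sum) => sum match {
      case Some(s) => (next + s, Some(next + s))
      case None    => (next, Some(next))
    }

  // Simulates the records of one key: each step receives the state produced
  // by the previous step, as the proposed operator would between records.
  def runForOneKey(inputs: List[Long]): List[Long] =
    inputs
      .scanLeft((0L, Option.empty[Long])) { case ((_, state), next) =>
        rollingSum(next, state)
      }
      .tail      // drop the seed element
      .map(_._1) // keep only the emitted outputs
}
```

For example, RollingSumSketch.runForOneKey(List(1L, 2L, 3L, 4L)) emits List(1, 3, 6, 10).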
What do you think?
Gyula