Re: termination of stream#iterate on finite streams

2017-09-03 Thread Peter Ertl
link/streaming/runtime/tasks/StreamIterationHead.java#L80>).
>
> Hope everything is considered this time : )
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
>
>> On 02.09.2017 at 04:45, Xingcan Cui wrote:

Re: termination of stream#iterate on finite streams

2017-09-02 Thread Peter Ertl
> On 02.09.2017 at 04:45, Xingcan Cui wrote:
>
> In your code, all the long values will subtract 1 and be sent back to
> the iterate operator, endlessly.

Is this true? Shouldn't val iterationResult2 = env.generateSequence(1, 4).iterate(it => { (it.filter(_ > 0).map(_ - 1), it.filt

termination of stream#iterate on finite streams

2017-09-01 Thread Peter Ertl
Hi folks, I was doing some experiments with DataStream#iterate, and what felt strange to me is that #iterate() does not terminate on its own when consuming a _finite_ stream. I think this is awkward and unexpected. The only thing that "helped" was setting an arbitrary and meaningless time

load + update global state

2017-08-07 Thread Peter Ertl
Hi folks, I am coding a streaming task that processes HTTP requests from our web site and enriches them with additional information. It contains session ids from historic requests and the related emails that were used within these sessions in the past. lookup - hashtable: session_id:

json mapper

2017-08-03 Thread Peter Ertl
Hi Flink users, I just wanted to ask if this kind of Scala map function is correct: object JsonMapper { private val mapper: ObjectMapper = new ObjectMapper() } class JsonMapper extends MapFunction[String, ObjectNode] { override def map(value: String): ObjectNode = JsonMapper.mapper.readValu

state inside functions

2017-08-03 Thread Peter Ertl
Hi, can someone elaborate on when I should make properties transient / non-transient within operators (e.g. map / flatMap / reduce)? I see these two possibilities: (1) initialize a non-transient property from the constructor (2) initialize a transient property inside a Rich???Function when open(

replacement for KeyedStream.fold(..) ?

2017-08-02 Thread Peter Ertl
Hi folks, since KeyedStream.fold(..) is marked as @deprecated, what is the proper replacement for that kind of functionality? Are mapWithState() and flatMapWithState() a *full* replacement? Cheers, Peter

multiple streams with multiple actions - proper way?

2017-07-29 Thread Peter Ertl
Hello Flink People :-) I am trying to get my head around Flink. Is it a supported use case to register multiple streams with possibly more than one transformation / action per stream? def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment v