Re: termination of stream#iterate on finite streams
Hi Xingcan!

If a _finite_ stream would, at the end, emit a special, trailing "End-Of-Stream" message that floats down the operator stream, wouldn't this enable us to deterministically end the iteration without needing a timeout? Having an arbitrary timeout that must be longer than any iteration step takes seems really awkward. What do you think?

Best regards
Peter

> Am 02.09.2017 um 17:16 schrieb Xingcan Cui <xingc...@gmail.com>:
>
> Hi Peter,
>
> I just omitted the filter part. Sorry for that.
>
> Actually, as the javadoc explained, by default a DataStream with iteration will never terminate.
> That's because in a stream environment with iteration, the operator will never know whether the
> feedback stream has reached its end (though the data source is terminated, there may be unknowable
> subsequent data) and that's why it needs a timeout value to make the judgement, just like many
> other function calls in network connection. In other words, you know the feedback stream will be
> empty in the future, but the operator doesn't. Thus we provide it a maximum waiting time for the
> next record.
>
> Internally, this mechanism is implemented via a blocking queue (the related code can be found here
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>).
>
> Hope everything is considered this time : )
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
>
>> Am 02.09.2017 um 04:45 schrieb Xingcan Cui <xingc...@gmail.com>:
>>
>> In your codes, all the long values will subtract 1 and be sent back to the iterate operator, endlessly.
>
> Is this true? Shouldn't
>
>     val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
>     })
>     iterationResult2.print()
>
> produce the following _feedback_ streams?
>
> initial input to #iterate(): [1 2 3 4]
>
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : [] => empty feedback stream => cause termination? (which actually only happens when setting a timeout value)
>
> Best regards
> Peter
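To illustrate the mechanism described above, here is a minimal standalone sketch of the blocking-queue-with-timeout idea (this is not the actual Flink code from StreamIterationHead; the names and queue type are made up for illustration):

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    object IterationHeadSketch {
      def main(args: Array[String]): Unit = {
        // feedback records produced by the iteration body end up in this queue
        val feedbackQueue = new ArrayBlockingQueue[java.lang.Long](1000)
        val maxWaitMillis = 1000L // corresponds to the timeout passed to iterate(...)

        var running = true
        while (running) {
          // wait at most maxWaitMillis for the next feedback record
          val next = feedbackQueue.poll(maxWaitMillis, TimeUnit.MILLISECONDS)
          if (next == null) {
            running = false // nothing arrived in time => assume the feedback stream is done
          } else {
            // feed the record back into the iteration body (omitted)
          }
        }
      }
    }

Without such a timeout, the poll on an empty feedback queue would block forever, which is exactly why a plain iterate() on a finite stream never terminates.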
Re: termination of stream#iterate on finite streams
> Am 02.09.2017 um 04:45 schrieb Xingcan Cui:
>
> In your codes, all the long values will subtract 1 and be sent back to the iterate operator, endlessly.

Is this true? Shouldn't

    val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
      (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
    })
    iterationResult2.print()

produce the following _feedback_ streams?

initial input to #iterate(): [1 2 3 4]

iteration #1 : [1 2 3]
iteration #2 : [1 2]
iteration #3 : [1]
iteration #4 : [] => empty feedback stream => cause termination? (which actually only happens when setting a timeout value)

Best regards
Peter
termination of stream#iterate on finite streams
Hi folks,

I was doing some experiments with DataStream#iterate and what felt strange to me is the fact that #iterate() does not terminate on its own when consuming a _finite_ stream. I think this is awkward and unexpected. The only thing that "helped" was setting an arbitrary and meaningless timeout on iterate. Imho this should not be necessary (maybe send an internal "poison message" down the iteration stream to signal shutdown of the streaming task?).

Example:

    // ---
    // does terminate by introducing a meaningless timeout
    // ---
    val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
      (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump meaningless 'x' chars just to do anything
    }, 1000, keepPartitioning = false)
    iterationResult1.print()

    // ---
    // does NEVER terminate
    // ---
    val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
      (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
    })
    iterationResult2.print()

Can someone elaborate on this - should I file a ticket?

Regards
Peter
load + update global state
Hi folks,

I am coding a streaming task that processes http requests from our web site and enriches these with additional information. The enrichment is based on a lookup table that contains session ids from historic requests and the related emails that were used within these sessions in the past:

    lookup hashtable: session_id: String => emails: Set[String]

During processing of these NEW http requests
- the lookup table should be used to get previous emails and enrich the current stream item
- new candidates for the lookup table will be discovered during processing of these items and should be added to the lookup table (also, these changes should be visible throughout the cluster)

I see at least the following issues:

(1) loading the state as a whole from the data store into memory burns a huge amount of memory (also, making changes cluster-wide visible is an issue)
(2) not loading it into memory but using something like cassandra / redis as a lookup store would certainly work, but introduces a lot of network requests (possible ideas: use a distributed cache? broadcast updates in the flink cluster?)
(3) how should I integrate the changes to the table with flink's checkpointing?

I really don't get how to solve this best and my current solution is far from elegant. So is there any best practice for supporting "large lookup tables that change during stream processing"?

Cheers
Peter
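One possible direction, sketched very roughly: if the stream can be keyed by session_id, the lookup table can live as Flink keyed state, which is checkpointed automatically and partitioned across the cluster. The request/result types and names below are made up for illustration:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    // hypothetical record types, just for illustration
    case class Request(sessionId: String, email: Option[String], payload: String)
    case class EnrichedRequest(request: Request, knownEmails: Set[String])

    class EnrichWithEmails extends RichFlatMapFunction[Request, EnrichedRequest] {

      // keyed state: one "emails seen so far" map per session_id;
      // it is part of Flink's checkpoints, so no manual persistence is needed
      @transient private var emails: MapState[String, java.lang.Boolean] = _

      override def open(parameters: Configuration): Unit = {
        val descriptor = new MapStateDescriptor[String, java.lang.Boolean](
          "emails-per-session", classOf[String], classOf[java.lang.Boolean])
        emails = getRuntimeContext.getMapState(descriptor)
      }

      override def flatMap(in: Request, out: Collector[EnrichedRequest]): Unit = {
        import scala.collection.JavaConverters._
        // enrich the current record with everything known for this session so far
        val known = emails.keys().asScala.toSet
        out.collect(EnrichedRequest(in, known))

        // update the lookup state with newly discovered emails
        in.email.foreach(e => emails.put(e, java.lang.Boolean.TRUE))
      }
    }

    // usage (sketch): requests.keyBy(_.sessionId).flatMap(new EnrichWithEmails)

This only covers lookups keyed by session_id; sharing updates across differently-keyed streams would still need something extra (e.g. an external store or rebroadcasting).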
json mapper
Hi flink users,

I just wanted to ask if this kind of scala map function is correct?

    object JsonMapper {
      private val mapper: ObjectMapper = new ObjectMapper()
    }

    class JsonMapper extends MapFunction[String, ObjectNode] {
      override def map(value: String): ObjectNode = JsonMapper.mapper.readValue(value, classOf[ObjectNode])
    }

Is using a static reference to ObjectMapper fine, or will this cause issues on a distributed cluster / with checkpoints / serializing state / whatever?

Or should I instead use a non-transient property initialized in the ctor (ObjectMapper is java.io.Serializable)?

Or should I initialize it with RichMapFunction.open into a transient property?

Also I am wondering if replacing 'class' with 'object' (=> singleton)

    object JsonMapper extends MapFunction[String, ObjectNode] { /* .. */ }

is ok (actually the mapper is stateless, so there is no obvious need to re-instantiate it again and again?)

Thanks and best regards
Peter
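For reference, a minimal sketch of the third variant mentioned above (a transient mapper created once per parallel task instance in open()); the class name is just for illustration:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.node.ObjectNode
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class JsonMapper2 extends RichMapFunction[String, ObjectNode] {

      // not serialized with the function; created on the task manager in open()
      @transient private var mapper: ObjectMapper = _

      override def open(parameters: Configuration): Unit = {
        mapper = new ObjectMapper()
      }

      override def map(value: String): ObjectNode =
        mapper.readValue(value, classOf[ObjectNode])
    }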
state inside functions
Hi,

can someone elaborate on when I should set properties transient / non-transient within operators (e.g. map / flatMap / reduce)?

I see these two possibilities:

(1) initialize a non-transient property from the constructor
(2) initialize a transient property inside a Rich???Function when open(ConfigurationParameters) is invoked

On what criteria should I choose (1) or (2)? How is this related to checkpointing / rebalancing?

Thanks in advance
Peter
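A small sketch contrasting the two options (the concrete fields are made up for illustration): a serializable constructor parameter stays non-transient and ships with the function, while an object you would rather construct on the worker is marked transient and built in open():

    import java.text.SimpleDateFormat
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class FormatTimestamp(pattern: String) extends RichMapFunction[Long, String] {

      // (1) non-transient: a small, serializable value captured when the job graph is built;
      //     it is serialized together with the function and shipped to the task managers
      private val formatPattern: String = pattern

      // (2) transient: an object that is not worth (or not able) to serialize;
      //     it is rebuilt on each parallel task instance in open()
      @transient private var format: SimpleDateFormat = _

      override def open(parameters: Configuration): Unit = {
        format = new SimpleDateFormat(formatPattern)
      }

      override def map(epochMillis: Long): String =
        format.format(new java.util.Date(epochMillis))
    }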
replacement for KeyedStream.fold(..) ?
Hi folks,

since KeyedStream.fold(..) is marked as @deprecated, what is the proper replacement for that kind of functionality?

Are mapWithState() and flatMapWithState() a *full* replacement?

Cheers
Peter
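For illustration only (not necessarily the officially recommended replacement), a fold-like running aggregation per key can be written with the Scala mapWithState() helper, which keeps the accumulator in Flink-managed keyed state; the running-sum example below is made up:

    import org.apache.flink.streaming.api.scala._

    object FoldReplacementSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // fold-like running sum per key, kept in keyed state
        val sums: DataStream[(String, Long)] = env
          .fromElements(("a", 1L), ("b", 2L), ("a", 3L))
          .keyBy(_._1)
          .mapWithState[(String, Long), Long] { (in, state) =>
            val sum = state.getOrElse(0L) + in._2
            ((in._1, sum), Some(sum)) // emit the running sum and store it back as state
          }

        sums.print()
        env.execute("fold replacement sketch")
      }
    }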
multiple streams with multiple actions - proper way?
Hello Flink People :-)

I am trying to get my head around flink - is it a supported use case to register multiple streams with possibly more than one transformation / action per stream?

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val prop = new Properties()
      prop.setProperty("bootstrap.servers", "vmi:9092")

      // first stream
      val ins = env.addSource(new FlinkKafkaConsumer010("foo", new SimpleStringSchema(), prop))
        .map(s => "transformation-1: " + s)

      ins.map(s => "transformation-2:" + s).print() // one action
      ins.map(s => "transformation-3:" + s).print() // one more action
      ins.map(s => "transformation-4:" + s).print() // another action on the same stream

      // second, different stream
      val ins2 = env.addSource(new FlinkKafkaConsumer010("bar", new SimpleStringSchema(), prop))
        .map(s => "transformation-5: " + s)

      ins2.map(s => "transformation-7:" + s).print() // action
      ins2.map(s => "transformation-6:" + s).print() // different action

      env.execute("run all streams with multiple actions attached")
    }

Is this program abusing flink or is this just how you are supposed to do things?

Also, how many threads will this program consume when running with parallelism = 4?

Best regards
Peter