True, flatMap does not have access to watermarks. You can also go a bit lower level and directly implement an AbstractStreamOperator together with the OneInputStreamOperator interface. This is the base class for the built-in stream operators, and it has access to watermarks (OneInputStreamOperator.processWatermark()).
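Stripped of the Flink types, the pattern looks roughly like this. The class and method names below are illustrative stand-ins, not Flink's actual API: in a real operator, processElement() receives a StreamRecord and results go through the operator's Output. The useful property for the finite-stream case is that a bounded stream ends with a final Long.MAX_VALUE watermark, which is the hook for flushing the last partial batch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative stand-in for the operator pattern described above:
// buffer incoming elements, emit full batches of `batchSize`, and use
// the watermark callback to flush a trailing partial batch.
class BatchingOperator<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream; // stands in for the operator's Output
    private final List<T> buffer = new ArrayList<>();

    BatchingOperator(int batchSize, Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    // analogous to OneInputStreamOperator#processElement
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    // analogous to OneInputStreamOperator#processWatermark: the final
    // Long.MAX_VALUE watermark of a finite stream is the chance to emit
    // whatever is still buffered
    void processWatermark(long watermark) {
        if (watermark == Long.MAX_VALUE && !buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        downstream.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With 33 input elements and a batch size of 5, the first six batches are emitted as they fill up and the last three elements are emitted when the end-of-stream watermark arrives.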
Maybe the easiest is to simply extend StreamFlatMap and override the processWatermark() method.

Cheers, Fabian

2016-04-28 14:40 GMT+02:00 Konstantin Kulagin <kkula...@gmail.com>:

> Thanks Fabian,
>
> works like a charm except for the case when the stream is finite (or I have a
> dataset from the beginning).
>
> In this case I need to somehow identify that the stream is finished and emit
> the last batch (which might contain fewer elements) to the output.
> What is the best way to do that? In streams and windows we have support
> for watermarks, but I do not see similar stuff for a flatMap operation?
>
> In the sample below I need to emit the values from 30 to 32 as well:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple2<Long, String>> source =
>     env.fromCollection(LongStream.range(0, 33)
>         .mapToObj(l -> Tuple2.of(l, "This is " + l))
>         .collect(Collectors.toList()));
>
> source.flatMap(new RichFlatMapFunction<Tuple2<Long, String>, Tuple2<Long, String>>() {
>     List<Tuple2<Long, String>> cache = new ArrayList<>();
>
>     @Override
>     public void flatMap(Tuple2<Long, String> value, Collector<Tuple2<Long, String>> out) throws Exception {
>         cache.add(value);
>         if (cache.size() == 5) {
>             System.out.println("!!!!! " + Thread.currentThread().getId() + ": "
>                 + Joiner.on(",").join(cache));
>             cache.stream().forEach(out::collect);
>             cache.clear();
>         }
>     }
> }).setParallelism(2).print();
>
> env.execute("yoyoyo");
>
> Output (Flink-related stuff excluded):
>
> !!!!! 35: (1,This is 1),(3,This is 3),(5,This is 5),(7,This is 7),(9,This is 9)
> !!!!! 36: (0,This is 0),(2,This is 2),(4,This is 4),(6,This is 6),(8,This is 8)
> !!!!! 35: (11,This is 11),(13,This is 13),(15,This is 15),(17,This is 17),(19,This is 19)
> !!!!! 36: (10,This is 10),(12,This is 12),(14,This is 14),(16,This is 16),(18,This is 18)
> !!!!! 35: (21,This is 21),(23,This is 23),(25,This is 25),(27,This is 27),(29,This is 29)
> !!!!! 36: (20,This is 20),(22,This is 22),(24,This is 24),(26,This is 26),(28,This is 28)
>
> And if you can give a bit more info on why I will have latency issues in the
> case of a varying arrival rate of elements, that would be perfect. Or point me
> to a place where I can read about it.
>
> Thanks!
> Konstantin.
>
> On Thu, Apr 28, 2016 at 7:26 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Konstantin,
>>
>> if you do not need a deterministic grouping of elements, you should not
>> use a keyed stream or window.
>> Instead you can do the lookups in a parallel flatMap function. The
>> function would collect arriving elements and perform a lookup query after a
>> certain number of elements has arrived (this can cause high latency if the
>> arrival rate of elements is low or varies).
>> The flatMap function can be executed in parallel and does not require a
>> keyed stream.
>>
>> Best, Fabian
>>
>> 2016-04-25 18:58 GMT+02:00 Konstantin Kulagin <kkula...@gmail.com>:
>>
>>> As usual - thanks for the answers, Aljoscha!
>>>
>>> I think I understood what I wanted to know.
>>>
>>> 1) To add a few comments: about streams, I was thinking about something
>>> similar to Storm, where you can have one Source and 'duplicate' the same
>>> entry going through different 'paths'.
>>> Something like this:
>>> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_storm-user-guide/content/figures/1/figures/SpoutsAndBolts.png
>>> And later you can 'join' these separate streams back.
>>> And actually I think this is what I meant:
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/JoinedStreams.html
>>> - this one actually 'joins' by window.
>>>
>>> As for the 'exactly-once guarantee', I got the difference from this paper:
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
>>> - Thanks!
>>>
>>> 2) Understood, thank you very much.
>>>
>>> I'll probably bother you one more time with another question:
>>>
>>> 3) Let's say I have a Source which provides raw (i.e. non-keyed) data.
>>> And let's say I need to 'enhance' each entry with some fields which I can
>>> take from a database.
>>> So I define some DbEnhanceOperation.
>>>
>>> A database query might be expensive, so I would want to
>>> a) batch entries to perform queries
>>> b) be able to have several parallel DbEnhanceOperations so those will not
>>> slow down my whole processing.
>>>
>>> I do not see a way to do that?
>>>
>>> Problems:
>>>
>>> I cannot go with countWindowAll because of b) - that thing does not
>>> support several parallel instances (correct?)
>>>
>>> So I need to create a windowed stream, and for that I need to have some
>>> key - correct? I.e. I cannot create windows on a stream of general objects just
>>> using the number of objects.
>>>
>>> I probably can 'emulate' a keyed stream by providing some 'fake' key. But
>>> in this case I can parallelize only on different keys. Again - it is
>>> probably doable by introducing some AtomicLong key generator in the first
>>> place (this part is probably hard to understand - I can share details if
>>> necessary), but it still looks like a bit of a hack :)
>>>
>>> But the general question - can I implement 3) 'normally' in a
>>> Flink way?
>>>
>>> Thanks!
>>> Konstantin.
>>>
>>> On Mon, Apr 25, 2016 at 10:53 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>> I'll try and answer your questions separately. First, a general remark:
>>>> although Flink has the DataSet API for batch processing and the DataStream
>>>> API for stream processing, we only have one underlying streaming execution
>>>> engine that is used for both. Now, regarding the questions:
>>>>
>>>> 1) What do you mean by "parallel into 2 streams"?
>>>> Maybe that could
>>>> influence my answer, but I'll just give a general one: Flink does not
>>>> give any guarantees about the ordering of elements in a Stream or in a
>>>> DataSet. This means that merging or unioning two streams/data sets will
>>>> just mean that operations see all elements of the two merged streams, but
>>>> the order in which we see them is arbitrary. This also means that we don't
>>>> keep buffers based on time or size or anything.
>>>>
>>>> 2) The elements that flow through the topology are not tracked
>>>> individually; each operation just receives elements, updates state and
>>>> sends elements to the downstream operation. In essence this means that
>>>> elements themselves don't block any resources except if they alter some
>>>> kept state in operations. If you have a stateless pipeline that only has
>>>> filters/maps/flatMaps, then the amount of required resources is very low.
>>>>
>>>> For a finite data set, elements are also streamed through the topology.
>>>> Only if you use operations that require grouping or sorting (such as
>>>> groupBy/reduce and join) will elements be buffered in memory or on disk
>>>> before they are processed.
>>>>
>>>> To answer your last question: if you only do stateless
>>>> transformations/filters, then you are fine to use either API, and the
>>>> performance should be similar.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Sun, 24 Apr 2016 at 15:54 Konstantin Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I have some kind of general question in order to get more
>>>>> understanding of stream vs. finite data transformation. More specifically,
>>>>> I am trying to understand the entities' lifecycle during processing.
>>>>>
>>>>> 1) For example, in the case of streams: suppose we start with some
>>>>> key-value source and parallel it into 2 streams by key. Each stream
>>>>> modifies the entry's values, let's say adds some fields. And we want to
>>>>> merge it back later. How does that happen?
>>>>> Will the merging point keep some finite buffer of entries? Based on
>>>>> time or size?
>>>>>
>>>>> I understand that the right solution in this case would probably be
>>>>> having one stream and achieving more performance by increasing
>>>>> parallelism, but what if I have 2 sources from the beginning?
>>>>>
>>>>> 2) Also I assume that in the case of streaming each entry is considered
>>>>> 'processed' once it passes the whole chain and is emitted into some sink,
>>>>> so after that it will not consume resources. Basically similar to what
>>>>> Storm is doing.
>>>>> But in the case of finite data (data sets): how big an amount of data
>>>>> will the system keep in memory? The whole set?
>>>>>
>>>>> I probably have an example of a dataset vs. stream 'mix': I need to
>>>>> *transform* a big but finite chunk of data. I don't really need to do any
>>>>> 'joins', grouping or anything like that, so I never need to store the
>>>>> whole dataset in memory/storage. What would my choice be in this case?
>>>>>
>>>>> Thanks!
>>>>> Konstantin
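For question 3) in the thread, the collect-then-query pattern Fabian describes can be sketched without any Flink types as follows. BatchedEnricher and lookupBatch are illustrative names, not a Flink API; in Flink this logic would sit inside a RichFlatMapFunction run with setParallelism(n), with flush() invoked once the input ends:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Sketch of the suggested pattern: a (parallel) flatMap that collects
// incoming keys and issues one lookup query per full batch instead of one
// query per element. `lookupBatch` stands in for a hypothetical database
// call, e.g. one SELECT ... WHERE id IN (...) per batch.
class BatchedEnricher<K, V> {
    private final int batchSize;
    private final Function<List<K>, Map<K, V>> lookupBatch;
    private final BiConsumer<K, V> out; // stands in for Collector#collect
    private final List<K> pending = new ArrayList<>();

    BatchedEnricher(int batchSize, Function<List<K>, Map<K, V>> lookupBatch, BiConsumer<K, V> out) {
        this.batchSize = batchSize;
        this.lookupBatch = lookupBatch;
        this.out = out;
    }

    // analogous to flatMap(value, out): buffer, and query when a batch is full
    void apply(K key) {
        pending.add(key);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // call at end of input to avoid losing a trailing partial batch
    void flush() {
        if (pending.isEmpty()) return;
        Map<K, V> enriched = lookupBatch.apply(new ArrayList<>(pending));
        for (K k : pending) out.accept(k, enriched.get(k));
        pending.clear();
    }
}
```

Each parallel instance keeps its own pending buffer, which gives both a) batching and b) parallelism without a keyed stream. The trade-off is that an element sits in the buffer until its batch fills, which is exactly the latency issue Fabian mentions for low or varying arrival rates. The 'fake key' + AtomicLong idea from the thread would also work, but note that keyBy() hashes the key, so a counter modulo the parallelism spreads load only statistically rather than as strict round-robin per subtask.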