Hi,

I voted for this issue and I agree this would be nice to have.

Thx!
Christophe

2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> I'm afraid this is currently a shortcoming in the API. There is this open
> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
> We can't fix it before Flink 2.0, though, because we have to keep the API
> stable on the Flink 1.x release line.
>
> Cheers,
> Aljoscha
>
> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <
> christophe.salperw...@gmail.com> wrote:
>
>> Thanks for the feedback and sorry that I can't try all this straight away.
>>
>> Is there a way to have a different function than:
>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>
>> I would like to return a HBase Put and not a SummaryStatistics. So
>> something like this:
>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>
>> Christophe
>>
>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> OK, this indicates that the operator following the source is a
>>> bottleneck.
>>>
>>> If that's the WindowOperator, it makes sense to try the refactoring of
>>> the WindowFunction.
>>> Alternatively, you can try to run that operator with a higher
>>> parallelism.
>>>
>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
>>> christophe.salperw...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for the help, I will try that. The backpressure was on the
>>>> source (HBase).
>>>>
>>>> Christophe
>>>>
>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Christophe,
>>>>>
>>>>> where does the backpressure appear? In front of the sink operator or
>>>>> before the window operator?
>>>>>
>>>>> In any case, I think you can improve your WindowFunction if you
>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>>> The FoldFunction would take care of the statistics computation and the
>>>>> WindowFunction would only assemble the result record including extracting
>>>>> the start time of the window.
>>>>>
>>>>> Then you could do:
>>>>>
>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>>>> YourWindowFunction());
>>>>>
>>>>> This is more efficient because the FoldFunction is eagerly applied
>>>>> whenever a new element is added to a window. Hence, the window only
>>>>> holds a single value (SummaryStatistics) instead of all elements added
>>>>> to the window. In contrast, the WindowFunction is called when the
>>>>> window is finally evaluated.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>>>>> christophe.salperw...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am writing a program to read timeseries from HBase and do some
>>>>>> daily aggregations (Flink streaming). For now I am just computing some
>>>>>> averages, so not very resource-intensive, but my HBase reads get slower
>>>>>> and slower (I have a few billion points to read). The back pressure is
>>>>>> close to 1 almost all the time.
>>>>>>
>>>>>> I use custom timestamps:
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>
>>>>>> so I implemented a custom extractor based on:
>>>>>> AscendingTimestampExtractor
>>>>>>
>>>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>>>>> reads/s, then it gets worse and worse. Even when I cancel the job, data
>>>>>> are still being written to HBase (I wrote a sink similar to the example,
>>>>>> with a cache of 100s of HBase Puts to be a bit more efficient).
>>>>>>
>>>>>> When I don't put a sink it seems to stay at 1M reads/s.
>>>>>>
>>>>>> Do you have an idea why?
>>>>>>
>>>>>> Here is a bit of code if needed:
>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>>     .keyBy(0)
>>>>>>     .timeWindow(Time.days(1));
>>>>>>
>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>>
>>>>>>   @Override
>>>>>>   public void apply(final Tuple key, final TimeWindow window,
>>>>>>       final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>>>>>>
>>>>>>     final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>>     for (final ANA ana : input) {
>>>>>>       summaryStatistics.addValue(ana.getValue());
>>>>>>     }
>>>>>>     final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>>>>         summaryStatistics);
>>>>>>     out.collect(put);
>>>>>>   }
>>>>>> });
>>>>>>
>>>>>> And how I started Flink on YARN:
>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>>>>
>>>>>> Thanks for any feedback!
>>>>>>
>>>>>> Christophe
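
For readers following the thread, Fabian's point about eager folding can be illustrated without Flink at all. The sketch below is plain Java and only models the idea: one window keeps every element until it fires (as the original WindowFunction does), the other folds each element into a single accumulator on arrival (as a FoldFunction would). The class names EagerFoldSketch, Stats, BufferingWindow and FoldingWindow are hypothetical stand-ins, not Flink or Commons Math types, and stateSize() is just an illustrative measure of how many objects the window holds.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these types exist in Flink.
class EagerFoldSketch {

    /** Hypothetical stand-in for SummaryStatistics: tracks count and sum only. */
    static final class Stats {
        long count;
        double sum;

        void addValue(double v) {
            count++;
            sum += v;
        }

        double getMean() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Keeps every element and aggregates only when the window fires. */
    static final class BufferingWindow {
        private final List<Double> buffer = new ArrayList<>();

        void add(double v) {
            buffer.add(v);
        }

        int stateSize() {
            return buffer.size(); // grows with the number of elements
        }

        Stats fire() {
            Stats s = new Stats();
            for (double v : buffer) {
                s.addValue(v);
            }
            return s;
        }
    }

    /** Folds each element into one accumulator as soon as it arrives. */
    static final class FoldingWindow {
        private final Stats acc = new Stats();

        void add(double v) {
            acc.addValue(v); // eager: nothing is buffered
        }

        int stateSize() {
            return 1; // a single accumulator, whatever the input size
        }

        Stats fire() {
            return acc;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        FoldingWindow folding = new FoldingWindow();
        for (int i = 1; i <= 1000; i++) {
            buffering.add(i);
            folding.add(i);
        }
        System.out.println("buffered state: " + buffering.stateSize()
                + ", mean: " + buffering.fire().getMean());
        System.out.println("folded state: " + folding.stateSize()
                + ", mean: " + folding.fire().getMean());
    }
}
```

Both variants produce the same statistics; the difference is only in how much state is held per window until it is evaluated, which is what matters for a one-day window over billions of points.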