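Hi,

I'm afraid this is currently a shortcoming in the API: when a FoldFunction is combined with a WindowFunction in apply(), the WindowFunction has to emit the same type as the fold result, so it cannot return a Put there. There is an open Jira issue tracking it: https://issues.apache.org/jira/browse/FLINK-3869. We can't fix it before Flink 2.0, though, because we have to keep the API stable on the Flink 1.x release line.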
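One possible workaround until then, sketched here under assumptions rather than taken from the Flink documentation: make the fold's result type a small holder (the hypothetical DailyStats) that carries the statistics plus the key and window start. The WindowFunction<DailyStats, DailyStats, Tuple, TimeWindow> then only fills in those two fields, and a downstream map() converts the holder into a Put. To keep the example runnable without Flink or HBase on the classpath, java.util.DoubleSummaryStatistics stands in for the commons-math SummaryStatistics, and the hypothetical DailyRow stands in for the HBase Put. The Flink wiring appears only as an untested comment.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

// Hypothetical fold accumulator. Because the WindowFunction must emit the
// fold's own result type, this holder also carries the key and window start.
class DailyStats {
    String key;        // set by the window step
    long windowStart;  // set by the window step
    DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
}

// Hypothetical stand-in for the HBase Put built by buildPut(...).
class DailyRow {
    final String rowKey;
    final double mean;
    final long count;

    DailyRow(String rowKey, double mean, long count) {
        this.rowKey = rowKey;
        this.mean = mean;
        this.count = count;
    }
}

class FoldThenMap {
    // Plays the role of FoldFunction<ANA, DailyStats>: runs once per element.
    static DailyStats fold(DailyStats acc, double value) {
        acc.stats.accept(value);
        return acc;
    }

    // Plays the role of WindowFunction<DailyStats, DailyStats, Tuple, TimeWindow>:
    // input and output types are identical, so it only stamps window metadata.
    static DailyStats stamp(DailyStats acc, String key, long windowStart) {
        acc.key = key;
        acc.windowStart = windowStart;
        return acc;
    }

    // Plays the role of a downstream MapFunction<DailyStats, Put>,
    // which is free to change the output type.
    static DailyRow toRow(DailyStats acc) {
        return new DailyRow(acc.key + "_" + acc.windowStart,
                acc.stats.getAverage(), acc.stats.getCount());
    }

    // Simulates one key/window end to end: fold every value, stamp, then map.
    // Intended Flink wiring (untested; StatsFold, StampWindow, ToPut are hypothetical):
    //   ws.apply(new DailyStats(), new StatsFold(), new StampWindow()).map(new ToPut());
    static DailyRow runWindow(String key, long windowStart, List<Double> values) {
        DailyStats acc = new DailyStats();
        for (double v : values) {
            acc = fold(acc, v);
        }
        return toRow(stamp(acc, key, windowStart));
    }
}
```

The type change is pushed out of the window operator: the window state stays at one accumulator per key and window, and the conversion to the output record happens in a stateless map afterwards.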
Cheers,
Aljoscha

On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <christophe.salperw...@gmail.com> wrote:

> Thanks for the feedback, and sorry that I can't try all this straight away.
>
> Is there a way to use a different function than:
> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>
> I would like to return an HBase Put and not a SummaryStatistics, so
> something like this:
> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>
> Christophe
>
> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> OK, this indicates that the operator following the source is a
>> bottleneck.
>>
>> If that's the WindowOperator, it makes sense to try the refactoring of
>> the WindowFunction.
>> Alternatively, you can try to run that operator with a higher parallelism.
>>
>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the help, I will try that. The backpressure was on the source
>>> (HBase).
>>>
>>> Christophe
>>>
>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Hi Christophe,
>>>>
>>>> where does the backpressure appear? In front of the sink operator or
>>>> before the window operator?
>>>>
>>>> In any case, I think you can improve your WindowFunction if you convert
>>>> parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>> The FoldFunction would take care of the statistics computation, and the
>>>> WindowFunction would only assemble the result record, including extracting
>>>> the start time of the window.
>>>>
>>>> Then you could do:
>>>>
>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>>>>
>>>> This is more efficient because the FoldFunction is eagerly applied
>>>> whenever a new element is added to a window. Hence, the window only holds
>>>> a single value (SummaryStatistics) instead of all elements added to the
>>>> window.
>>>> In contrast, the WindowFunction is called only when the window is finally
>>>> evaluated.
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a program (Flink streaming) that reads time series from
>>>>> HBase and computes some daily aggregations. For now I am just computing
>>>>> some averages, so it is not very expensive, but my HBase reads get slower
>>>>> and slower (I have a few billion points to read). The backpressure is
>>>>> close to 1 almost all the time.
>>>>>
>>>>> I use custom timestamps:
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>
>>>>> so I implemented a custom extractor based on
>>>>> AscendingTimestampExtractor.
>>>>>
>>>>> At the beginning I get 5M reads/s; after 15 min I get just 1M reads/s,
>>>>> and then it gets worse and worse. Even when I cancel the job, data is
>>>>> still being written to HBase (I wrote a sink similar to the example, with
>>>>> a cache of hundreds of HBase Puts to be a bit more efficient).
>>>>>
>>>>> Without a sink, it seems to stay at 1M reads/s.
>>>>>
>>>>> Do you have an idea why?
>>>>>
>>>>> Here is a bit of code, if needed:
>>>>>
>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>         .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>         .keyBy(0)
>>>>>         .timeWindow(Time.days(1));
>>>>>
>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>>>         new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>
>>>>>     @Override
>>>>>     public void apply(final Tuple key, final TimeWindow window,
>>>>>             final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>         for (final ANA ana : input) {
>>>>>             summaryStatistics.addValue(ana.getValue());
>>>>>         }
>>>>>         final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>>>                 summaryStatistics);
>>>>>         out.collect(put);
>>>>>     }
>>>>> });
>>>>>
>>>>> And here is how I started Flink on YARN:
>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096
>>>>>
>>>>> Thanks for any feedback!
>>>>>
>>>>> Christophe
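Fabian's point about eager folding can be illustrated without Flink. The sketch below is a simplified model, not Flink's actual window operator or state backend: the hypothetical BufferingWindow keeps every element until the window fires, as the WindowFunction in Christophe's code does, while the hypothetical FoldingWindow folds each element into one accumulator on arrival, as a FoldFunction would. java.util.DoubleSummaryStatistics stands in for the commons-math SummaryStatistics used in the thread.

```java
import java.util.ArrayList;
import java.util.DoubleSummaryStatistics;
import java.util.List;

// Buffering style (like the original WindowFunction): every element is kept
// in window state until the window fires, then iterated once.
class BufferingWindow {
    private final List<Double> elements = new ArrayList<>();

    void add(double value) {
        elements.add(value);
    }

    // Number of values held in window state.
    int storedValues() {
        return elements.size();
    }

    // Computes the statistics only when the window is evaluated.
    DoubleSummaryStatistics fire() {
        DoubleSummaryStatistics s = new DoubleSummaryStatistics();
        for (double v : elements) {
            s.accept(v);
        }
        return s;
    }
}

// Incremental style (like a FoldFunction): each element is folded into a
// single accumulator on arrival, so the state does not grow with the input.
class FoldingWindow {
    private final DoubleSummaryStatistics acc = new DoubleSummaryStatistics();

    void add(double value) {
        acc.accept(value);
    }

    // One accumulator, regardless of how many elements were added.
    int storedValues() {
        return 1;
    }

    // The result is already computed; firing just returns it.
    DoubleSummaryStatistics fire() {
        return acc;
    }
}
```

Both versions produce the same statistics; the difference is that the buffering version's state grows with the number of elements in the window (for a one-day window, every point of that day per key), while the folding version keeps a single accumulator.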