Thanks for the feedback, and sorry that I can't try all this straight away. Is there a way to use a different function than WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()?

I would like to return an HBase Put rather than a SummaryStatistics, i.e. something like WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>().

Christophe

2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> OK, this indicates that the operator following the source is a bottleneck.
>
> If that's the WindowOperator, it makes sense to try the refactoring of the
> WindowFunction.
> Alternatively, you can try to run that operator with a higher parallelism.
>
> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the help, I will try that. The backpressure was on the source
>> (HBase).
>>
>> Christophe
>>
>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Christophe,
>>>
>>> Where does the backpressure appear: in front of the sink operator or
>>> before the window operator?
>>>
>>> In any case, I think you can improve your WindowFunction if you convert
>>> part of it into a FoldFunction<ANA, SummaryStatistics>.
>>> The FoldFunction would take care of the statistics computation, and the
>>> WindowFunction would only assemble the result record, including
>>> extracting the start time of the window.
>>>
>>> Then you could do:
>>>
>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>> YourWindowFunction());
>>>
>>> This is more efficient because the FoldFunction is eagerly applied
>>> whenever a new element is added to a window. Hence, the window only
>>> holds a single value (SummaryStatistics) instead of all elements added
>>> to the window. In contrast, the WindowFunction is called only when the
>>> window is finally evaluated.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>>> christophe.salperw...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am writing a program that reads time series from HBase and does some
>>>> daily aggregations (Flink streaming).
>>>> For now I am only computing some averages, so nothing very heavy, but
>>>> my HBase reads get slower and slower (I have a few billion points to
>>>> read). The backpressure is almost always close to 1.
>>>>
>>>> I use event-time timestamps:
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>> so I implemented a custom extractor based on
>>>> AscendingTimestampExtractor.
>>>>
>>>> At the beginning I get 5M reads/s; after 15 min only 1M reads/s, and
>>>> then it gets worse and worse. Even when I cancel the job, data are
>>>> still being written to HBase (my sink is similar to the example, with
>>>> a cache of 100s of HBase Puts to be a bit more efficient).
>>>>
>>>> When I don't add a sink, it seems to stay at 1M reads/s.
>>>>
>>>> Do you have an idea why?
>>>>
>>>> Here is a bit of code if needed:
>>>>
>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS
>>>>     .keyBy(0)
>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>     .keyBy(0)
>>>>     .timeWindow(Time.days(1));
>>>>
>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>
>>>>         @Override
>>>>         public void apply(final Tuple key, final TimeWindow window,
>>>>                 final Iterable<ANA> input, final Collector<Put> out)
>>>>                 throws Exception {
>>>>
>>>>             // Aggregate all elements of the window into summary statistics.
>>>>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>             for (final ANA ana : input) {
>>>>                 summaryStatistics.addValue(ana.getValue());
>>>>             }
>>>>             // Build one HBase Put per key and window.
>>>>             final Put put = buildPut((String) key.getField(0),
>>>>                     window.getStart(), summaryStatistics);
>>>>             out.collect(put);
>>>>         }
>>>>     });
>>>>
>>>> And how I started Flink on YARN:
>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>>>     -Dtaskmanager.network.numberOfBuffers=4096
>>>>
>>>> Thanks for any feedback!
>>>>
>>>> Christophe
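For readers following the thread: the refactoring Fabian suggests, and the output-type question Christophe asks at the top, don't depend on Flink specifics. The idea is that the fold step is applied eagerly per element (so per-key/per-window state is a single accumulator, not a buffer of all elements), while the window-evaluation step runs once at the end and may emit a different type (here, something Put-like). Below is a minimal plain-Java sketch of that split; Stats and PutLike are illustrative stand-ins, not Flink or Commons Math classes. Whether the fold-plus-window apply(...) overload in your Flink version permits a WindowFunction output type different from the accumulator type should be checked against the WindowedStream Javadoc for that version.

```java
public class EagerFoldSketch {

    /** Minimal stand-in for an accumulator like SummaryStatistics. */
    static final class Stats {
        long count;
        double sum;
        void addValue(double v) { count++; sum += v; }
        double mean() { return count == 0 ? Double.NaN : sum / count; }
    }

    /** Stand-in for the record written to the sink (analogous to an HBase Put). */
    static final class PutLike {
        final String rowKey;
        final double mean;
        PutLike(String rowKey, double mean) { this.rowKey = rowKey; this.mean = mean; }
    }

    /** Fold step: called eagerly for each element, so the window holds one Stats object. */
    static Stats fold(Stats acc, double value) {
        acc.addValue(value);
        return acc;
    }

    /** Window step: called once at evaluation; note the output type differs from Stats. */
    static PutLike evaluate(String key, long windowStart, Stats acc) {
        return new PutLike(key + "-" + windowStart, acc.mean());
    }

    public static void main(String[] args) {
        Stats acc = new Stats();
        for (double v : new double[] {1.0, 2.0, 3.0}) {
            acc = fold(acc, v); // O(1) state per key/window instead of buffering elements
        }
        PutLike put = evaluate("sensor42", 0L, acc);
        System.out.println(put.rowKey + " mean=" + put.mean);
    }
}
```

The design point mirrors Fabian's explanation: with only a WindowFunction, Flink must buffer every element of the daily window until firing; with an eager fold, each element is folded into the accumulator on arrival and discarded, which matters a lot at billions of points.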