Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source (HBase).
Christophe

2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Christophe,
>
> Where does the backpressure appear? In front of the sink operator or
> before the window operator?
>
> In any case, I think you can improve your WindowFunction if you convert
> parts of it into a FoldFunction<ANA, SummaryStatistics>.
> The FoldFunction would take care of the statistics computation, and the
> WindowFunction would only assemble the result record, including extracting
> the start time of the window.
>
> Then you could do:
>
>     ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>
> This is more efficient because the FoldFunction is applied eagerly whenever
> a new element is added to a window. Hence, the window only holds a single
> value (the SummaryStatistics) instead of all elements added to the window.
> In contrast, the WindowFunction is only called when the window is finally
> evaluated.
>
> Hope this helps,
> Fabian
>
> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>
>> Hi,
>>
>> I am writing a program to read time series from HBase and do some daily
>> aggregations (Flink streaming). For now I am just computing some averages,
>> so nothing very expensive, but my HBase reads get slower and slower (I have
>> a few billion points to read). The backpressure is almost always close to 1.
>>
>> I use custom timestamps:
>>
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> so I implemented a custom extractor based on AscendingTimestampExtractor.
>>
>> At the beginning I get 5M reads/s, after 15 min just 1M reads/s, and then
>> it gets worse and worse. Even after I cancel the job, data are still being
>> written to HBase (I built a sink similar to the example, with a batch of
>> hundreds of HBase Puts to be a bit more efficient).
>>
>> When I don't add a sink, it seems to stay at 1M reads/s.
>>
>> Do you have an idea why?
>>
>> Here is a bit of code if needed:
>>
>>     final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>         .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>         .keyBy(0)
>>         .timeWindow(Time.days(1));
>>
>>     final SingleOutputStreamOperator<Put> puts = ws.apply(
>>         new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>
>>             @Override
>>             public void apply(final Tuple key, final TimeWindow window,
>>                     final Iterable<ANA> input, final Collector<Put> out)
>>                     throws Exception {
>>
>>                 final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>                 for (final ANA ana : input) {
>>                     summaryStatistics.addValue(ana.getValue());
>>                 }
>>                 final Put put = buildPut((String) key.getField(0),
>>                     window.getStart(), summaryStatistics);
>>                 out.collect(put);
>>             }
>>         });
>>
>> And how I started Flink on YARN:
>>
>>     flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 \
>>         -Dtaskmanager.network.numberOfBuffers=4096
>>
>> Thanks for any feedback!
>>
>> Christophe
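Fabian's point about eager aggregation can be illustrated outside Flink. The sketch below is plain Java with no Flink dependency; `RunningStats` and `EagerFoldDemo` are hypothetical stand-ins (a minimal substitute for commons-math's `SummaryStatistics` and for the fold/window split) that show why an eagerly applied fold keeps one constant-size accumulator per window instead of buffering every element until the window fires:

```java
import java.util.List;

// Hypothetical stand-in for SummaryStatistics: a constant-size running
// aggregate that is updated as each element arrives.
class RunningStats {
    private long count;
    private double sum;

    // The "FoldFunction" part: applied eagerly, once per element.
    void addValue(double v) {
        count++;
        sum += v;
    }

    long getCount() { return count; }
    double getMean() { return count == 0 ? Double.NaN : sum / count; }
}

public class EagerFoldDemo {
    public static void main(String[] args) {
        // Elements "arriving" in one window (hypothetical values).
        List<Double> window = List.of(1.0, 2.0, 3.0, 4.0);

        // Eager fold: one small accumulator per window (O(1) state),
        // rather than holding all elements until evaluation time.
        RunningStats acc = new RunningStats();
        for (double v : window) {
            acc.addValue(v);
        }

        // The "WindowFunction" part: assemble the result record only
        // when the window is finally evaluated.
        System.out.println(acc.getCount() + " " + acc.getMean());  // prints "4 2.5"
    }
}
```

With the original `WindowFunction`-only version, every `ANA` element must be kept in window state until the day-long window closes; folding eagerly reduces that to a single statistics object per key and window.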