Hi, I am writing a program that reads time series from HBase and does some daily aggregations (Flink streaming). For now I am just computing averages, so nothing compute-heavy, but my HBase reads get slower and slower (I have a few billion points to read). The back pressure is close to 1 almost all the time.
I use event time, i.e. `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);`, so I implemented a custom extractor based on `AscendingTimestampExtractor`.

At the beginning I get 5M reads/s, after 15 minutes only 1M reads/s, and then it gets worse and worse. Even when I cancel the job, data is still being written to HBase (my sink is similar to the example, with a cache of 100s of HBase Puts to be a bit more efficient). When I don't attach a sink, the rate seems to stay at 1M reads/s. Do you have any idea why?

Here is a bit of code if needed:

```java
final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS
        .keyBy(0)
        .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
        .keyBy(0)
        .timeWindow(Time.days(1));

final SingleOutputStreamOperator<Put> puts = ws.apply(new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
    @Override
    public void apply(final Tuple key, final TimeWindow window,
                      final Iterable<ANA> input, final Collector<Put> out) throws Exception {
        // Aggregate all points of the key's daily window into summary statistics.
        final SummaryStatistics summaryStatistics = new SummaryStatistics();
        for (final ANA ana : input) {
            summaryStatistics.addValue(ana.getValue());
        }
        final Put put = buildPut((String) key.getField(0), window.getStart(), summaryStatistics);
        out.collect(put);
    }
});
```

And how I started Flink on YARN:

```
flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096
```

Thanks for any feedback!

Christophe
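P.S. In case it helps, this is roughly the watermarking behaviour I rely on from `AscendingTimestampExtractor`: timestamps are assumed monotonically increasing per source, and the watermark trails the highest timestamp seen so far by 1 ms. Sketched here with plain Java types instead of the Flink classes so it runs standalone; the method names only mirror the Flink callbacks, they are not the real API:

```java
// Standalone sketch of ascending-timestamp watermarking (not the Flink API).
class AscendingWatermarkSketch {
    private long currentMax = Long.MIN_VALUE;

    // Stand-in for the timestamp-extraction callback: record the max and
    // return the element's event timestamp unchanged.
    long extractTimestamp(long eventTimestamp) {
        currentMax = Math.max(currentMax, eventTimestamp);
        return eventTimestamp;
    }

    // The watermark trails the highest timestamp seen by 1 ms, so a
    // one-day event-time window only fires once the stream has moved
    // past the end of that day.
    long getCurrentWatermark() {
        return currentMax == Long.MIN_VALUE ? Long.MIN_VALUE : currentMax - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch s = new AscendingWatermarkSketch();
        s.extractTimestamp(1000L);
        s.extractTimestamp(2000L);
        System.out.println(s.getCurrentWatermark());
    }
}
```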
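P.P.S. The batching in my sink follows this general pattern (stripped of the HBase classes so it runs standalone; the class name and the threshold here are illustrative, in the real sink the flush is a batched write of the buffered `Put`s to the HBase table):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Generic sketch of the sink-side batching: buffer records and flush them
// in one round trip once the buffer reaches a threshold.
class BatchingBuffer<T> {
    private final int flushThreshold;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BatchingBuffer(int flushThreshold, Consumer<List<T>> flusher) {
        this.flushThreshold = flushThreshold;
        this.flusher = flusher;
    }

    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= flushThreshold) {
            flush();
        }
    }

    // Must also be called when the sink is closed or the job is cancelled;
    // otherwise whatever is still buffered gets written out after the job
    // is supposed to have stopped.
    void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

In the job, `add()` is called per record and `flush()` on close, which may explain why writes to HBase continue for a while after cancellation while the last batches drain.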