Hi,

I am writing a Flink streaming program that reads time series from HBase
and does some daily aggregations. For now I am just computing an average,
so the processing itself is not very heavy, but my HBase reads get slower
and slower (I have a few billion points to read). The back pressure is
close to 1 almost all the time.

I use event-time timestamps:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

so I implemented a custom extractor based on AscendingTimestampExtractor.
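Logic-wise the extractor just tracks the maximum timestamp seen so far and emits a watermark trailing it; stripped of the Flink API, it boils down to something like this (plain-Java sketch, class and method names are illustrative, not Flink's actual API):

```java
// Illustration of ascending-timestamp extraction, independent of Flink types.
// Flink's real AscendingTimestampExtractor does the equivalent internally.
class AscendingTimestamps {
    private long maxSeen = Long.MIN_VALUE;

    // Returns the timestamp to assign to the element; a late element
    // (timestamp below the max seen) is a monotonicity violation.
    long extract(long elementTimestamp) {
        if (elementTimestamp < maxSeen) {
            // out-of-order element: keep the watermark monotonic
            return maxSeen;
        }
        maxSeen = elementTimestamp;
        return elementTimestamp;
    }

    // The watermark trails the max timestamp by 1 ms.
    long currentWatermark() {
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - 1;
    }
}
```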

At the beginning I get 5M reads/s; after 15 minutes it is down to 1M
reads/s, and then it gets worse and worse. Even after I cancel the job,
data is still being written to HBase (my sink is similar to the example,
with a cache of a few hundred HBase Puts to be a bit more efficient).
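For reference, the caching idea in the sink is roughly this (a plain-Java sketch, not the actual sink code; the names are made up and the flusher stands in for the real table.put(batch) call):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a write cache: buffer items and flush in batches of `capacity`.
class BatchBuffer<T> {
    private final int capacity;
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> flusher; // stands in for table.put(puts)
    private int flushCount = 0;

    BatchBuffer(int capacity, Consumer<List<T>> flusher) {
        this.capacity = capacity;
        this.flusher = flusher;
    }

    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    // Must also be called from close(), otherwise the tail of the
    // stream never reaches HBase.
    void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
            flushCount++;
        }
    }

    int getFlushCount() { return flushCount; }
}
```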

Without the sink, the rate seems to stay at 1M reads/s.

Do you have an idea why?

Here is a bit of code if needed:
final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS
        .keyBy(0)
        .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
        .keyBy(0)
        .timeWindow(Time.days(1));

final SingleOutputStreamOperator<Put> puts = ws.apply(
        new WindowFunction<ANA, Put, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<ANA> input, final Collector<Put> out)
                    throws Exception {

                final SummaryStatistics summaryStatistics = new SummaryStatistics();
                for (final ANA ana : input) {
                    summaryStatistics.addValue(ana.getValue());
                }
                final Put put = buildPut((String) key.getField(0),
                        window.getStart(), summaryStatistics);
                out.collect(put);
            }
        });
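As I said, what the window function computes is just a plain mean per key and day; without Flink or commons-math it is equivalent to this (illustrative only, to show how light the computation is):

```java
import java.util.List;

// Plain-Java equivalent of the per-window computation: a simple mean.
class WindowAverage {
    static double mean(List<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return values.isEmpty() ? Double.NaN : sum / values.size();
    }
}
```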

And here is how I started Flink on YARN:
flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
-Dtaskmanager.network.numberOfBuffers=4096

Thanks for any feedback!

Christophe
