Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source
(HBase).

Christophe

2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Christophe,
>
> Where does the backpressure appear? In front of the sink operator or
> before the window operator?
>
> In any case, I think you can improve your WindowFunction if you convert
> parts of it into a FoldFunction<ANA, SummaryStatistics>.
> The FoldFunction would take care of the statistics computation and the
> WindowFunction would only assemble the result record including extracting
> the start time of the window.
>
> Then you could do:
>
> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>
> This is more efficient because the FoldFunction is applied eagerly whenever
> a new element is added to a window. Hence, the window holds only a single
> value (the SummaryStatistics) instead of all elements added to the window.
> In contrast, the WindowFunction is called only when the window is finally
> evaluated.
>
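To make the difference concrete, here is a minimal sketch in plain Java. It deliberately does not use Flink's API: `Stats`, `BufferingWindow` and `FoldingWindow` are hypothetical stand-ins (for SummaryStatistics, a window evaluated by a plain WindowFunction, and a window with an eager fold, respectively), so it only illustrates how much state each approach keeps per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class EagerFoldSketch {

    /** Hypothetical stand-in for SummaryStatistics: tracks count, sum, min, max. */
    static final class Stats {
        long n;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        Stats add(double v) {
            n++;
            sum += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
            return this;
        }

        double mean() {
            return n == 0 ? Double.NaN : sum / n;
        }
    }

    /** Buffers every element and aggregates only when the window is evaluated. */
    static final class BufferingWindow {
        private final List<Double> elements = new ArrayList<>();

        void add(double v) {
            elements.add(v);
        }

        int stateSize() {
            return elements.size(); // grows with the number of elements
        }

        Stats evaluate() {
            Stats s = new Stats();
            for (double v : elements) {
                s.add(v);
            }
            return s;
        }
    }

    /** Applies the fold on arrival and keeps only the accumulator. */
    static final class FoldingWindow<ACC> {
        private ACC acc;
        private final BiFunction<ACC, Double, ACC> fold;

        FoldingWindow(ACC initial, BiFunction<ACC, Double, ACC> fold) {
            this.acc = initial;
            this.fold = fold;
        }

        void add(double v) {
            acc = fold.apply(acc, v); // eager: the element is not retained
        }

        int stateSize() {
            return 1; // a single accumulator, regardless of element count
        }

        ACC evaluate() {
            return acc; // only the final assembly of the result is left to do
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        FoldingWindow<Stats> folding =
                new FoldingWindow<>(new Stats(), (s, v) -> s.add(v));

        for (int i = 1; i <= 1000; i++) {
            buffering.add(i);
            folding.add(i);
        }

        System.out.println("buffered state: " + buffering.stateSize()
                + ", mean: " + buffering.evaluate().mean());
        System.out.println("folded state:   " + folding.stateSize()
                + ", mean: " + folding.evaluate().mean());
    }
}
```

Both windows produce the same statistics; the folding one just never stores the individual points, which matters for a one-day window over billions of elements.
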
> Hope this helps,
> Fabian
>
> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> Hi,
>>
>> I am writing a Flink streaming program that reads time series from HBase and
>> computes daily aggregations. For now I am only computing averages, so the job
>> is not very demanding, but my HBase reads get slower and slower (I have a few
>> billion points to read). The back pressure is close to 1 almost all the
>> time.
>>
>> I use custom timestamps:
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> so I implemented a custom extractor based on:
>> AscendingTimestampExtractor
>>
>> At the beginning I get 5M reads/s, after 15 min only 1M reads/s, and then it
>> gets worse and worse. Even after I cancel the job, data is still being
>> written to HBase (my sink is similar to the example, with a cache of 100s of
>> HBase Puts to be a bit more efficient).
>>
>> Without a sink, the rate seems to stay at 1M reads/s.
>>
>> Do you have any idea why?
>>
>> Here is a bit of code if needed:
>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>     .keyBy(0)
>>     .timeWindow(Time.days(1));
>>
>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>
>>   @Override
>>   public void apply(final Tuple key, final TimeWindow window,
>>       final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>>
>>     final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>     for (final ANA ana : input) {
>>       summaryStatistics.addValue(ana.getValue());
>>     }
>>     final Put put = buildPut((String) key.getField(0), window.getStart(),
>>         summaryStatistics);
>>     out.collect(put);
>>   }
>> });
>>
>> And this is how I started Flink on YARN:
>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096
>>
>> Thanks for any feedback!
>>
>> Christophe
>>
>
>
