Thanks for the feedback and sorry that I can't try all this straight away.

Is there a way to use a WindowFunction with a signature other than:
WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()

I would like to return an HBase Put rather than a SummaryStatistics, so
something like this:
WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()

Christophe

2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> OK, this indicates that the operator following the source is a bottleneck.
>
> If that's the WindowOperator, it makes sense to try the refactoring of the
> WindowFunction.
> Alternatively, you can try to run that operator with a higher parallelism.
>
> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the help, I will try that. The backpressure was on the source
>> (HBase).
>>
>> Christophe
>>
>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Christophe,
>>>
>>> where does the backpressure appear? In front of the sink operator or
>>> before the window operator?
>>>
>>> In any case, I think you can improve your WindowFunction if you convert
>>> parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>> The FoldFunction would take care of the statistics computation and the
>>> WindowFunction would only assemble the result record including extracting
>>> the start time of the window.
>>>
>>> Then you could do:
>>>
>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>> YourWindowFunction());
>>>
>>> This is more efficient because the FoldFunction is eagerly applied
>>> whenever a new element is added to a window. Hence, the window holds only a
>>> single value (SummaryStatistics) instead of all elements added to the
>>> window. In contrast, the WindowFunction is called only when the window is
>>> finally evaluated.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>>> christophe.salperw...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am writing a program to read time series from HBase and do some daily
>>>> aggregations (Flink streaming). For now I am just computing some averages,
>>>> so it is not very demanding, but my HBase reads get slower and slower (I
>>>> have a few billion points to read). The back pressure is close to 1 almost
>>>> all the time.
>>>>
>>>> I use custom timestamps:
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>> so I implemented a custom extractor based on:
>>>> AscendingTimestampExtractor
>>>>
>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>>> reads/s, then it gets worse and worse. Even when I cancel the job, data is
>>>> still being written to HBase (I wrote a sink similar to the example, with a
>>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>>
>>>> Without a sink it seems to stay at 1M reads/s.
>>>>
>>>> Do you have any idea why?
>>>>
>>>> Here is a bit of code if needed:
>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>     .keyBy(0)
>>>>     .timeWindow(Time.days(1));
>>>>
>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>
>>>>   @Override
>>>>   public void apply(final Tuple key, final TimeWindow window,
>>>>       final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>>>>
>>>>     final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>     for (final ANA ana : input) {
>>>>       summaryStatistics.addValue(ana.getValue());
>>>>     }
>>>>     final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>>         summaryStatistics);
>>>>     out.collect(put);
>>>>   }
>>>> });
>>>>
>>>> And this is how I started Flink on YARN:
>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>>
>>>> Thanks for any feedback!
>>>>
>>>> Christophe
>>>>
>>>
>>>
>>
>
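
The split discussed in this thread (an eager fold step that keeps a single
accumulator per window, followed by a final step that turns the accumulator
into an output record of a different type) can be sketched in plain Java
without any Flink or HBase dependency. This is only an illustration of the
pattern, not Flink's API: RunningStats stands in for SummaryStatistics,
OutputRecord stands in for an HBase Put, and the names fold, finish and the
"key@windowStart" row-key layout are all hypothetical. Whether the
three-argument apply(...) in a given Flink version accepts a WindowFunction
whose output type differs from the fold type is not settled in the thread and
should be checked against that version's WindowedStream documentation.

```java
public class FoldThenFinalizeSketch {

    /** Stand-in for SummaryStatistics: the only state held per window. */
    static final class RunningStats {
        long count;
        double sum;

        void add(double value) {
            count++;
            sum += value;
        }

        double mean() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Hypothetical stand-in for an HBase Put (row key plus two values). */
    static final class OutputRecord {
        final String rowKey;
        final double mean;
        final long count;

        OutputRecord(String rowKey, double mean, long count) {
            this.rowKey = rowKey;
            this.mean = mean;
            this.count = count;
        }
    }

    /** Fold step: applied once per incoming element, so no elements are buffered. */
    static RunningStats fold(RunningStats acc, double value) {
        acc.add(value);
        return acc;
    }

    /** Final step: called once per window; accumulator type and output type differ. */
    static OutputRecord finish(String key, long windowStart, RunningStats acc) {
        return new OutputRecord(key + "@" + windowStart, acc.mean(), acc.count);
    }

    public static void main(String[] args) {
        double[] values = {1.0, 2.0, 3.0, 6.0};

        // Simulate elements arriving one at a time in a single window.
        RunningStats acc = new RunningStats();
        for (double v : values) {
            acc = fold(acc, v);
        }

        // Simulate the window firing: build the output record from the accumulator.
        OutputRecord record = finish("sensor-1", 0L, acc);
        System.out.println(record.rowKey + " mean=" + record.mean
                + " count=" + record.count);
    }
}
```

If the windowing API in use requires the fold type and the output type to be
the same, one possible workaround is to keep the statistics type as the window
output and convert it to a Put in a following map step. In that case the key
and window start would have to be carried along in the emitted value.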
