Hi,
I voted for this issue, and I agree it would be nice to have.

Thx!
Christophe

2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> I'm afraid this is currently a shortcoming in the API. There is this open
> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
> We can't fix it before Flink 2.0, though, because we have to keep the API
> stable on the Flink 1.x release line.
>
> Cheers,
> Aljoscha
>
> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <
> christophe.salperw...@gmail.com> wrote:
>
>> Thanks for the feedback and sorry that I can't try all this straight away.
>>
>> Is there a way to have a different function than:
>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>
>> I would like to return a HBase Put and not a SummaryStatistics. So
>> something like this:
>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>
>> Christophe
>>
>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> OK, this indicates that the operator following the source is a
>>> bottleneck.
>>>
>>> If that's the WindowOperator, it makes sense to try the refactoring of
>>> the WindowFunction.
>>> Alternatively, you can try to run that operator with a higher
>>> parallelism.
>>>
>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
>>> christophe.salperw...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for the help, I will try that. The backpressure was on the
>>>> source (HBase).
>>>>
>>>> Christophe
>>>>
>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Christophe,
>>>>>
>>>>> where does the backpressure appear? In front of the sink operator or
>>>>> before the window operator?
>>>>>
>>>>> In any case, I think you can improve your WindowFunction if you
>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>>> The FoldFunction would take care of the statistics computation and the
>>>>> WindowFunction would only assemble the result record including extracting
>>>>> the start time of the window.
>>>>>
>>>>> Then you could do:
>>>>>
>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>>>> YourWindowFunction());
>>>>>
>>>>> This is more efficient because the FoldFunction is eagerly applied
>>>>> whenever a new element is added to a window. Hence, the window only
>>>>> holds a single value (SummaryStatistics) instead of all elements added
>>>>> to the window. In contrast, the WindowFunction is called when the
>>>>> window is finally evaluated.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>>>>> christophe.salperw...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am writing a program to read time series from HBase and do some
>>>>>> daily aggregations (Flink streaming). For now I am just computing some
>>>>>> averages, so it is not very demanding, but my HBase reads get slower and
>>>>>> slower (I have a few billion points to read). The back pressure is close
>>>>>> to 1 almost all the time.
>>>>>>
>>>>>> I use custom timestamp:
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>
>>>>>> so I implemented a custom extractor based on:
>>>>>> AscendingTimestampExtractor
>>>>>>
>>>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>>>>> reads/s, then it gets worse and worse. Even when I cancel the job, data
>>>>>> is still being written to HBase (I wrote a sink similar to the example,
>>>>>> with a cache of 100s of HBase Puts to be a bit more efficient).
>>>>>>
>>>>>> When I don't add a sink, it seems to stay at 1M reads/s.
>>>>>>
>>>>>> Do you have any idea why?
>>>>>>
>>>>>> Here is a bit of code if needed:
>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>>     .keyBy(0)
>>>>>>     .timeWindow(Time.days(1));
>>>>>>
>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>>
>>>>>>       @Override
>>>>>>       public void apply(final Tuple key, final TimeWindow window,
>>>>>>           final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>>>>>>
>>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>>         for (final ANA ana : input) {
>>>>>>           summaryStatistics.addValue(ana.getValue());
>>>>>>         }
>>>>>>         final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>>>>             summaryStatistics);
>>>>>>         out.collect(put);
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>> And here is how I started Flink on YARN:
>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 \
>>>>>>   -Dtaskmanager.network.numberOfBuffers=4096
>>>>>>
>>>>>> Thanks for any feedback!
>>>>>>
>>>>>> Christophe
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
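
Fabian's point in the thread above, that an eagerly applied fold keeps a single value per window while a plain WindowFunction has to buffer every element, can be illustrated with a minimal sketch. This is plain Java with no Flink dependency, and it is only an illustration under stated assumptions: RunningStats, fold, bufferedMean and assemble are hypothetical names standing in for SummaryStatistics, the FoldFunction, the original buffering WindowFunction and the slimmed-down WindowFunction. None of them are Flink or Commons Math APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (not Flink code) contrasting buffered and eagerly folded window state. */
final class EagerFoldSketch {

    /** Hypothetical stand-in for SummaryStatistics: constant-size running state. */
    static final class RunningStats {
        long count;
        double sum;

        double mean() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Fold step: applied once per arriving element, in the role of a FoldFunction. */
    static RunningStats fold(RunningStats acc, double value) {
        acc.count++;
        acc.sum += value;
        return acc;
    }

    /** Buffered variant: all elements are kept and only aggregated at evaluation time. */
    static double bufferedMean(List<Double> windowContents) {
        RunningStats stats = new RunningStats();
        for (double v : windowContents) {
            fold(stats, v);
        }
        return stats.mean();
    }

    /** Final step, in the role of the WindowFunction: only assembles the result record. */
    static String assemble(String key, long windowStart, RunningStats stats) {
        return key + "@" + windowStart + " mean=" + stats.mean();
    }

    public static void main(String[] args) {
        double[] arrivals = {1.0, 2.0, 3.0, 6.0};

        List<Double> buffer = new ArrayList<>(); // buffered state grows with every element
        RunningStats acc = new RunningStats();   // folded state stays a single object
        for (double v : arrivals) {
            buffer.add(v);
            acc = fold(acc, v);
        }

        System.out.println("buffered elements held: " + buffer.size());
        System.out.println("buffered result: " + bufferedMean(buffer));
        System.out.println("folded result:   " + assemble("key-0", 0L, acc));
    }
}
```

Both variants produce the same mean. The difference is what the window has to hold until it fires: with a one-day window over billions of points, the buffered variant keeps every element, while the folded variant keeps one small accumulator per key and window.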
