Hi Max,

In fact, the Put would be the output of my WindowFunction. I saw that Aljoscha
replied; it seems I will need to create another intermediate class to handle
that, but that is fine.
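
For the archives, one way that intermediate class could look (untested sketch;
DailyStats is a name I made up, buildPut is my existing helper, and I assume the
fold variant of apply() wants the WindowFunction to emit the same type it
consumes):

// Intermediate class: the fold accumulator also carries the window metadata,
// so it can serve as both the input and the output type of the WindowFunction.
public static class DailyStats {
    public String key;
    public long windowStart;
    public SummaryStatistics stats = new SummaryStatistics();
}

final DataStream<Put> puts = ws
    .apply(new DailyStats(),
        new FoldFunction<ANA, DailyStats>() {
            @Override
            public DailyStats fold(final DailyStats acc, final ANA ana) {
                acc.stats.addValue(ana.getValue()); // eager, per-element aggregation
                return acc;
            }
        },
        new WindowFunction<DailyStats, DailyStats, Tuple, TimeWindow>() {
            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<DailyStats> input, final Collector<DailyStats> out) {
                // With a FoldFunction the iterable holds exactly one element: the folded accumulator.
                final DailyStats result = input.iterator().next();
                result.key = (String) key.getField(0);
                result.windowStart = window.getStart();
                out.collect(result);
            }
        })
    // Build the HBase Put outside the window operator.
    .map(new MapFunction<DailyStats, Put>() {
        @Override
        public Put map(final DailyStats daily) {
            return buildPut(daily.key, daily.windowStart, daily.stats);
        }
    });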

Thanks for the help!
Christophe

2016-06-13 12:25 GMT+02:00 Maximilian Michels <m...@apache.org>:

> Hi Christophe,
>
> A fold function has two inputs: The state and a record to update the
> state with. So you can update the SummaryStatistics (state) with each
> Put (input).
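> 
> In code, the contract looks roughly like this (just a sketch, using the ANA
> record type from your snippet below as the input):
> 
> public class StatsFold implements FoldFunction<ANA, SummaryStatistics> {
>     @Override
>     public SummaryStatistics fold(final SummaryStatistics state, final ANA record) {
>         state.addValue(record.getValue()); // update the state with one input record
>         return state;                      // return the updated state
>     }
> }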
>
> Cheers,
> Max
>
> On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck
> <christophe.salperw...@gmail.com> wrote:
> > Thanks for the feedback, and sorry that I can't try all this straight away.
> >
> > Is there a way to have a different function than:
> > WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
> >
> > I would like to return an HBase Put and not a SummaryStatistics. So
> > something like this:
> > WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
> >
> > Christophe
> >
> > 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>
> >> OK, this indicates that the operator following the source is a bottleneck.
> >>
> >> If that's the WindowOperator, it makes sense to try the refactoring of the
> >> WindowFunction.
> >> Alternatively, you can try to run that operator with a higher parallelism.
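> >>
> >> For example, something like this on the window operator (sketch; the value
> >> is just an illustration):
> >>
> >> puts.setParallelism(40); // or raise the job-wide default with "flink run -p 40"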
> >>
> >> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
> >> <christophe.salperw...@gmail.com>:
> >>>
> >>> Hi Fabian,
> >>>
> >>> Thanks for the help, I will try that. The backpressure was on the
> >>> source (HBase).
> >>>
> >>> Christophe
> >>>
> >>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>
> >>>> Hi Christophe,
> >>>>
> >>>> Where does the backpressure appear? In front of the sink operator or
> >>>> before the window operator?
> >>>>
> >>>> In any case, I think you can improve your WindowFunction if you convert
> >>>> parts of it into a FoldFunction<ANA, SummaryStatistics>.
> >>>> The FoldFunction would take care of the statistics computation and the
> >>>> WindowFunction would only assemble the result record, including
> >>>> extracting the start time of the window.
> >>>>
> >>>> Then you could do:
> >>>>
> >>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
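> >>>>
> >>>> where YourWindowFunction only assembles the output record, roughly like
> >>>> this (untested sketch; I'm assuming the Put output and your buildPut
> >>>> helper from the code below):
> >>>>
> >>>> public class YourWindowFunction
> >>>>         implements WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow> {
> >>>>     @Override
> >>>>     public void apply(final Tuple key, final TimeWindow window,
> >>>>             final Iterable<SummaryStatistics> input, final Collector<Put> out) {
> >>>>         // the iterable contains only the pre-aggregated SummaryStatistics
> >>>>         final SummaryStatistics stats = input.iterator().next();
> >>>>         out.collect(buildPut((String) key.getField(0), window.getStart(), stats));
> >>>>     }
> >>>> }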
> >>>>
> >>>> This is more efficient because the FoldFunction is eagerly applied
> >>>> whenever a new element is added to a window. Hence, the window only
> >>>> holds a single value (SummaryStatistics) instead of all elements added
> >>>> to the window. In contrast, the WindowFunction is only called when the
> >>>> window is finally evaluated.
> >>>>
> >>>> Hope this helps,
> >>>> Fabian
> >>>>
> >>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
> >>>> <christophe.salperw...@gmail.com>:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I am writing a program to read time series from HBase and do some daily
> >>>>> aggregations (Flink streaming). For now I am just computing some
> >>>>> averages, so nothing very demanding, but my HBase reads get slower and
> >>>>> slower (I have a few billion points to read). The backpressure is close
> >>>>> to 1 almost all the time.
> >>>>>
> >>>>> I use custom timestamps:
> >>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>>>
> >>>>> so I implemented a custom extractor based on AscendingTimestampExtractor.
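> >>>>>
> >>>>> It is roughly this shape (sketch; getTimestamp() is a placeholder for my
> >>>>> accessor, and the exact method signature differs between Flink versions):
> >>>>>
> >>>>> public class xxxxAscendingTimestampExtractor extends AscendingTimestampExtractor<ANA> {
> >>>>>     @Override
> >>>>>     public long extractAscendingTimestamp(final ANA element) {
> >>>>>         // timestamps must be monotonically ascending
> >>>>>         return element.getTimestamp();
> >>>>>     }
> >>>>> }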
> >>>>>
> >>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
> >>>>> reads/s, then it gets worse and worse. Even when I cancel the job, data
> >>>>> is still being written to HBase (I built a sink similar to the example,
> >>>>> with a cache of hundreds of HBase Puts to be a bit more efficient).
> >>>>>
> >>>>> When I don't add a sink, it seems to stay at 1M reads/s.
> >>>>>
> >>>>> Do you have an idea why?
> >>>>>
> >>>>> Here is a bit of code if needed:
> >>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
> >>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
> >>>>>     .keyBy(0)
> >>>>>     .timeWindow(Time.days(1));
> >>>>>
> >>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
> >>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
> >>>>>
> >>>>>         @Override
> >>>>>         public void apply(final Tuple key, final TimeWindow window,
> >>>>>                 final Iterable<ANA> input, final Collector<Put> out) throws Exception {
> >>>>>
> >>>>>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
> >>>>>             for (final ANA ana : input) {
> >>>>>                 summaryStatistics.addValue(ana.getValue());
> >>>>>             }
> >>>>>             final Put put = buildPut((String) key.getField(0), window.getStart(),
> >>>>>                 summaryStatistics);
> >>>>>             out.collect(put);
> >>>>>         }
> >>>>>     });
> >>>>>
> >>>>> And here is how I started Flink on YARN:
> >>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> >>>>> -Dtaskmanager.network.numberOfBuffers=4096
> >>>>>
> >>>>> Thanks for any feedback!
> >>>>>
> >>>>> Christophe
> >>>>
> >>>>
> >>>
> >>
> >
>
