Hi Max,

In fact, the Put would be the output of my WindowFunction. I saw Aljoscha replied; it seems I will need to create another intermediate class to handle that, but that is fine.
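The intermediate-class workaround mentioned above could look roughly like this. This is a minimal plain-Java sketch. It assumes that the combined fold + window `apply` requires the window function to emit the same type as the fold state, which is what the `WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>` signature in the quoted question implies. `WindowSummary`, `PutSpec` and `toPut` are hypothetical names. `WindowSummary` keeps its own count/sum/min/max instead of wrapping Commons Math's `SummaryStatistics`, and `PutSpec` only stands in for an HBase `Put`, so the block runs without Flink or HBase on the classpath.

```java
/** Hypothetical intermediate type: the fold state plus the fields the window function fills in. */
class WindowSummary {
    String key;              // set in the window step (from the key tuple)
    long windowStart;        // set in the window step (from window.getStart())
    long count;
    double sum;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;

    /** Fold step: called once per incoming element, so only this object is kept per window. */
    WindowSummary add(double value) {
        count++;
        sum += value;
        min = Math.min(min, value);
        max = Math.max(max, value);
        return this;
    }

    /** Window step: attaches key and window start but still returns the same type. */
    WindowSummary withWindow(String key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
        return this;
    }

    double mean() {
        return count == 0 ? Double.NaN : sum / count;
    }
}

/** Hypothetical stand-in for an HBase Put: a row key plus the values to write. */
class PutSpec {
    final String rowKey;
    final double mean;
    final long count;

    PutSpec(String rowKey, double mean, long count) {
        this.rowKey = rowKey;
        this.mean = mean;
        this.count = count;
    }
}

class IntermediateClassDemo {
    /** Conversion step after the window (e.g. a separate map): intermediate type -> output record. */
    static PutSpec toPut(WindowSummary s) {
        // Hypothetical row-key layout: "<key>_<windowStart>"
        return new PutSpec(s.key + "_" + s.windowStart, s.mean(), s.count);
    }

    public static void main(String[] args) {
        WindowSummary s = new WindowSummary();
        for (double v : new double[] {1.0, 2.0, 6.0}) {
            s.add(v);                              // what the FoldFunction would do
        }
        s.withWindow("sensor-1", 1465430400000L);  // 2016-06-09T00:00Z; what the WindowFunction would do
        PutSpec put = toPut(s);                    // what a following map would do
        System.out.println(put.rowKey + " mean=" + put.mean + " count=" + put.count);
        // prints "sensor-1_1465430400000 mean=3.0 count=3"
    }
}
```

The design choice is to keep the type flowing through fold and window unchanged and to move the type change (to `Put`) into one extra step after the window, so the fold still only holds one small object per window.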

Thanks for the help!
Christophe

2016-06-13 12:25 GMT+02:00 Maximilian Michels <m...@apache.org>:
> Hi Christophe,
>
> A fold function has two inputs: the state and a record to update the state with. So you can update the SummaryStatistics (state) with each Put (input).
>
> Cheers,
> Max
>
> On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck <christophe.salperw...@gmail.com> wrote:
>> Thanks for the feedback, and sorry that I can't try all this straight away.
>>
>> Is there a way to have a different function than:
>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>
>> I would like to return an HBase Put and not a SummaryStatistics. So something like this:
>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>
>> Christophe
>>
>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>> OK, this indicates that the operator following the source is a bottleneck.
>>>
>>> If that's the WindowOperator, it makes sense to try the refactoring of the WindowFunction.
>>> Alternatively, you can try to run that operator with a higher parallelism.
>>>
>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for the help, I will try that. The backpressure was on the source (HBase).
>>>>
>>>> Christophe
>>>>
>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>> Hi Christophe,
>>>>>
>>>>> Where does the backpressure appear? In front of the sink operator or before the window operator?
>>>>>
>>>>> In any case, I think you can improve your WindowFunction if you convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>>> The FoldFunction would take care of the statistics computation and the WindowFunction would only assemble the result record, including extracting the start time of the window.
>>>>>
>>>>> Then you could do:
>>>>>
>>>>>     ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>>>>>
>>>>> This is more efficient because the FoldFunction is eagerly applied whenever a new element is added to a window. Hence, the window only holds a single value (SummaryStatistics) instead of all elements added to the window. In contrast, the WindowFunction is called only when the window is finally evaluated.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am writing a program to read time series from HBase and do some daily aggregations (Flink streaming). For now I am just computing some averages, so it is not very demanding, but my HBase reads get slower and slower (I have a few billion points to read). The backpressure is almost always close to 1.
>>>>>>
>>>>>> I use custom timestamps:
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>
>>>>>> so I implemented a custom extractor based on:
>>>>>> AscendingTimestampExtractor
>>>>>>
>>>>>> At the beginning I get 5M reads/s, after 15 min only 1M reads/s, and then it gets worse and worse. Even when I cancel the job, data is still being written to HBase (I wrote a sink similar to the example, with a cache of 100s of HBase Puts to be a bit more efficient).
>>>>>>
>>>>>> Without a sink, it seems to stay at 1M reads/s.
>>>>>>
>>>>>> Do you have an idea why?
>>>>>>
>>>>>> Here is a bit of code, if needed:
>>>>>>
>>>>>>     final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>>             .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>>             .keyBy(0)
>>>>>>             .timeWindow(Time.days(1));
>>>>>>
>>>>>>     final SingleOutputStreamOperator<Put> puts = ws.apply(new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>>
>>>>>>         @Override
>>>>>>         public void apply(final Tuple key, final TimeWindow window, final Iterable<ANA> input,
>>>>>>                 final Collector<Put> out) throws Exception {
>>>>>>
>>>>>>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>>             for (final ANA ana : input) {
>>>>>>                 summaryStatistics.addValue(ana.getValue());
>>>>>>             }
>>>>>>             final Put put = buildPut((String) key.getField(0), window.getStart(), summaryStatistics);
>>>>>>             out.collect(put);
>>>>>>         }
>>>>>>     });
>>>>>>
>>>>>> And how I started Flink on YARN:
>>>>>>
>>>>>>     flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096
>>>>>>
>>>>>> Thanks for any feedback!
>>>>>>
>>>>>> Christophe
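To make the point about eager folding in this thread concrete, here is a plain-Java simulation (no Flink involved) of keyed one-day tumbling windows. Instead of buffering every element until the window fires, each element is folded into a single accumulator per (key, window start), so the state size depends on the number of open windows, not on the number of points read. The window start is computed as `timestamp - timestamp % size` on epoch milliseconds, which, to my understanding, is how Flink's tumbling windows are aligned when no offset is given (i.e. days in UTC); timestamps are assumed non-negative. `Reading`, `Acc` and `DailyFoldDemo` are hypothetical names, with `Reading` standing in for `ANA`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical input element (stands in for ANA): key, event time in epoch ms, value. */
class Reading {
    final String key;
    final long timestamp;
    final double value;

    Reading(String key, long timestamp, double value) {
        this.key = key;
        this.timestamp = timestamp;
        this.value = value;
    }
}

/** Per-window accumulator: the only state kept for a window when folding eagerly. */
class Acc {
    long count;
    double sum;
}

class DailyFoldDemo {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the tumbling window containing ts (assumes ts >= 0 and no offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Folds each reading into its window's accumulator as soon as it arrives. */
    static Map<String, Acc> foldDaily(List<Reading> readings) {
        Map<String, Acc> state = new TreeMap<>();
        for (Reading r : readings) {
            String windowId = r.key + "@" + windowStart(r.timestamp, DAY_MS);
            Acc acc = state.computeIfAbsent(windowId, id -> new Acc());
            acc.count++;
            acc.sum += r.value;
        }
        return state;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        long day0 = 1465430400000L; // 2016-06-09T00:00Z
        for (int i = 0; i < 1000; i++) {
            // 1000 readings per key, one per minute, all within the same day
            readings.add(new Reading("a", day0 + i * 60_000L, i));
            readings.add(new Reading("b", day0 + i * 60_000L, 2.0 * i));
        }
        readings.add(new Reading("a", day0 + DAY_MS + 5, 42.0)); // first reading of the next day

        Map<String, Acc> state = foldDaily(readings);
        // 2001 readings, but only 3 accumulators are held: a@day0, a@day1, b@day0
        for (Map.Entry<String, Acc> e : state.entrySet()) {
            System.out.println(e.getKey() + " count=" + e.getValue().count
                    + " mean=" + e.getValue().sum / e.getValue().count);
        }
    }
}
```

The buffered variant in the quoted code would instead keep all 2001 readings until each window fires, which is the memory and per-window work that the fold avoids.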
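The sink described in the original question buffers "a cache of 100s of HBase Puts" before writing. A generic sketch of that size-triggered batching in plain Java follows. `BatchingWriter` is a hypothetical class, and the `Consumer<List<T>>` stands in for the real bulk write (for instance HBase's `Table.put(List<Put>)`). It also flushes the remainder on `close()`, so a partial batch is not silently dropped; in a Flink `RichSinkFunction`, the assumption would be to do the same from the sink's `close()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical size-triggered buffer: collects records and hands them over in batches. */
class BatchingWriter<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flushTarget; // stands in for the real bulk write
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> flushTarget) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.flushTarget = flushTarget;
    }

    /** Buffers one record; writes the whole batch once batchSize records are pending. */
    void write(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of the pending records to the target and clears the buffer. */
    void flush() {
        if (!buffer.isEmpty()) {
            flushTarget.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Flushes the remainder so a partial batch is not lost. */
    @Override
    public void close() {
        flush();
    }
}

class BatchingDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (BatchingWriter<String> w = new BatchingWriter<>(100, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 250; i++) {
                w.write("put-" + i);
            }
        }
        System.out.println(batchSizes); // prints [100, 100, 50]
    }
}
```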