To continue, I implemented ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
It works fine when there is no sink, but when I put an HBase sink it seems that the sink, somehow, blocks the flow. The sink writes very little data into HBase, and when I limit my input to just a few sensors it works well. Any idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws.apply(
    new Aggregate(),
    new FoldFunction<ANA, Aggregate>() {
      @Override
      public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
        accumulator.addValue(value.getValue());
        return accumulator;
      }
    },
    new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
      @Override
      public void apply(final Tuple key, final TimeWindow window,
          final Iterable<Aggregate> input, final Collector<Aggregate> out) throws Exception {
        for (final Aggregate aggregate : input) {
          aggregate.setM((String) key.getField(0));
          aggregate.setTime(window.getStart());
          out.collect(aggregate);
        }
      }
    });

aggregates.setParallelism(10).writeUsingOutputFormat(new OutputFormat<Aggregate>() {
  private static final long serialVersionUID = 1L;

  HBaseConnect hBaseConnect;
  Table table;
  final int flushSize = 100;
  List<Put> puts = new ArrayList<>();

  @Override
  public void writeRecord(final Aggregate record) throws IOException {
    puts.add(record.buildPut());
    if (puts.size() == flushSize) {
      table.put(puts);
    }
  }

  @Override
  public void open(final int taskNumber, final int numTasks) throws IOException {
    hBaseConnect = new HBaseConnect();
    table = hBaseConnect.getHTable("PERF_TEST");
  }

  @Override
  public void configure(final Configuration parameters) {
  }

  @Override
  public void close() throws IOException {
    // last inserts
    table.put(puts);
    table.close();
    hBaseConnect.close();
  }
});

2016-06-13 13:47 GMT+02:00 Maximilian Michels <m...@apache.org>:

> Thanks!
>
> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
> <christophe.salperw...@gmail.com> wrote:
> > Hi,
> >
> > I vote on this issue and I agree this would be nice to have.
> >
> > Thx!
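One detail worth double-checking in a batch-flushing sink like the OutputFormat above: the buffer has to be cleared after each table.put(puts). Without a puts.clear(), puts.size() grows past flushSize, the equality check never fires again, and everything accumulated after the first flush is only written in close(), which would match "writes very little data". The sketch below is deliberately HBase-free (a counter and a List<String> stand in for Table.put) and only illustrates the flush-and-clear pattern, not the real connector:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for a batching sink: buffers records and flushes every
// FLUSH_SIZE elements. The key detail is clearing the buffer after each
// flush; without it, the size check can never fire a second time and
// records pile up until close().
public class BatchingSink {
    static final int FLUSH_SIZE = 100;

    final List<String> puts = new ArrayList<>(); // stands in for List<Put>
    int flushedRecords = 0;                      // stands in for rows in HBase

    void writeRecord(String record) {
        puts.add(record);
        if (puts.size() >= FLUSH_SIZE) {         // >= is safer than ==
            flush();
        }
    }

    void flush() {
        flushedRecords += puts.size();           // stands in for table.put(puts)
        puts.clear();                            // essential: reset the buffer
    }

    void close() {
        flush();                                 // push the final partial batch
    }

    public static void main(String[] args) {
        BatchingSink sink = new BatchingSink();
        for (int i = 0; i < 250; i++) {
            sink.writeRecord("row-" + i);
        }
        sink.close();
        System.out.println(sink.flushedRecords); // 250
    }
}
```

With 250 records and a batch size of 100, this flushes twice mid-stream and once in close(), so all 250 records land in the store.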
> > Christophe
> >
> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> >>
> >> Hi,
> >> I'm afraid this is currently a shortcoming in the API. There is this open
> >> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
> >> We can't fix it before Flink 2.0, though, because we have to keep the API
> >> stable on the Flink 1.x release line.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
> >> <christophe.salperw...@gmail.com> wrote:
> >>>
> >>> Thanks for the feedback and sorry that I can't try all this straight
> >>> away.
> >>>
> >>> Is there a way to have a different function than:
> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
> >>>
> >>> I would like to return an HBase Put and not a SummaryStatistics. So
> >>> something like this:
> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
> >>>
> >>> Christophe
> >>>
> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>
> >>>> OK, this indicates that the operator following the source is a
> >>>> bottleneck.
> >>>>
> >>>> If that's the WindowOperator, it makes sense to try the refactoring of
> >>>> the WindowFunction.
> >>>> Alternatively, you can try to run that operator with a higher
> >>>> parallelism.
> >>>>
> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
> >>>> <christophe.salperw...@gmail.com>:
> >>>>>
> >>>>> Hi Fabian,
> >>>>>
> >>>>> Thanks for the help, I will try that. The backpressure was on the
> >>>>> source (HBase).
> >>>>>
> >>>>> Christophe
> >>>>>
> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>>>
> >>>>>> Hi Christophe,
> >>>>>>
> >>>>>> where does the backpressure appear? In front of the sink operator or
> >>>>>> before the window operator?
> >>>>>>
> >>>>>> In any case, I think you can improve your WindowFunction if you
> >>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
> >>>>>> The FoldFunction would take care of the statistics computation, and
> >>>>>> the WindowFunction would only assemble the result record, including
> >>>>>> extracting the start time of the window.
> >>>>>>
> >>>>>> Then you could do:
> >>>>>>
> >>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
> >>>>>> YourWindowFunction());
> >>>>>>
> >>>>>> This is more efficient because the FoldFunction is eagerly applied
> >>>>>> whenever a new element is added to a window. Hence, the window only
> >>>>>> holds a single value (SummaryStatistics) instead of all elements added
> >>>>>> to the window. In contrast, the WindowFunction is called when the
> >>>>>> window is finally evaluated.
> >>>>>>
> >>>>>> Hope this helps,
> >>>>>> Fabian
> >>>>>>
> >>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
> >>>>>> <christophe.salperw...@gmail.com>:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I am writing a program to read time series from HBase and do some
> >>>>>>> daily aggregations (Flink streaming). For now I am just computing an
> >>>>>>> average, so not very consuming, but my HBase reads get slower and
> >>>>>>> slower (I have a few billion points to read). The backpressure is
> >>>>>>> almost all the time close to 1.
> >>>>>>>
> >>>>>>> I use custom timestamps:
> >>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>>>>>
> >>>>>>> so I implemented a custom extractor based on:
> >>>>>>> AscendingTimestampExtractor
> >>>>>>>
> >>>>>>> At the beginning I have 5M reads/s, and after 15 min I have just 1M
> >>>>>>> reads/s; then it gets worse and worse. Even when I cancel the job,
> >>>>>>> data are still being written into HBase (I did a sink similar to the
> >>>>>>> example, with a cache of 100s of HBase Puts to be a bit more
> >>>>>>> efficient).
> >>>>>>>
> >>>>>>> When I don't put a sink it seems to stay at 1M reads/s.
> >>>>>>>
> >>>>>>> Do you have an idea why?
> >>>>>>>
> >>>>>>> Here is a bit of code if needed:
> >>>>>>>
> >>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
> >>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
> >>>>>>>     .keyBy(0)
> >>>>>>>     .timeWindow(Time.days(1));
> >>>>>>>
> >>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
> >>>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
> >>>>>>>
> >>>>>>>       @Override
> >>>>>>>       public void apply(final Tuple key, final TimeWindow window,
> >>>>>>>           final Iterable<ANA> input, final Collector<Put> out)
> >>>>>>>           throws Exception {
> >>>>>>>
> >>>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
> >>>>>>>         for (final ANA ana : input) {
> >>>>>>>           summaryStatistics.addValue(ana.getValue());
> >>>>>>>         }
> >>>>>>>         final Put put = buildPut((String) key.getField(0),
> >>>>>>>             window.getStart(), summaryStatistics);
> >>>>>>>         out.collect(put);
> >>>>>>>       }
> >>>>>>>     });
> >>>>>>>
> >>>>>>> And how I started Flink on YARN:
> >>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> >>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
> >>>>>>>
> >>>>>>> Thanks for any feedback!
> >>>>>>>
> >>>>>>> Christophe
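For reference, the incremental fold-then-window pattern Fabian recommends in this thread can be sketched without any Flink or HBase dependencies. In the sketch below, Acc stands in for SummaryStatistics, and foldIn/evaluate are illustrative names, not Flink API: the fold step updates one small accumulator per key as each element arrives, so the window state is a single value rather than every buffered element, and the window-evaluation step only formats the result.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of incremental window aggregation: a fold function is applied
// eagerly per element, a window function runs once at evaluation time.
public class IncrementalAgg {
    // Stands in for SummaryStatistics: a tiny running aggregate.
    static class Acc {
        long count;
        double sum;
        void add(double v) { count++; sum += v; }
        double mean() { return count == 0 ? 0.0 : sum / count; }
    }

    // One accumulator per key; this is all the "window" has to hold.
    final Map<String, Acc> state = new HashMap<>();

    // FoldFunction analogue: applied eagerly for every arriving element.
    void foldIn(String key, double value) {
        state.computeIfAbsent(key, k -> new Acc()).add(value);
    }

    // WindowFunction analogue: called once when the window fires; it only
    // assembles the result record (here, a string instead of an HBase Put).
    String evaluate(String key, long windowStart) {
        return key + "@" + windowStart + " mean=" + state.get(key).mean();
    }

    public static void main(String[] args) {
        IncrementalAgg agg = new IncrementalAgg();
        agg.foldIn("sensor-1", 1.0);
        agg.foldIn("sensor-1", 3.0);
        System.out.println(agg.evaluate("sensor-1", 0L)); // sensor-1@0 mean=2.0
    }
}
```

The memory difference is the point: with billions of points per day-long window, buffering every ANA element until evaluation is far heavier than keeping one accumulator per key.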