I would need to restart it to be sure (and once it gets stuck, the web interface no longer reports backpressure), but it seems so. I wrote the output to a text file instead and it took 5 hours to complete:
aggregates.writeAsText("hdfs:///user/christophe/flinkHBase");
What is weird is that I get as many output lines as input rows if I don't limit the scan (14B rows). If I limit the scan (150M rows), I get 1 line per hour as expected. I am investigating a bit more right now.

Thanks again!
Christophe

2016-06-13 18:50 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> Do the backpressure metrics indicate that the sink function is blocking?
>
> 2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>> To continue, I implemented the
>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>>
>> It works fine when there is no sink, but when I add an HBase sink it seems
>> that the sink somehow blocks the flow. The sink writes very little data
>> into HBase, and when I limit my input to just a few sensors it works well.
>> Any idea?
>>
>> final SingleOutputStreamOperator<Aggregate> aggregates = ws
>>     .apply(
>>         new Aggregate(),
>>         new FoldFunction<ANA, Aggregate>() {
>>             @Override
>>             public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
>>                 accumulator.addValue(value.getValue());
>>                 return accumulator;
>>             }
>>         },
>>         new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
>>             @Override
>>             public void apply(final Tuple key, final TimeWindow window,
>>                     final Iterable<Aggregate> input,
>>                     final Collector<Aggregate> out) throws Exception {
>>                 for (final Aggregate aggregate : input) {
>>                     aggregate.setM((String) key.getField(0));
>>                     aggregate.setTime(window.getStart());
>>                     out.collect(aggregate);
>>                 }
>>             }
>>         });
>> aggregates.setParallelism(10).
>> writeUsingOutputFormat(new OutputFormat<Aggregate>() {
>>     private static final long serialVersionUID = 1L;
>>     HBaseConnect hBaseConnect;
>>     Table table;
>>     final int flushSize = 100;
>>     List<Put> puts = new ArrayList<>();
>>
>>     @Override
>>     public void writeRecord(final Aggregate record) throws IOException {
>>         puts.add(record.buildPut());
>>         if (puts.size() >= flushSize) {
>>             table.put(puts);
>>             puts.clear(); // reset the buffer, otherwise it grows forever and the same records are re-written on close()
>>         }
>>     }
>>
>>     @Override
>>     public void open(final int taskNumber, final int numTasks) throws IOException {
>>         hBaseConnect = new HBaseConnect();
>>         table = hBaseConnect.getHTable("PERF_TEST");
>>     }
>>
>>     @Override
>>     public void configure(final Configuration parameters) {
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         // last inserts
>>         if (!puts.isEmpty()) {
>>             table.put(puts);
>>         }
>>         table.close();
>>         hBaseConnect.close();
>>     }
>> });
>>
>> 2016-06-13 13:47 GMT+02:00 Maximilian Michels <m...@apache.org>:
>>> Thanks!
>>>
>>> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>>> <christophe.salperw...@gmail.com> wrote:
>>> > Hi,
>>> > I voted on this issue and I agree it would be nice to have.
>>> >
>>> > Thx!
>>> > Christophe
>>> >
>>> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>> >> Hi,
>>> >> I'm afraid this is currently a shortcoming in the API. There is this open
>>> >> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
>>> >> We can't fix it before Flink 2.0, though, because we have to keep the API
>>> >> stable on the Flink 1.x release line.
>>> >>
>>> >> Cheers,
>>> >> Aljoscha
>>> >>
>>> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>>> >> <christophe.salperw...@gmail.com> wrote:
>>> >>> Thanks for the feedback and sorry that I can't try all this straight
>>> >>> away.
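A note on the buffered HBase sink earlier in the thread: the Put buffer must be cleared after each flush, or it keeps growing and the buffered records are written again on close(). Below is a minimal, HBase-free sketch of that buffered-flush pattern; the BatchingSink class and its in-memory "table" list are hypothetical stand-ins, not Flink or HBase API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink that batches writes, as the HBase OutputFormat above does.
class BatchingSink {
    static final int FLUSH_SIZE = 100;
    final List<String> buffer = new ArrayList<>();
    final List<String> table = new ArrayList<>(); // stands in for the HBase table
    int flushes = 0;

    void writeRecord(String record) {
        buffer.add(record);
        if (buffer.size() >= FLUSH_SIZE) {
            table.addAll(buffer);   // table.put(puts) in the HBase version
            buffer.clear();         // without this the buffer grows unbounded
            flushes++;
        }
    }

    void close() {
        if (!buffer.isEmpty()) {    // write the remaining tail of records
            table.addAll(buffer);
            buffer.clear();
        }
    }
}
```

With the clear in place, each record reaches the table exactly once and memory stays bounded by the flush size.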
>>> >>>
>>> >>> Is there a way to have a different function than:
>>> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>> >>>
>>> >>> I would like to return an HBase Put and not a SummaryStatistics, so
>>> >>> something like this:
>>> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>> >>>
>>> >>> Christophe
>>> >>>
>>> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>> >>>> OK, this indicates that the operator following the source is a
>>> >>>> bottleneck.
>>> >>>>
>>> >>>> If that's the WindowOperator, it makes sense to try the refactoring
>>> >>>> of the WindowFunction.
>>> >>>> Alternatively, you can try to run that operator with a higher
>>> >>>> parallelism.
>>> >>>>
>>> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>>> >>>> <christophe.salperw...@gmail.com>:
>>> >>>>> Hi Fabian,
>>> >>>>>
>>> >>>>> Thanks for the help, I will try that. The backpressure was on the
>>> >>>>> source (HBase).
>>> >>>>>
>>> >>>>> Christophe
>>> >>>>>
>>> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>> >>>>>> Hi Christophe,
>>> >>>>>>
>>> >>>>>> where does the backpressure appear? In front of the sink operator
>>> >>>>>> or before the window operator?
>>> >>>>>>
>>> >>>>>> In any case, I think you can improve your WindowFunction if you
>>> >>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>> >>>>>> The FoldFunction would take care of the statistics computation and
>>> >>>>>> the WindowFunction would only assemble the result record, including
>>> >>>>>> extracting the start time of the window.
>>> >>>>>>
>>> >>>>>> Then you could do:
>>> >>>>>>
>>> >>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>> >>>>>> YourWindowFunction());
>>> >>>>>>
>>> >>>>>> This is more efficient because the FoldFunction is eagerly applied
>>> >>>>>> whenever a new element is added to a window.
>>> >>>>>> Hence, the window only holds a single value (SummaryStatistics)
>>> >>>>>> instead of all elements added to the window. In contrast, the
>>> >>>>>> WindowFunction is only called when the window is finally evaluated.
>>> >>>>>>
>>> >>>>>> Hope this helps,
>>> >>>>>> Fabian
>>> >>>>>>
>>> >>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>>> >>>>>> <christophe.salperw...@gmail.com>:
>>> >>>>>>> Hi,
>>> >>>>>>>
>>> >>>>>>> I am writing a program to read time series from HBase and do some
>>> >>>>>>> daily aggregations (Flink streaming). For now I am just computing
>>> >>>>>>> some averages, so nothing very heavy, but my HBase reads get slower
>>> >>>>>>> and slower (I have a few billion points to read). The backpressure
>>> >>>>>>> is close to 1 almost all the time.
>>> >>>>>>>
>>> >>>>>>> I use custom timestamps:
>>> >>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> >>>>>>>
>>> >>>>>>> so I implemented a custom extractor based on:
>>> >>>>>>> AscendingTimestampExtractor
>>> >>>>>>>
>>> >>>>>>> At the beginning I get 5M reads/s and after 15 min only 1M reads/s,
>>> >>>>>>> then it gets worse and worse. Even when I cancel the job, data are
>>> >>>>>>> still being written into HBase (I made a sink similar to the
>>> >>>>>>> example, with a cache of 100s of HBase Puts to be a bit more
>>> >>>>>>> efficient).
>>> >>>>>>>
>>> >>>>>>> When I don't add a sink, it seems to stay at 1M reads/s.
>>> >>>>>>>
>>> >>>>>>> Do you have an idea why?
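Fabian's point above about the eager FoldFunction can be illustrated without Flink: an incremental aggregate keeps O(1) state per window because each element is folded into an accumulator as it arrives, rather than buffered until the window fires. A sketch with a hypothetical Aggregate accumulator (the real code uses commons-math's SummaryStatistics):

```java
// Hypothetical accumulator, analogous to SummaryStatistics / the Aggregate class in the thread.
class Aggregate {
    long count = 0;
    double sum = 0;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;

    // The "fold": invoked once per element on arrival, so the window
    // stores only this accumulator, never the elements themselves.
    Aggregate addValue(double v) {
        count++;
        sum += v;
        min = Math.min(min, v);
        max = Math.max(max, v);
        return this;
    }

    double mean() {
        return sum / count;
    }
}
```

The WindowFunction then only has to read the final accumulator (and attach the key and window start), which is why the refactoring reduces both memory and evaluation cost.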
>>> >>>>>>>
>>> >>>>>>> Here is a bit of code if needed:
>>> >>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>> >>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>> >>>>>>>     .keyBy(0)
>>> >>>>>>>     .timeWindow(Time.days(1));
>>> >>>>>>>
>>> >>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(new
>>> >>>>>>> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>> >>>>>>>
>>> >>>>>>>     @Override
>>> >>>>>>>     public void apply(final Tuple key, final TimeWindow window,
>>> >>>>>>>             final Iterable<ANA> input, final Collector<Put> out)
>>> >>>>>>>             throws Exception {
>>> >>>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>> >>>>>>>         for (final ANA ana : input) {
>>> >>>>>>>             summaryStatistics.addValue(ana.getValue());
>>> >>>>>>>         }
>>> >>>>>>>         final Put put = buildPut((String) key.getField(0),
>>> >>>>>>>                 window.getStart(), summaryStatistics);
>>> >>>>>>>         out.collect(put);
>>> >>>>>>>     }
>>> >>>>>>> });
>>> >>>>>>>
>>> >>>>>>> And how I started Flink on YARN:
>>> >>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>> >>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>>> >>>>>>>
>>> >>>>>>> Thanks for any feedback!
>>> >>>>>>>
>>> >>>>>>> Christophe
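For readers following along without a Flink setup: the daily windowing in the code above amounts to bucketing each event-time timestamp to the start of its day and aggregating per bucket (tumbling windows align as timestamp - (timestamp % size)). A plain-Java sketch of that grouping, with hypothetical class and method names:

```java
import java.util.HashMap;
import java.util.Map;

class DailyBuckets {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;

    // Maps a timestamp to the start of its 1-day tumbling window,
    // mirroring what TimeWindow.getStart() yields for Time.days(1).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % DAY_MS);
    }

    // Sums values per window start: the shape of a per-key daily aggregate.
    static Map<Long, Double> aggregate(long[] timestamps, double[] values) {
        Map<Long, Double> sums = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            sums.merge(windowStart(timestamps[i]), values[i], Double::sum);
        }
        return sums;
    }
}
```

Flink does the same bucketing incrementally and per key, emitting one result per key and day once the watermark passes the window end.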