Do the backpressure metrics indicate that the sink function is blocking?

2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
> To continue, I implemented the
> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>
> It works fine when there is no sink, but when I put an HBase sink it seems
> that the sink, somehow, blocks the flow. The sink writes very little data
> into HBase, and when I limit my input to just a few sensors, it works well.
> Any idea?
>
> final SingleOutputStreamOperator<Aggregate> aggregates = ws
>     .apply(
>         new Aggregate(),
>         new FoldFunction<ANA, Aggregate>() {
>
>             @Override
>             public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
>                 accumulator.addValue(value.getValue());
>                 return accumulator;
>             }
>         },
>         new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
>
>             @Override
>             public void apply(final Tuple key, final TimeWindow window,
>                     final Iterable<Aggregate> input, final Collector<Aggregate> out) throws Exception {
>                 for (final Aggregate aggregate : input) {
>                     aggregate.setM((String) key.getField(0));
>                     aggregate.setTime(window.getStart());
>                     out.collect(aggregate);
>                 }
>             }
>         });
> aggregates
>     .setParallelism(10)
>     .writeUsingOutputFormat(new OutputFormat<Aggregate>() {
>         private static final long serialVersionUID = 1L;
>         HBaseConnect hBaseConnect;
>         Table table;
>         final int flushSize = 100;
>         List<Put> puts = new ArrayList<>();
>
>         @Override
>         public void writeRecord(final Aggregate record) throws IOException {
>             puts.add(record.buildPut());
>             if (puts.size() == flushSize) {
>                 table.put(puts);
>                 puts.clear(); // clear after flush, otherwise the batch grows without bound
>             }
>         }
>
>         @Override
>         public void open(final int taskNumber, final int numTasks) throws IOException {
>             hBaseConnect = new HBaseConnect();
>             table = hBaseConnect.getHTable("PERF_TEST");
>         }
>
>         @Override
>         public void configure(final Configuration parameters) {
>             // no configuration needed
>         }
>
>         @Override
>         public void close() throws IOException {
>             // last inserts
>             table.put(puts);
>             table.close();
>             hBaseConnect.close();
>         }
>     });
>
> 2016-06-13 13:47 GMT+02:00 Maximilian Michels <m...@apache.org>:
>
>> Thanks!
>>
>> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>> <christophe.salperw...@gmail.com> wrote:
>> > Hi,
>> > I voted on this issue and I agree this would be nice to have.
>> >
>> > Thx!
>> > Christophe
>> >
>> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>> >>
>> >> Hi,
>> >> I'm afraid this is currently a shortcoming in the API. There is this open
>> >> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
>> >> We can't fix it before Flink 2.0, though, because we have to keep the API
>> >> stable on the Flink 1.x release line.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>> >> <christophe.salperw...@gmail.com> wrote:
>> >>>
>> >>> Thanks for the feedback, and sorry that I can't try all this straight
>> >>> away.
>> >>>
>> >>> Is there a way to have a different function than:
>> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>> >>>
>> >>> I would like to return an HBase Put and not a SummaryStatistics.
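The batching pattern in the sink above can be sketched independently of HBase and Flink. The essential detail is clearing the buffer after each flush so batches stay bounded; here the HBase `table.put(puts)` call is replaced by a hypothetical flush callback, and the class name is illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of a batching writer: buffers records and hands full
// batches to a flush callback (standing in for HBase's table.put(puts)).
// The buffer must be cleared after each flush; otherwise the list grows
// without bound and every flush re-writes earlier records.
class BatchingWriter<T> {
    private final int flushSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(final int flushSize, final Consumer<List<T>> flusher) {
        this.flushSize = flushSize;
        this.flusher = flusher;
    }

    // Analogue of writeRecord(): buffer, and flush when the batch is full.
    void write(final T record) {
        buffer.add(record);
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    // Analogue of the "last inserts" in close(): push out what remains.
    void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear(); // keeps the batch bounded
        }
    }
}
```

Note that this sketch still flushes synchronously in the writing thread; if HBase is slow, backpressure will propagate upstream from here, which matches the observed behaviour of the sink appearing to block the flow.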
>> >>> So something like this:
>> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>> >>>
>> >>> Christophe
>> >>>
>> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>> >>>>
>> >>>> OK, this indicates that the operator following the source is a
>> >>>> bottleneck.
>> >>>>
>> >>>> If that's the WindowOperator, it makes sense to try the refactoring of
>> >>>> the WindowFunction.
>> >>>> Alternatively, you can try to run that operator with a higher
>> >>>> parallelism.
>> >>>>
>> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>> >>>> <christophe.salperw...@gmail.com>:
>> >>>>>
>> >>>>> Hi Fabian,
>> >>>>>
>> >>>>> Thanks for the help, I will try that. The backpressure was on the
>> >>>>> source (HBase).
>> >>>>>
>> >>>>> Christophe
>> >>>>>
>> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>> >>>>>>
>> >>>>>> Hi Christophe,
>> >>>>>>
>> >>>>>> where does the backpressure appear? In front of the sink operator or
>> >>>>>> before the window operator?
>> >>>>>>
>> >>>>>> In any case, I think you can improve your WindowFunction if you
>> >>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>> >>>>>> The FoldFunction would take care of the statistics computation, and
>> >>>>>> the WindowFunction would only assemble the result record, including
>> >>>>>> extracting the start time of the window.
>> >>>>>>
>> >>>>>> Then you could do:
>> >>>>>>
>> >>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> >>>>>> YourWindowFunction());
>> >>>>>>
>> >>>>>> This is more efficient because the FoldFunction is eagerly applied
>> >>>>>> whenever a new element is added to a window. Hence, the window only
>> >>>>>> holds a single value (SummaryStatistics) instead of all elements
>> >>>>>> added to the window. In contrast, the WindowFunction is called when
>> >>>>>> the window is finally evaluated.
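Fabian's point about eager aggregation can be illustrated outside of Flink. This is a sketch, not the actual Flink API: a hypothetical `applyWindow` helper that mimics `ws.apply(initialValue, foldFunction, windowFunction)` for a single window, showing that the fold runs per element and only one accumulator value is ever retained:

```java
import java.util.function.BiFunction;

// Flink-free sketch of ws.apply(initial, foldFn, windowFn): the fold is
// applied eagerly as each element arrives, so window state is a single
// accumulator; the window function sees just that folded value at the end.
class FoldThenWindowSketch {

    // Mimics apply(initialValue, foldFn, windowFn) for one window.
    static <IN, ACC, OUT> OUT applyWindow(
            final ACC initial,
            final Iterable<IN> elements,
            final BiFunction<ACC, IN, ACC> foldFn,
            final BiFunction<Long, ACC, OUT> windowFn) {
        ACC acc = initial;
        for (final IN e : elements) {
            acc = foldFn.apply(acc, e); // eager: one value of state, not a buffer
        }
        final long windowStart = 0L; // placeholder for window.getStart()
        return windowFn.apply(windowStart, acc);
    }
}
```

A plain WindowFunction, by contrast, forces Flink to keep every element of the window in state until the window fires, which is much heavier for day-long windows over billions of points.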
>> >>>>>>
>> >>>>>> Hope this helps,
>> >>>>>> Fabian
>> >>>>>>
>> >>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>> >>>>>> <christophe.salperw...@gmail.com>:
>> >>>>>>>
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> I am writing a program to read time series from HBase and do some
>> >>>>>>> daily aggregations (Flink streaming). For now I am just computing
>> >>>>>>> some averages, so it is not very demanding, but my HBase reads get
>> >>>>>>> slower and slower (I have a few billion points to read). The
>> >>>>>>> backpressure is almost all the time close to 1.
>> >>>>>>>
>> >>>>>>> I use custom timestamps:
>> >>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>>>>>>
>> >>>>>>> so I implemented a custom extractor based on:
>> >>>>>>> AscendingTimestampExtractor
>> >>>>>>>
>> >>>>>>> At the beginning I have 5M reads/s, and after 15 min I have just 1M
>> >>>>>>> reads/s; then it gets worse and worse. Even when I cancel the job,
>> >>>>>>> data is still being written into HBase (I did a sink similar to the
>> >>>>>>> example, with a cache of 100s of HBase Puts to be a bit more
>> >>>>>>> efficient).
>> >>>>>>>
>> >>>>>>> When I don't put a sink it seems to stay at 1M reads/s.
>> >>>>>>>
>> >>>>>>> Do you have an idea why?
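The contract behind the extractor mentioned above can be sketched without Flink. A hypothetical stand-in (names are illustrative, not the Flink API): timestamps must be monotonically ascending per source, and the watermark trails the largest timestamp seen by 1 ms. This sketch throws on a violation, whereas Flink's extractor handles it via a configurable violation handler:

```java
// Flink-free sketch of the AscendingTimestampExtractor contract.
class AscendingExtractorSketch {
    private long currentMax = Long.MIN_VALUE;

    // Called per record: returns its timestamp, tracking the maximum seen.
    long extractTimestamp(final long recordTimestamp) {
        if (recordTimestamp < currentMax) {
            // Sketch choice: fail fast; Flink instead reports the violation.
            throw new IllegalArgumentException("timestamps must be ascending");
        }
        currentMax = recordTimestamp;
        return recordTimestamp;
    }

    // Watermark trails the maximum timestamp by one millisecond, so a
    // window can fire as soon as a later timestamp has been seen.
    long currentWatermark() {
        return currentMax == Long.MIN_VALUE ? Long.MIN_VALUE : currentMax - 1;
    }
}
```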
>> >>>>>>>
>> >>>>>>> Here is a bit of code if needed:
>> >>>>>>>
>> >>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>> >>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>> >>>>>>>     .keyBy(0)
>> >>>>>>>     .timeWindow(Time.days(1));
>> >>>>>>>
>> >>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(new
>> >>>>>>> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>> >>>>>>>
>> >>>>>>>     @Override
>> >>>>>>>     public void apply(final Tuple key, final TimeWindow window,
>> >>>>>>>             final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>> >>>>>>>
>> >>>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>> >>>>>>>         for (final ANA ana : input) {
>> >>>>>>>             summaryStatistics.addValue(ana.getValue());
>> >>>>>>>         }
>> >>>>>>>         final Put put = buildPut((String) key.getField(0), window.getStart(),
>> >>>>>>>                 summaryStatistics);
>> >>>>>>>         out.collect(put);
>> >>>>>>>     }
>> >>>>>>> });
>> >>>>>>>
>> >>>>>>> And how I started Flink on YARN:
>> >>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>> >>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>> >>>>>>>
>> >>>>>>> Thanks for any feedback!
>> >>>>>>>
>> >>>>>>> Christophe
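The per-key, per-day aggregation the job performs can be sketched without Flink or Commons Math. Everything here is illustrative (the `Stats` class is a tiny stand-in for `SummaryStatistics`, and the day index mirrors what a tumbling `Time.days(1)` window over event time produces):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of keyBy(sensor).timeWindow(Time.days(1)) with a per-window
// statistics accumulator. Day windows are computed as timestamp / MS_PER_DAY,
// matching epoch-aligned tumbling 1-day windows.
class DailyAggregationSketch {
    static final long MS_PER_DAY = 24L * 60 * 60 * 1000;

    // Minimal stand-in for Commons Math's SummaryStatistics.
    static class Stats {
        long count;
        double sum;

        void add(final double v) {
            count++;
            sum += v;
        }

        double mean() {
            return sum / count;
        }
    }

    // Keyed state: one accumulator per (sensor, day) pair.
    final Map<String, Stats> statsByKeyAndDay = new HashMap<>();

    void add(final String sensor, final long timestampMs, final double value) {
        final long day = timestampMs / MS_PER_DAY; // tumbling 1-day window index
        statsByKeyAndDay
            .computeIfAbsent(sensor + "@" + day, k -> new Stats())
            .add(value);
    }
}
```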