Re: HBase reads and back pressure
I would need to restart it to be sure (when it starts to get stuck, the web interface no longer shows the backpressure), but it seems so. I put a text file as the output and it took 5h to complete:

aggregates.writeAsText("hdfs:///user/christophe/flinkHBase");

What is odd is that I get as many output lines as input rows (14B) when I don't limit the scan. If I limit the scan (150M rows), I get one line per hour, as expected. I am investigating a bit more right now.

Thanks again!
Christophe

2016-06-13 18:50 GMT+02:00 Fabian Hueske:
> Do the backpressure metrics indicate that the sink function is blocking?
Re: HBase reads and back pressure
Do the backpressure metrics indicate that the sink function is blocking?

2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
> To continue, I implemented the ws.apply(new SummaryStatistics(), new
> YourFoldFunction(), new YourWindowFunction()); [...]
Re: HBase reads and back pressure
To continue, I implemented the ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());

It works fine when there is no sink, but when I put an HBase sink it seems that the sink, somehow, blocks the flow. The sink writes very little data into HBase, and when I limit my input to just a few sensors it works well. Any idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws
    .apply(
        new Aggregate(),
        new FoldFunction<ANA, Aggregate>() {

            @Override
            public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
                accumulator.addValue(value.getValue());
                return accumulator;
            }
        },
        new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<Aggregate> input, final Collector<Aggregate> out) throws Exception {
                for (final Aggregate aggregate : input) {
                    aggregate.setM((String) key.getField(0));
                    aggregate.setTime(window.getStart());
                    out.collect(aggregate);
                }
            }
        });

aggregates
    .setParallelism(10)
    .writeUsingOutputFormat(new OutputFormat<Aggregate>() {
        private static final long serialVersionUID = 1L;

        HBaseConnect hBaseConnect;
        Table table;
        final int flushSize = 100;
        List<Put> puts = new ArrayList<>();

        @Override
        public void writeRecord(final Aggregate record) throws IOException {
            puts.add(record.buildPut());
            if (puts.size() == flushSize) {
                table.put(puts);
            }
        }

        @Override
        public void open(final int taskNumber, final int numTasks) throws IOException {
            hBaseConnect = new HBaseConnect();
            table = hBaseConnect.getHTable("PERF_TEST");
        }

        @Override
        public void configure(final Configuration parameters) {
        }

        @Override
        public void close() throws IOException {
            // last inserts
            table.put(puts);
            table.close();
            hBaseConnect.close();
        }
    });

2016-06-13 13:47 GMT+02:00 Maximilian Michels:
> Thanks! [...]
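One detail worth flagging in the sink above: writeRecord never empties puts after table.put(puts), so puts.size() == flushSize is true exactly once, after which the buffer only grows and the already-written Puts are sent again at close(). A minimal sketch of the flush path with the buffer cleared, using the same fields as above:

@Override
public void writeRecord(final Aggregate record) throws IOException {
    puts.add(record.buildPut());
    if (puts.size() >= flushSize) { // >= is defensive: == fires only once if the list is never cleared
        table.put(puts);            // batch-write the buffered Puts to HBase
        puts.clear();               // reset the buffer so the next batch starts empty
    }
}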
Re: HBase reads and back pressure
Thanks!

On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck wrote:
> Hi,
> I vote on this issue and I agree this would be nice to have. [...]
Re: HBase reads and back pressure
Hi,
I voted on this issue and I agree this would be nice to have.

Thx!
Christophe

2016-06-13 12:26 GMT+02:00 Aljoscha Krettek:
> Hi,
> I'm afraid this is currently a shortcoming in the API. [...]
Re: HBase reads and back pressure
Hi Max,

In fact the Put would be the output of my WindowFunction. I saw Aljoscha replied; it seems I will need to create another intermediate class to handle that, but that is fine.

Thx for the help!
Christophe

2016-06-13 12:25 GMT+02:00 Maximilian Michels:
> Hi Christophe,
>
> A fold function has two inputs: The state and a record to update the
> state with. [...]
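A minimal sketch of such an intermediate class, matching the addValue/setM/setTime/buildPut calls in the job code shown above in this digest — the row key and column layout here are illustrative assumptions, not the real schema:

import java.io.Serializable;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Wraps the statistics accumulator together with the key and window-start
// metadata needed to build the HBase Put at the end.
public class Aggregate implements Serializable {
    private final SummaryStatistics stats = new SummaryStatistics();
    private String m;   // metric/sensor id (key field 0)
    private long time;  // window start timestamp

    public void addValue(final double v) { stats.addValue(v); }
    public void setM(final String m) { this.m = m; }
    public void setTime(final long time) { this.time = time; }

    public Put buildPut() {
        // Illustrative row key: metric id + window start.
        final Put put = new Put(Bytes.toBytes(m + "_" + time));
        put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("avg"), Bytes.toBytes(stats.getMean()));
        return put;
    }
}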
Re: HBase reads and back pressure
Hi,
I'm afraid this is currently a shortcoming in the API. There is this open Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869. We can't fix it before Flink 2.0, though, because we have to keep the API stable on the Flink 1.x release line.

Cheers,
Aljoscha

On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <christophe.salperw...@gmail.com> wrote:
> Thanks for the feedback and sorry that I can't try all this straight away. [...]
Re: HBase reads and back pressure
Hi Christophe,

A fold function has two inputs: the state and a record to update the state with. So you can update the SummaryStatistics (state) with each Put (input).

Cheers,
Max

On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck wrote:
> Thanks for the feedback and sorry that I can't try all this straight away. [...]
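Concretely, a minimal sketch of such a fold, reusing the ANA record type from the job posted in this thread (the state is the SummaryStatistics accumulator):

// Each call folds one incoming ANA record into the running statistics
// and returns the updated accumulator.
new FoldFunction<ANA, SummaryStatistics>() {
    @Override
    public SummaryStatistics fold(final SummaryStatistics stats, final ANA value) throws Exception {
        stats.addValue(value.getValue());
        return stats;
    }
}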
Re: HBase reads and back pressure
Thanks for the feedback, and sorry that I can't try all this straight away.

Is there a way to have a different function than:

WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()

I would like to return an HBase Put and not a SummaryStatistics, so something like this:

WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()

Christophe

2016-06-09 17:47 GMT+02:00 Fabian Hueske:
> OK, this indicates that the operator following the source is a bottleneck. [...]
Re: HBase reads and back pressure
OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the WindowFunction.
Alternatively, you can try to run that operator with a higher parallelism.

2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
> Hi Fabian,
>
> Thanks for the help, I will try that. The backpressure was on the source
> (HBase). [...]
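As a sketch, the parallelism can be raised on the window operator alone, right after the apply (MyWindowFunction and the value 20 are illustrative placeholders, not from this thread):

// setParallelism on the returned operator affects only the window operator,
// so it can run wider than the HBase source.
final SingleOutputStreamOperator<Put> puts = ws
    .apply(new MyWindowFunction())  // hypothetical WindowFunction<ANA, Put, Tuple, TimeWindow>
    .setParallelism(20);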
Re: HBase reads and back pressure
Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source (HBase).

Christophe

2016-06-09 16:38 GMT+02:00 Fabian Hueske:
> Hi Christophe,
>
> where does the backpressure appear? In front of the sink operator or
> before the window operator? [...]
Re: HBase reads and back pressure
Hi Christophe,

where does the backpressure appear? In front of the sink operator or before the window operator?

In any case, I think you can improve your WindowFunction if you convert parts of it into a FoldFunction.
The FoldFunction would take care of the statistics computation, and the WindowFunction would only assemble the result record, including extracting the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());

This is more efficient because the FoldFunction is eagerly applied whenever a new element is added to a window. Hence, the window only holds a single value (the SummaryStatistics) instead of all elements added to the window. In contrast, the WindowFunction is called only when the window is finally evaluated.

Hope this helps,
Fabian

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:

> Hi,
>
> I am writing a program to read timeseries from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some averages,
> so nothing very heavy, but my HBase reads get slower and slower (I have a
> few billion points to read). The back pressure is almost all the time
> close to 1.
>
> I use custom timestamps:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on AscendingTimestampExtractor.
>
> At the beginning I get 5M reads/s, and after 15 min just 1M reads/s; then
> it gets worse and worse. Even when I cancel the job, data is still being
> written into HBase (I made a sink similar to the example, with a cache of
> 100s of HBase Puts to be a bit more efficient).
>
> When I don't put a sink, it seems to stay at 1M reads/s.
>
> Do you have an idea why?
>
> Here is a bit of code if needed:
>
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ANA>())
>     .keyBy(0)
>     .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator<Put> puts = ws.apply(
>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>
>         @Override
>         public void apply(final Tuple key, final TimeWindow window,
>                 final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>
>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>             for (final ANA ana : input) {
>                 summaryStatistics.addValue(ana.getValue());
>             }
>             final Put put = buildPut((String) key.getField(0), window.getStart(),
>                 summaryStatistics);
>             out.collect(put);
>         }
>     });
>
> And how I started Flink on YARN:
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>     -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe
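For reference, a minimal sketch of that refactoring, reusing the ANA type from the quoted job. Note that on the Flink 1.x apply(initialValue, foldFunction, windowFunction) signature the WindowFunction's input and output are both tied to the fold result type (the FLINK-3869 restriction discussed earlier in this digest), so this sketch emits the SummaryStatistics itself rather than a Put:

// Eager aggregation: the fold updates one SummaryStatistics per key/window
// as elements arrive, so the window holds a single accumulator instead of
// buffering every element until evaluation.
final SingleOutputStreamOperator<SummaryStatistics> stats = ws.apply(
    new SummaryStatistics(),
    new FoldFunction<ANA, SummaryStatistics>() {
        @Override
        public SummaryStatistics fold(final SummaryStatistics acc, final ANA value) throws Exception {
            acc.addValue(value.getValue());
            return acc;
        }
    },
    new WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>() {
        @Override
        public void apply(final Tuple key, final TimeWindow window,
                final Iterable<SummaryStatistics> input,
                final Collector<SummaryStatistics> out) throws Exception {
            // Exactly one folded accumulator arrives here per window.
            for (final SummaryStatistics s : input) {
                out.collect(s);
            }
        }
    });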