I would need to restart it to be sure (once the job gets stuck, the web
interface no longer reports backpressure), but it seems so. I set a text
file as the output and the job took 5h to complete:
aggregates.writeAsText("hdfs:///user/christophe/flinkHBase");

What is weird is that I get as many output lines as input rows if I don't
limit the scan (14B rows). If I limit the scan (150M), I get 1 line per hour
as expected. I am investigating this further right now.

Thanks again!
Christophe
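
A note on the sink quoted below: as written, writeRecord calls
table.put(puts) only when puts.size() == flushSize and never clears the
list. The first 100 records are therefore written once, the list keeps
growing afterwards, and everything buffered is written again in close().
Whether this explains the stall is not confirmed in this thread. Here is a
minimal sketch of the flush-and-clear pattern in plain Java, without the
HBase or Flink dependencies. BatchingBuffer and its batchWriter callback are
hypothetical names standing in for the puts list and table.put(puts):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Buffers records and hands them to a batch writer in chunks.
 * Hypothetical helper: batchWriter stands in for a call such as table.put(puts).
 */
class BatchingBuffer<T> {
    private final int flushSize;
    private final Consumer<List<T>> batchWriter;
    private final List<T> buffer = new ArrayList<>();

    BatchingBuffer(final int flushSize, final Consumer<List<T>> batchWriter) {
        if (flushSize <= 0) {
            throw new IllegalArgumentException("flushSize must be positive");
        }
        this.flushSize = flushSize;
        this.batchWriter = batchWriter;
    }

    /** Adds one record and flushes once the buffer has reached flushSize. */
    void add(final T record) {
        buffer.add(record);
        // >= rather than == so a missed flush can never disable batching for good
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    /** Writes the buffered records as one batch, then empties the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Pass a copy so the writer never observes the list being cleared
        batchWriter.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Number of records waiting for the next flush. */
    int pending() {
        return buffer.size();
    }
}
```

In the OutputFormat, writeRecord would call add(record.buildPut()) and
close() would call flush() before closing the table, so each Put is sent
once and the buffer never holds more than flushSize entries.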

2016-06-13 18:50 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Do the backpressure metrics indicate that the sink function is blocking?
>
> 2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> To continue, I implemented the ws.apply(new SummaryStatistics(), new
>> YourFoldFunction(), new YourWindowFunction());
>>
>> It works fine when there is no sink, but when I add an HBase sink, the
>> sink somehow seems to block the flow. The sink writes very little
>> data into HBase, and when I limit my input to just a few sensors, it works
>> well. Any idea?
>>
>> final SingleOutputStreamOperator<Aggregate> aggregates = ws
>>     .apply(
>>         new Aggregate(),
>>         new FoldFunction<ANA, Aggregate>() {
>>
>>           @Override
>>           public Aggregate fold(final Aggregate accumulator, final ANA value)
>>               throws Exception {
>>             accumulator.addValue(value.getValue());
>>             return accumulator;
>>           }
>>         },
>>         new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
>>
>>           @Override
>>           public void apply(final Tuple key, final TimeWindow window,
>>               final Iterable<Aggregate> input,
>>               final Collector<Aggregate> out) throws Exception {
>>             for (final Aggregate aggregate : input) {
>>               aggregate.setM((String) key.getField(0));
>>               aggregate.setTime(window.getStart());
>>               out.collect(aggregate);
>>             }
>>           }
>>         });
>> aggregates.
>>     setParallelism(10).
>>     writeUsingOutputFormat(new OutputFormat<Aggregate>() {
>>       private static final long serialVersionUID = 1L;
>>       HBaseConnect hBaseConnect;
>>       Table table;
>>       final int flushSize = 100;
>>       List<Put> puts = new ArrayList<>();
>>
>>       @Override
>>       public void writeRecord(final Aggregate record) throws IOException {
>>         puts.add(record.buildPut());
>>         if (puts.size() == flushSize) {
>>           table.put(puts);
>>         }
>>       }
>>
>>       @Override
>>       public void open(final int taskNumber, final int numTasks)
>>           throws IOException {
>>         hBaseConnect = new HBaseConnect();
>>         table = hBaseConnect.getHTable("PERF_TEST");
>>       }
>>
>>       @Override
>>       public void configure(final Configuration parameters) {
>>         // TODO Auto-generated method stub
>>       }
>>
>>       @Override
>>       public void close() throws IOException {
>>         // last inserts
>>         table.put(puts);
>>         table.close();
>>         hBaseConnect.close();
>>       }
>>     });
>>
>> 2016-06-13 13:47 GMT+02:00 Maximilian Michels <m...@apache.org>:
>>
>>> Thanks!
>>>
>>> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>>> <christophe.salperw...@gmail.com> wrote:
>>> > Hi,
>>> > I vote on this issue and I agree this would be nice to have.
>>> >
>>> > Thx!
>>> > Christophe
>>> >
>>> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>> >>
>>> >> Hi,
>>> >> I'm afraid this is currently a shortcoming in the API. There is this
>>> >> open Jira issue to track it:
>>> >> https://issues.apache.org/jira/browse/FLINK-3869. We
>>> >> can't fix it before Flink 2.0, though, because we have to keep the API
>>> >> stable on the Flink 1.x release line.
>>> >>
>>> >> Cheers,
>>> >> Aljoscha
>>> >>
>>> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>>> >> <christophe.salperw...@gmail.com> wrote:
>>> >>>
>>> >>> Thanks for the feedback and sorry that I can't try all this straight
>>> >>> away.
>>> >>>
>>> >>> Is there a way to have a different function than:
>>> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>> >>>
>>> >>> I would like to return an HBase Put and not a SummaryStatistics. So
>>> >>> something like this:
>>> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>> >>>
>>> >>> Christophe
>>> >>>
>>> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>> >>>>
>>> >>>> OK, this indicates that the operator following the source is a
>>> >>>> bottleneck.
>>> >>>>
>>> >>>> If that's the WindowOperator, it makes sense to try the refactoring
>>> >>>> of the WindowFunction.
>>> >>>> Alternatively, you can try to run that operator with a higher
>>> >>>> parallelism.
>>> >>>>
>>> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>>> >>>> <christophe.salperw...@gmail.com>:
>>> >>>>>
>>> >>>>> Hi Fabian,
>>> >>>>>
>>> >>>>> Thanks for the help, I will try that. The backpressure was on the
>>> >>>>> source (HBase).
>>> >>>>>
>>> >>>>> Christophe
>>> >>>>>
>>> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>> >>>>>>
>>> >>>>>> Hi Christophe,
>>> >>>>>>
>>> >>>>>> where does the backpressure appear? In front of the sink operator
>>> >>>>>> or before the window operator?
>>> >>>>>>
>>> >>>>>> In any case, I think you can improve your WindowFunction if you
>>> >>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>> >>>>>> The FoldFunction would take care of the statistics computation,
>>> >>>>>> and the WindowFunction would only assemble the result record,
>>> >>>>>> including extracting the start time of the window.
>>> >>>>>>
>>> >>>>>> Then you could do:
>>> >>>>>>
>>> >>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>> >>>>>> YourWindowFunction());
>>> >>>>>>
>>> >>>>>> This is more efficient because the FoldFunction is eagerly applied
>>> >>>>>> whenever a new element is added to a window. Hence, the window
>>> >>>>>> holds only a single value (SummaryStatistics) instead of all
>>> >>>>>> elements added to the window. In contrast, the WindowFunction is
>>> >>>>>> called only when the window is finally evaluated.
>>> >>>>>>
>>> >>>>>> Hope this helps,
>>> >>>>>> Fabian
>>> >>>>>>
>>> >>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>>> >>>>>> <christophe.salperw...@gmail.com>:
>>> >>>>>>>
>>> >>>>>>> Hi,
>>> >>>>>>>
>>> >>>>>>> I am writing a program to read time series from HBase and do some
>>> >>>>>>> daily aggregations (Flink streaming). For now I am just computing
>>> >>>>>>> some averages, so it is not very demanding, but my HBase reads get
>>> >>>>>>> slower and slower (I have a few billion points to read). The
>>> >>>>>>> backpressure is close to 1 almost all the time.
>>> >>>>>>>
>>> >>>>>>> I use custom timestamps:
>>> >>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> >>>>>>>
>>> >>>>>>> so I implemented a custom extractor based on:
>>> >>>>>>> AscendingTimestampExtractor
>>> >>>>>>>
>>> >>>>>>> At the beginning I have 5M reads/s, and after 15 min I have just
>>> >>>>>>> 1M reads/s; then it gets worse and worse. Even when I cancel the
>>> >>>>>>> job, data is still being written to HBase (I wrote a sink similar
>>> >>>>>>> to the example, with a cache of 100s of HBase Puts to be a bit
>>> >>>>>>> more efficient).
>>> >>>>>>>
>>> >>>>>>> When I don't put a sink it seems to stay on 1M reads/s.
>>> >>>>>>>
>>> >>>>>>> Do you have an idea why?
>>> >>>>>>>
>>> >>>>>>> Here is a bit of code if needed:
>>> >>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>> >>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>> >>>>>>>     .keyBy(0)
>>> >>>>>>>     .timeWindow(Time.days(1));
>>> >>>>>>>
>>> >>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>> >>>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>> >>>>>>>
>>> >>>>>>>       @Override
>>> >>>>>>>       public void apply(final Tuple key, final TimeWindow window,
>>> >>>>>>>           final Iterable<ANA> input,
>>> >>>>>>>           final Collector<Put> out) throws Exception {
>>> >>>>>>>
>>> >>>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>> >>>>>>>         for (final ANA ana : input) {
>>> >>>>>>>           summaryStatistics.addValue(ana.getValue());
>>> >>>>>>>         }
>>> >>>>>>>         final Put put = buildPut((String) key.getField(0),
>>> >>>>>>>             window.getStart(), summaryStatistics);
>>> >>>>>>>         out.collect(put);
>>> >>>>>>>       }
>>> >>>>>>>     });
>>> >>>>>>>
>>> >>>>>>> And how I started Flink on YARN:
>>> >>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>> >>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>>> >>>>>>>
>>> >>>>>>> Thanks for any feedback!
>>> >>>>>>>
>>> >>>>>>> Christophe
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >
>>>
>>
>>
>
