Do the backpressure metrics indicate that the sink function is blocking?
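
One thing that may be worth ruling out first: in the OutputFormat quoted below, writeRecord() calls table.put(puts) when the list reaches flushSize but never clears the list. As posted, the == size check matches only once, so every later record stays buffered until close(). Whether that explains the stall is a guess on my part; the backpressure metrics would tell. Here is a minimal sketch of a buffer that empties itself after each flush, in plain Java with no Flink or HBase dependency (BatchingBuffer and its flusher callback are hypothetical names, not an existing API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of Flink or HBase): buffers records and flushes them in batches. */
final class BatchingBuffer<T> {
    private final int flushSize;
    // Assumption: stands in for the real write, e.g. batch -> table.put(batch).
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BatchingBuffer(final int flushSize, final Consumer<List<T>> flusher) {
        if (flushSize <= 0) {
            throw new IllegalArgumentException("flushSize must be positive");
        }
        this.flushSize = flushSize;
        this.flusher = flusher;
    }

    /** Adds one record and flushes as soon as the threshold is reached. */
    void add(final T record) {
        buffer.add(record);
        // >= rather than == so a missed threshold cannot disable flushing for good.
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    /** Writes out whatever is buffered, then empties the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand over a copy so the callback never sees the list being cleared.
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Number of records waiting for the next flush. */
    int pending() {
        return buffer.size();
    }
}
```

In the real sink, writeRecord() would call add(record.buildPut()) and close() would call flush() before closing the table. Table.put throws IOException, so the callback would need to wrap that exception or use a callback type that declares it.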

2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> To continue, I implemented the ws.apply(new SummaryStatistics(), new
> YourFoldFunction(), new YourWindowFunction());
>
> It works fine when there is no sink, but when I add an HBase sink, the sink
> somehow seems to block the flow. The sink writes very little data into HBase,
> and when I limit my input to just a few sensors, it works well. Any idea?
>
> final SingleOutputStreamOperator<Aggregate> aggregates = ws
>     .apply(
>         new Aggregate(),
>         new FoldFunction<ANA, Aggregate>() {
>
>           @Override
>           public Aggregate fold(final Aggregate accumulator, final ANA value)
>               throws Exception {
>             accumulator.addValue(value.getValue());
>             return accumulator;
>           }
>         },
>         new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
>
>           @Override
>           public void apply(final Tuple key, final TimeWindow window,
>               final Iterable<Aggregate> input,
>               final Collector<Aggregate> out) throws Exception {
>             for (final Aggregate aggregate : input) {
>               aggregate.setM((String) key.getField(0));
>               aggregate.setTime(window.getStart());
>               out.collect(aggregate);
>             }
>           }
>         });
> aggregates
>     .setParallelism(10)
>     .writeUsingOutputFormat(new OutputFormat<Aggregate>() {
>       private static final long serialVersionUID = 1L;
>       HBaseConnect hBaseConnect;
>       Table table;
>       final int flushSize = 100;
>       List<Put> puts = new ArrayList<>();
>
>       @Override
>       public void writeRecord(final Aggregate record) throws IOException {
>         puts.add(record.buildPut());
>         if (puts.size() == flushSize) {
>           table.put(puts);
>         }
>       }
>
>       @Override
>       public void open(final int taskNumber, final int numTasks)
>           throws IOException {
>         hBaseConnect = new HBaseConnect();
>         table = hBaseConnect.getHTable("PERF_TEST");
>       }
>
>       @Override
>       public void configure(final Configuration parameters) {
>         // TODO Auto-generated method stub
>       }
>
>       @Override
>       public void close() throws IOException {
>         // last inserts
>         table.put(puts);
>         table.close();
>         hBaseConnect.close();
>       }
>     });
>
> 2016-06-13 13:47 GMT+02:00 Maximilian Michels <m...@apache.org>:
>
>> Thanks!
>>
>> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>> <christophe.salperw...@gmail.com> wrote:
>> > Hi,
>> > I vote on this issue and I agree this would be nice to have.
>> >
>> > Thx!
>> > Christophe
>> >
>> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>> >>
>> >> Hi,
>> >> I'm afraid this is currently a shortcoming in the API. There is this open
>> >> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
>> >> We can't fix it before Flink 2.0, though, because we have to keep the API
>> >> stable on the Flink 1.x release line.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>> >> <christophe.salperw...@gmail.com> wrote:
>> >>>
>> >>> Thanks for the feedback and sorry that I can't try all this straight
>> >>> away.
>> >>>
>> >>> Is there a way to have a different function than:
>> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>> >>>
>> >>> I would like to return an HBase Put and not a SummaryStatistics. So
>> >>> something like this:
>> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>> >>>
>> >>> Christophe
>> >>>
>> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>> >>>>
>> >>>> OK, this indicates that the operator following the source is a
>> >>>> bottleneck.
>> >>>>
>> >>>> If that's the WindowOperator, it makes sense to try the refactoring of
>> >>>> the WindowFunction.
>> >>>> Alternatively, you can try to run that operator with a higher
>> >>>> parallelism.
>> >>>>
>> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>> >>>> <christophe.salperw...@gmail.com>:
>> >>>>>
>> >>>>> Hi Fabian,
>> >>>>>
>> >>>>> Thanks for the help, I will try that. The backpressure was on the
>> >>>>> source (HBase).
>> >>>>>
>> >>>>> Christophe
>> >>>>>
>> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>> >>>>>>
>> >>>>>> Hi Christophe,
>> >>>>>>
>> >>>>>> where does the backpressure appear? In front of the sink operator or
>> >>>>>> before the window operator?
>> >>>>>>
>> >>>>>> In any case, I think you can improve your WindowFunction if you
>> >>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>> >>>>>> The FoldFunction would take care of the statistics computation, and
>> >>>>>> the WindowFunction would only assemble the result record, including
>> >>>>>> extracting the start time of the window.
>> >>>>>>
>> >>>>>> Then you could do:
>> >>>>>>
>> >>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> >>>>>> YourWindowFunction());
>> >>>>>>
>> >>>>>> This is more efficient because the FoldFunction is eagerly applied
>> >>>>>> whenever a new element is added to a window. Hence, the window holds
>> >>>>>> only a single value (SummaryStatistics) instead of all elements added
>> >>>>>> to the window. In contrast, the WindowFunction is called only when the
>> >>>>>> window is finally evaluated.
>> >>>>>>
>> >>>>>> Hope this helps,
>> >>>>>> Fabian
>> >>>>>>
>> >>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>> >>>>>> <christophe.salperw...@gmail.com>:
>> >>>>>>>
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> I am writing a program to read time series from HBase and do some
>> >>>>>>> daily aggregations (Flink streaming). For now I am just computing
>> >>>>>>> some averages, so it is not very demanding, but my HBase reads get
>> >>>>>>> slower and slower (I have a few billion points to read). The back
>> >>>>>>> pressure is close to 1 almost all the time.
>> >>>>>>>
>> >>>>>>> I use custom timestamp:
>> >>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>>>>>>
>> >>>>>>> so I implemented a custom extractor based on:
>> >>>>>>> AscendingTimestampExtractor
>> >>>>>>>
>> >>>>>>> At the beginning I have 5M reads/s, and after 15 min I have just 1M
>> >>>>>>> reads/s; then it gets worse and worse. Even when I cancel the job,
>> >>>>>>> data is still being written to HBase (I wrote a sink similar to the
>> >>>>>>> example, with a cache of hundreds of HBase Puts to be a bit more
>> >>>>>>> efficient).
>> >>>>>>>
>> >>>>>>> When I don't add a sink, it seems to stay at 1M reads/s.
>> >>>>>>>
>> >>>>>>> Do you have any idea why?
>> >>>>>>>
>> >>>>>>> Here is a bit of code if needed:
>> >>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>> >>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>> >>>>>>>     .keyBy(0)
>> >>>>>>>     .timeWindow(Time.days(1));
>> >>>>>>>
>> >>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>> >>>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>> >>>>>>>
>> >>>>>>>   @Override
>> >>>>>>>   public void apply(final Tuple key, final TimeWindow window,
>> >>>>>>>       final Iterable<ANA> input, final Collector<Put> out)
>> >>>>>>>       throws Exception {
>> >>>>>>>
>> >>>>>>>     final SummaryStatistics summaryStatistics = new SummaryStatistics();
>> >>>>>>>     for (final ANA ana : input) {
>> >>>>>>>       summaryStatistics.addValue(ana.getValue());
>> >>>>>>>     }
>> >>>>>>>     final Put put = buildPut((String) key.getField(0),
>> >>>>>>>         window.getStart(), summaryStatistics);
>> >>>>>>>     out.collect(put);
>> >>>>>>>   }
>> >>>>>>> });
>> >>>>>>>
>> >>>>>>> And how I started Flink on YARN :
>> >>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>> >>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>> >>>>>>>
>> >>>>>>> Thanks for any feedback!
>> >>>>>>>
>> >>>>>>> Christophe
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >
>>
>
>
