To continue, I implemented the ws.apply(new SummaryStatistics(), new
YourFoldFunction(), new YourWindowFunction()); pattern.

It works fine when there is no sink, but when I add an HBase sink, the sink
somehow seems to block the flow. The sink writes very little data into
HBase, yet when I limit my input to just a few sensors, it works well. Any
idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws
    .apply(
        new Aggregate(),
        new FoldFunction<ANA, Aggregate>() {

            @Override
            public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
                accumulator.addValue(value.getValue());
                return accumulator;
            }
        },
        new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<Aggregate> input, final Collector<Aggregate> out) throws Exception {
                for (final Aggregate aggregate : input) {
                    aggregate.setM((String) key.getField(0));
                    aggregate.setTime(window.getStart());
                    out.collect(aggregate);
                }
            }
        });
aggregates
    .setParallelism(10)
    .writeUsingOutputFormat(new OutputFormat<Aggregate>() {
        private static final long serialVersionUID = 1L;
        HBaseConnect hBaseConnect;
        Table table;
        final int flushSize = 100;
        List<Put> puts = new ArrayList<>();

        @Override
        public void configure(final Configuration parameters) {
        }

        @Override
        public void open(final int taskNumber, final int numTasks) throws IOException {
            hBaseConnect = new HBaseConnect();
            table = hBaseConnect.getHTable("PERF_TEST");
        }

        @Override
        public void writeRecord(final Aggregate record) throws IOException {
            puts.add(record.buildPut());
            if (puts.size() == flushSize) {
                table.put(puts);
            }
        }

        @Override
        public void close() throws IOException {
            // last inserts
            table.put(puts);
            table.close();
            hBaseConnect.close();
        }
    });
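As a side note on buffered sinks in general: with an exact-equality flush check and no clear() after flushing, the batch reaches the threshold once and then never matches it again, so almost nothing is written until close(). Below is a minimal, HBase-free sketch of the buffering pattern; BatchBuffer and its flush callback are illustrative names (not Flink or HBase API), with the callback standing in for table.put(puts):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal batching buffer: flushes every time the batch reaches flushSize. */
class BatchBuffer<T> {
    private final int flushSize;
    private final Consumer<List<T>> flush; // stands in for table.put(puts)
    private final List<T> batch = new ArrayList<>();

    BatchBuffer(final int flushSize, final Consumer<List<T>> flush) {
        this.flushSize = flushSize;
        this.flush = flush;
    }

    void add(final T record) {
        batch.add(record);
        if (batch.size() >= flushSize) {      // >= is safer than ==
            flush.accept(new ArrayList<>(batch));
            batch.clear();                    // without this, the batch never flushes again
        }
    }

    void close() {
        if (!batch.isEmpty()) {               // last, partial batch
            flush.accept(new ArrayList<>(batch));
            batch.clear();
        }
    }
}

public class BatchBufferDemo {
    public static void main(String[] args) {
        final List<Integer> flushed = new ArrayList<>();
        final BatchBuffer<Integer> buffer = new BatchBuffer<>(100, flushed::addAll);
        for (int i = 0; i < 250; i++) {
            buffer.add(i);
        }
        buffer.close();
        System.out.println(flushed.size()); // all 250 records reach the "sink"
    }
}
```

With a threshold of 100 and 250 records, this flushes twice during add() and once on close(), so every record reaches the sink.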

2016-06-13 13:47 GMT+02:00 Maximilian Michels <m...@apache.org>:

> Thanks!
>
> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
> <christophe.salperw...@gmail.com> wrote:
> > Hi,
> > I voted for this issue and I agree this would be nice to have.
> >
> > Thx!
> > Christophe
> >
> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> >>
> >> Hi,
> >> I'm afraid this is currently a shortcoming in the API. There is this open
> >> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
> >> We can't fix it before Flink 2.0, though, because we have to keep the API
> >> stable on the Flink 1.x release line.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
> >> <christophe.salperw...@gmail.com> wrote:
> >>>
> >>> Thanks for the feedback and sorry that I can't try all this straight
> >>> away.
> >>>
> >>> Is there a way to have a different function than:
> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
> >>>
> >>> I would like to return an HBase Put and not a SummaryStatistics, so
> >>> something like this:
> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
> >>>
> >>> Christophe
> >>>
> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>
> >>>> OK, this indicates that the operator following the source is a
> >>>> bottleneck.
> >>>>
> >>>> If that's the WindowOperator, it makes sense to try the refactoring of
> >>>> the WindowFunction.
> >>>> Alternatively, you can try to run that operator with a higher
> >>>> parallelism.
> >>>>
> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
> >>>> <christophe.salperw...@gmail.com>:
> >>>>>
> >>>>> Hi Fabian,
> >>>>>
> >>>>> Thanks for the help, I will try that. The backpressure was on the
> >>>>> source (HBase).
> >>>>>
> >>>>> Christophe
> >>>>>
> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>>>
> >>>>>> Hi Christophe,
> >>>>>>
> >>>>>> where does the backpressure appear? In front of the sink operator or
> >>>>>> before the window operator?
> >>>>>>
> >>>>>> In any case, I think you can improve your WindowFunction if you
> >>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
> >>>>>> The FoldFunction would take care of the statistics computation and the
> >>>>>> WindowFunction would only assemble the result record, including
> >>>>>> extracting the start time of the window.
> >>>>>>
> >>>>>> Then you could do:
> >>>>>>
> >>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
> >>>>>> YourWindowFunction());
> >>>>>>
> >>>>>> This is more efficient because the FoldFunction is applied eagerly
> >>>>>> whenever a new element is added to a window. Hence, the window only
> >>>>>> holds a single value (the SummaryStatistics) instead of all elements
> >>>>>> added to the window. In contrast, the WindowFunction is called only
> >>>>>> when the window is finally evaluated.
> >>>>>>
> >>>>>> Hope this helps,
> >>>>>> Fabian
> >>>>>>
> >>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
> >>>>>> <christophe.salperw...@gmail.com>:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I am writing a program to read time series from HBase and do some
> >>>>>>> daily aggregations (Flink streaming). For now I am just computing
> >>>>>>> some averages, so nothing very demanding, but my HBase reads get
> >>>>>>> slower and slower (I have a few billion points to read). The
> >>>>>>> backpressure is almost all the time close to 1.
> >>>>>>>
> >>>>>>> I use custom timestamps:
> >>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>>>>>
> >>>>>>> so I implemented a custom extractor based on:
> >>>>>>> AscendingTimestampExtractor
> >>>>>>>
> >>>>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
> >>>>>>> reads/s, and then it gets worse and worse. Even when I cancel the
> >>>>>>> job, data is still being written to HBase (I built a sink similar to
> >>>>>>> the example, with a cache of 100s of HBase Puts to be a bit more
> >>>>>>> efficient).
> >>>>>>>
> >>>>>>> When I don't add a sink it seems to stay at 1M reads/s.
> >>>>>>>
> >>>>>>> Do you have an idea why?
> >>>>>>>
> >>>>>>> Here is a bit of code if needed:
> >>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS
> >>>>>>>     .keyBy(0)
> >>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
> >>>>>>>     .keyBy(0)
> >>>>>>>     .timeWindow(Time.days(1));
> >>>>>>>
> >>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
> >>>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
> >>>>>>>
> >>>>>>>         @Override
> >>>>>>>         public void apply(final Tuple key, final TimeWindow window,
> >>>>>>>                 final Iterable<ANA> input, final Collector<Put> out)
> >>>>>>>                 throws Exception {
> >>>>>>>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
> >>>>>>>             for (final ANA ana : input) {
> >>>>>>>                 summaryStatistics.addValue(ana.getValue());
> >>>>>>>             }
> >>>>>>>             final Put put = buildPut((String) key.getField(0),
> >>>>>>>                     window.getStart(), summaryStatistics);
> >>>>>>>             out.collect(put);
> >>>>>>>         }
> >>>>>>>     });
> >>>>>>>
> >>>>>>> And how I started Flink on YARN:
> >>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> >>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
> >>>>>>>
> >>>>>>> Thanks for any feedback!
> >>>>>>>
> >>>>>>> Christophe
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >
>
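Fabian's eager-fold suggestion in the quoted thread can be sketched without Flink. Here the JDK's DoubleSummaryStatistics stands in for the Commons Math SummaryStatistics (an assumption for illustration, not the Flink API): the accumulator is updated once per arriving element, so per-window state stays a single object regardless of how many elements the window sees, and the final record is only assembled at window end.

```java
import java.util.DoubleSummaryStatistics;

public class EagerFoldSketch {
    public static void main(String[] args) {
        // Eager fold: one accumulator per window, updated as elements arrive.
        // State is O(1) per window, not O(number of elements).
        final DoubleSummaryStatistics acc = new DoubleSummaryStatistics();
        for (double value = 1.0; value <= 5.0; value += 1.0) {
            acc.accept(value);    // plays the role of FoldFunction.fold(acc, value)
        }
        // The WindowFunction then only assembles the final record at window end.
        System.out.println(acc.getCount() + " " + acc.getAverage());
    }
}
```

Without the fold, the window buffers all five elements and the statistics are computed only when the window fires, which is exactly the memory and latency cost the eager variant avoids.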
